भारी चेतावनी मैंने पहले कभी इस पुस्तकालय का उपयोग नहीं किया है, और कुछ अवधारणाओं के मेरे निम्न स्तर के ज्ञान थोड़ा सा है ... कमी। ज्यादातर मैं the tutorial के माध्यम से पढ़ रहा हूँ। मुझे पूरा यकीन है कि जिसने एसिंक काम किया है, वह इसे पढ़ेगा और हंस जाएगा, लेकिन यह अन्य लोगों के लिए एक उपयोगी प्रारंभिक बिंदु हो सकता है। चेतावनी emptor!
के कुछ थोड़ा सरल के साथ शुरू, प्रदर्शन कैसे एक Stream
काम करता है। हम एक धारा में Result
रों का एक इटरेटर परिवर्तित कर सकते हैं:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String,()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
यह हमें धारा उपभोग करने के लिए एक तरह से पता चलता है। Stream
को Future
में परिवर्तित करने के लिए हम and_then
का उपयोग प्रत्येक पेलोड (यहां बस इसे प्रिंट कर रहे हैं) और फिर for_each
पर कुछ करने के लिए करते हैं। इसके बाद हम अजीब नामित forget
method पर कॉल करके भविष्य को चला सकते हैं।
अगला रेडिस लाइब्रेरी को मिश्रण में बांधना है, केवल एक संदेश को संभालना है। चूंकि get_message()
विधि अवरुद्ध हो रही है, इसलिए हमें मिश्रण में कुछ धागे पेश करने की आवश्यकता है। इस प्रकार के एसिंक्रोनस सिस्टम में बड़ी मात्रा में काम करने का अच्छा विचार नहीं है क्योंकि बाकी सब कुछ अवरुद्ध हो जाएगा। For example:
जब तक यह नहीं तो ऐसा हो की व्यवस्था की है, यह सुनिश्चित किया जाना चाहिए कि इस समारोह खत्म की कार्यान्वयन बहुत जल्दी।
एक आदर्श दुनिया में, रेडिस क्रेट वायदा जैसी लाइब्रेरी के ऊपर बनाया जाएगा और यह सब मूल रूप से सामने आएगा।
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
मेरी समझ यहां फ़ज़ीर हो जाती है। एक अलग धागे में, हम संदेश के लिए अवरुद्ध करते हैं और इसे प्राप्त करते समय इसे चैनल में धक्का देते हैं। मुझे समझ में नहीं आता है कि हमें थ्रेड के हैंडल पर क्यों पकड़ना है। मैं उम्मीद करता हूं कि foo.forget
स्ट्रीम अवरुद्ध होने तक प्रतीक्षा कर रहा है।
Redis सर्वर के लिए एक टेलनेट संबंध में, इस पते पर भेजें:
publish rust awesome
और तुम यह काम करता है देखेंगे। प्रिंट स्टेटमेंट जोड़ने से पता चलता है कि (मेरे लिए) foo.forget
कथन थ्रेड होने से पहले चलाया जाता है।
एकाधिक संदेश ट्रिकियर हैं। Sender
उत्पन्न करने वाले पक्ष को उपभोग करने वाले पक्ष से बहुत दूर होने से रोकने के लिए खुद को खपत करता है। यह send
से दूसरे भविष्य को लौटकर पूरा किया जाता है! हम पाश की अगले चरण लिए उसका पुनः उपयोग करने के लिए इसे शटल वापस वहाँ से बाहर की जरूरत है:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
मुझे यकीन है कि वहाँ अंतर्संचालन के इस प्रकार के लिए और अधिक पारिस्थितिकी तंत्र हो जाएगा के रूप में समय पर चला जाता है कर रहा हूँ। उदाहरण के लिए, futures-cpupool क्रेट संभवतः को इस तरह के एक समान उपयोग का समर्थन करने के लिए बढ़ाया जा सकता है।
जिस दिन इसकी बड़ी घोषणा थी उस दिन लाइब्रेरी का उपयोग करना; कितना महत्वाकांक्षी!^_^ – Shepmaster