2016-08-11 10 views
7

मैं एक ऐसी प्रणाली बनाने की कोशिश कर रहा हूं जिसके द्वारा मेरा एप्लिकेशन रेडिस पबसब चैनल से स्ट्रीमिंग डेटा प्राप्त कर सके और इसे संसाधित कर सके। Redis driver कि मैं उपयोग कर रहा हूँ, जंग के लिए अन्य सभी Redis ड्राइवरों कि मैंने देखा है के साथ साथ, एक अवरुद्ध आपरेशन का उपयोग चैनल कि केवल एक मान देता है जब यह डेटा प्राप्त से डेटा प्राप्त करने:futures.rs और Redis PubSub का उपयोग करके अवरुद्ध कॉल के लिए वायदा की धारा को कैसे कार्यान्वित करें?

let msg = match pubsub.get_message() { 
     Ok(m) => m, 
     Err(_) => panic!("Could not get message from pubsub!") 
}; 
let payload: String = match msg.get_payload() { 
    Ok(s) => s, 
    Err(_) => panic!("Could not convert redis message to string!") 
}; 

मैं भविष्य में इस अवरुद्ध फ़ंक्शन कॉल को लपेटने के लिए futures-rs लाइब्रेरी का उपयोग करना चाहता था ताकि इनपुट के इंतजार के दौरान मैं अपने आवेदन के भीतर अन्य कार्य कर सकूं।

मैंने tutorial वायदा के लिए पढ़ा है और Stream बनाने की कोशिश की है जो पबसब द्वारा डेटा प्राप्त होने पर संकेत देगा, लेकिन मुझे यह पता नहीं चल सकता कि ऐसा कैसे किया जाए।

मैं schedule और अवरुद्ध pubsub.get_message() समारोह के लिए poll कार्यों कैसे बना सकते हैं?

+5

जिस दिन इसकी बड़ी घोषणा थी उस दिन लाइब्रेरी का उपयोग करना; कितना महत्वाकांक्षी!^_^ – Shepmaster

उत्तर

10

भारी चेतावनी मैंने पहले कभी इस पुस्तकालय का उपयोग नहीं किया है, और कुछ अवधारणाओं के मेरे निम्न स्तर के ज्ञान थोड़ा सा है ... कमी। ज्यादातर मैं the tutorial के माध्यम से पढ़ रहा हूँ। मुझे पूरा यकीन है कि जिसने एसिंक काम किया है, वह इसे पढ़ेगा और हंस जाएगा, लेकिन यह अन्य लोगों के लिए एक उपयोगी प्रारंभिक बिंदु हो सकता है। चेतावनी emptor!


के कुछ थोड़ा सरल के साथ शुरू, प्रदर्शन कैसे एक Stream काम करता है। हम एक धारा में Result रों का एक इटरेटर परिवर्तित कर सकते हैं:

extern crate futures; 

use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let payloads: Vec<Result<String,()>> = vec![Ok("a".into()), Ok("b".into())]; 
    let payloads = stream::iter(payloads.into_iter()); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
} 

यह हमें धारा उपभोग करने के लिए एक तरह से पता चलता है। Stream को Future में परिवर्तित करने के लिए हम and_then का उपयोग प्रत्येक पेलोड (यहां बस इसे प्रिंट कर रहे हैं) और फिर for_each पर कुछ करने के लिए करते हैं। इसके बाद हम अजीब नामित forget method पर कॉल करके भविष्य को चला सकते हैं।


अगला रेडिस लाइब्रेरी को मिश्रण में बांधना है, केवल एक संदेश को संभालना है। चूंकि get_message() विधि अवरुद्ध हो रही है, इसलिए हमें मिश्रण में कुछ धागे पेश करने की आवश्यकता है। इस प्रकार के एसिंक्रोनस सिस्टम में बड़ी मात्रा में काम करने का अच्छा विचार नहीं है क्योंकि बाकी सब कुछ अवरुद्ध हो जाएगा। For example:

जब तक यह नहीं तो ऐसा हो की व्यवस्था की है, यह सुनिश्चित किया जाना चाहिए कि इस समारोह खत्म की कार्यान्वयन बहुत जल्दी

एक आदर्श दुनिया में, रेडिस क्रेट वायदा जैसी लाइब्रेरी के ऊपर बनाया जाएगा और यह सब मूल रूप से सामने आएगा।

extern crate redis; 
extern crate futures; 

use std::thread; 
use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); 

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); 
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); 

    let (tx, payloads) = stream::channel(); 

    let redis_thread = thread::spawn(move || { 
     let msg = pubsub.get_message().expect("Unable to get message"); 
     let payload: Result<String, _> = msg.get_payload(); 
     tx.send(payload).forget(); 
    }); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
    redis_thread.join().expect("unable to join to thread"); 
} 

मेरी समझ यहां फ़ज़ीर हो जाती है। एक अलग धागे में, हम संदेश के लिए अवरुद्ध करते हैं और इसे प्राप्त करते समय इसे चैनल में धक्का देते हैं। मुझे समझ में नहीं आता है कि हमें थ्रेड के हैंडल पर क्यों पकड़ना है। मैं उम्मीद करता हूं कि foo.forget स्ट्रीम अवरुद्ध होने तक प्रतीक्षा कर रहा है।

Redis सर्वर के लिए एक टेलनेट संबंध में, इस पते पर भेजें:

publish rust awesome 

और तुम यह काम करता है देखेंगे। प्रिंट स्टेटमेंट जोड़ने से पता चलता है कि (मेरे लिए) foo.forget कथन थ्रेड होने से पहले चलाया जाता है।


एकाधिक संदेश ट्रिकियर हैं। Sender उत्पन्न करने वाले पक्ष को उपभोग करने वाले पक्ष से बहुत दूर होने से रोकने के लिए खुद को खपत करता है। यह send से दूसरे भविष्य को लौटकर पूरा किया जाता है! हम पाश की अगले चरण लिए उसका पुनः उपयोग करने के लिए इसे शटल वापस वहाँ से बाहर की जरूरत है:

extern crate redis; 
extern crate futures; 

use std::thread; 
use std::sync::mpsc; 

use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); 

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); 
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); 

    let (tx, payloads) = stream::channel(); 

    let redis_thread = thread::spawn(move || { 
     let mut tx = tx; 

     while let Ok(msg) = pubsub.get_message() { 
      let payload: Result<String, _> = msg.get_payload(); 

      let (next_tx_tx, next_tx_rx) = mpsc::channel(); 

      tx.send(payload).and_then(move |new_tx| { 
       next_tx_tx.send(new_tx).expect("Unable to send successor channel tx"); 
       futures::finished(()) 
      }).forget(); 

      tx = next_tx_rx.recv().expect("Unable to receive successor channel tx"); 
     } 
    }); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
    redis_thread.join().expect("unable to join to thread"); 
} 

मुझे यकीन है कि वहाँ अंतर्संचालन के इस प्रकार के लिए और अधिक पारिस्थितिकी तंत्र हो जाएगा के रूप में समय पर चला जाता है कर रहा हूँ। उदाहरण के लिए, futures-cpupool क्रेट संभवतः को इस तरह के एक समान उपयोग का समर्थन करने के लिए बढ़ाया जा सकता है।

+0

अद्भुत उत्तर के लिए धन्यवाद! सिर्फ एक प्रश्न: 'redis_thread' में शामिल नहीं होने से परिणाम-पढ़ने की प्रक्रिया को गैर-अवरुद्ध करने के पूरे प्रयास को अस्वीकार कर दिया जाता है? शायद कुछ ऐसा है जो मुझे समझ में नहीं आता है। – Ameo

+1

"मुझे उम्मीद है कि foo.forget खुद को अवरुद्ध कर देगा, स्ट्रीम खाली होने तक प्रतीक्षा कर रहा है" असल में, वायदा "तैयार होने तक ब्लॉक" प्रदान करने के लिए बाध्य नहीं हैं। 'भूल जाओ), जहां तक ​​इसका विवरण जाता है, भविष्य में गिराए जाने पर स्वचालित रद्दीकरण को रोकने के लिए आवश्यक है, लेकिन यह प्रतीक्षा से संबंधित नहीं है। स्कैला में, उदाहरण के लिए, इसके लिए 'भविष्य' पर कोई विधि नहीं है, लेकिन इसके बजाय अलग-अलग 'Await.ready'/'Await.result' विधियों की जोड़ी है जो कुछ समय के भीतर भविष्य तैयार होने तक प्रतीक्षा करें। –

+1

जहां तक ​​मैं समझता हूं, भविष्य में- [भविष्य :: चयन'] (http://alexcrichton.com/futures-rs/futures/trait.Future.html#method] के साथ एक समान चीज़ को कार्यान्वित करना संभव है। चयन करें), जहां दूसरा भविष्य एक निश्चित टाइमआउट के बाद पूरा हो जाता है। –

संबंधित मुद्दे