2011-02-06 14 views
7
 var a = Observable.Range(0, 10); 
     var b = Observable.Range(5, 10); 
     var zip = a.Zip(b, (x, y) => x + "-" + y); 
     zip.Subscribe(Console.WriteLine); 

प्रिंटों
0 - 5
1 - 6
2 - 5
6 - - 6 ...एकाधिक IObservable अनुक्रमों में कैसे शामिल हों?

इसके बजाय, मैं समान मूल्यों
5 में शामिल होने के लिए करना चाहते हैं 7 - 7
8 - 8
...

यह आदेश का एक सरलीकृत उदाहरण है जो क्रमबद्ध एसिंक्रोनस अनुक्रमों के 100s विलय कर रहा है। दो आईनेमेरेबल में शामिल होना बहुत आसान है, लेकिन मुझे आरएक्स में ऐसा कुछ करने का कोई तरीका नहीं मिला। कोई विचार?

इनपुट और जो मैं प्राप्त करने की कोशिश कर रहा हूं, उसके बारे में अधिक जानकारी। असल में, पूरी प्रणाली फोर्क-जॉइन पैटर्न से जुड़े कई राज्य मशीनों (एग्रीगेटर, बफर, स्मूथिंग फ़िल्टर इत्यादि) के साथ एक वास्तविक समय पाइपलाइन है। क्या आरएक्स ऐसी चीजों को लागू करने के लिए एक अच्छा फिट है? प्रत्येक इनपुट

public struct DataPoint 
{ 
    public double Value; 
    public DateTimeOffset Timestamp; 
} 

डेटा के प्रत्येक इनपुट बिट आगमन पर टाइमस्टैंप है के रूप में प्रतिनिधित्व किया जा सकता है, इस प्रकार सभी घटनाओं स्वाभाविक रूप से उनके शामिल होने कुंजी (टाइमस्टैम्प) द्वारा आदेश दिया गया है। जैसे-जैसे घटनाएं पाइपलाइन के माध्यम से यात्रा करती हैं, वे फोर्क हो जाते हैं और जुड़ जाते हैं। जुड़ने के लिए टाइमस्टैम्प से सहसंबंधित होना आवश्यक है और पूर्वनिर्धारित क्रम में लागू किया जाना चाहिए। उदाहरण के लिए, शामिल हों (ए, बी, सी, डी) => शामिल हों (शामिल हों (शामिल हों (ए, बी), सी), डी)।

संपादित नीचे क्या मैं जल्दी में साथ आने कर सकता है। उम्मीद है कि मौजूदा आरएक्स ऑपरेटरों के आधार पर एक आसान समाधान है।

static void Test() 
    { 
     var a = Observable.Range(0, 10); 
     var b = Observable.Range(5, 10); 
     //var zip = a.Zip(b, (x, y) => x + "-" + y); 
     //zip.Subscribe(Console.WriteLine); 

     var joined = MergeJoin(a,b, (x,y) => x + "-" + y); 
     joined.Subscribe(Console.WriteLine); 
    } 

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector) 
    { 
     return Observable.CreateWithDisposable<string>(o => 
      { 
       Queue<int> a = new Queue<int>(); 
       Queue<int> b = new Queue<int>(); 
       object gate = new object(); 

       left.Subscribe(x => 
        { 
         lock (gate) 
         { 
          if (a.Count == 0 || a.Peek() < x) 
           a.Enqueue(x); 

          while (a.Count != 0 && b.Count != 0) 
          { 
           if (a.Peek() == b.Peek()) 
           { 
            o.OnNext(selector(a.Dequeue(), b.Dequeue())); 
           } 
           else if (a.Peek() < b.Peek()) 
           { 
            a.Dequeue(); 
           } 
           else 
           { 
            b.Dequeue(); 
           } 
          } 
         } 
        }); 

       right.Subscribe(x => 
       { 
        lock (gate) 
        { 
         if (b.Count == 0 || b.Peek() < x) 
          b.Enqueue(x); 

         while (a.Count != 0 && b.Count != 0) 
         { 
          if (a.Peek() == b.Peek()) 
          { 
           o.OnNext(selector(a.Dequeue(), b.Dequeue())); 
          } 
          else if (a.Peek() < b.Peek()) 
          { 
           a.Dequeue(); 
          } 
          else 
          { 
           b.Dequeue(); 
          } 
         } 
        } 
       }); 

       return Disposable.Empty; 
      }); 
+0

[rx मंच] पर एक ही सवाल (पूछे जाने वाले http://social.msdn.microsoft.com/Forums/en-US/rx/thread/adbcd963- 0c83-4968-a1b2-1317d5e31ae5) –

उत्तर

1

मैं ईमानदारी से नहीं सोच सकते हैं मौजूदा ऑपरेटरों के आधार पर समाधान का जो अज्ञात आदेश के गर्म स्रोतों के लिए काम करता है (यानी, xs before ys बनाम ys before xs)। आपका समाधान (, हे अगर यह काम करता है) ठीक लग रहा है, लेकिन मैं कुछ परिवर्तन करने हैं तो यह मेरे कोड थे:

  • समर्थन रद्द ठीक से चयनकर्ता से फेंका अपवाद के लिए MutableDisposable और CompositeDisposable
  • कॉल OnError का उपयोग कर
  • पूरा होने समर्थन करता है, तो यह संभव है एक स्रोत के लिए करने से पहले अन्य

नीचे कोड अपने दोहरे रेंज इनपुट के साथ परीक्षण किया गया है, रों पूरा करने के लिए विचार करें (इसे और अधिक अन्य ऑपरेटरों के साथ लगातार बना) एएमई इनपुट और साथ ही Empty<int> + Never<int> साथ के रूप में, फ़्लिप:

public static IObservable<string> MergeJoin(
    IObservable<int> left, IObservable<int> right, Func<int, int, string> selector) 
{ 
    return Observable.CreateWithDisposable<string>(o => 
    { 
     Queue<int> a = new Queue<int>(); 
     Queue<int> b = new Queue<int>(); 
     object gate = new object(); 

     bool leftComplete = false; 
     bool rightComplete = false; 

     MutableDisposable leftSubscription = new MutableDisposable(); 
     MutableDisposable rightSubscription = new MutableDisposable(); 

     Action tryDequeue =() => 
     { 
      lock (gate) 
      { 
       while (a.Count != 0 && b.Count != 0) 
       { 
        if (a.Peek() == b.Peek()) 
        { 
         string value = null; 

         try 
         { 
          value = selector(a.Dequeue(), b.Dequeue()); 
         } 
         catch (Exception ex) 
         { 
          o.OnError(ex); 
          return; 
         } 

         o.OnNext(value); 
        } 
        else if (a.Peek() < b.Peek()) 
        { 
         a.Dequeue(); 
        } 
        else 
        { 
         b.Dequeue(); 
        } 
       } 
      } 
     }; 

     leftSubscription.Disposable = left.Subscribe(x => 
     { 
      lock (gate) 
      { 
       if (a.Count == 0 || a.Peek() < x) 
        a.Enqueue(x); 

       tryDequeue(); 

       if (rightComplete && b.Count == 0) 
       { 
        o.OnCompleted(); 
       } 
      } 
     },() => 
     { 
      leftComplete = true; 

      if (a.Count == 0 || rightComplete) 
      { 
       o.OnCompleted(); 
      } 
     }); 

     rightSubscription.Disposable = right.Subscribe(x => 
     { 
      lock (gate) 
      { 
       if (b.Count == 0 || b.Peek() < x) 
        b.Enqueue(x); 

       tryDequeue(); 

       if (rightComplete && b.Count == 0) 
       { 
        o.OnCompleted(); 
       } 
      } 
     },() => 
     { 
      rightComplete = true; 

      if (b.Count == 0 || leftComplete) 
      { 
       o.OnCompleted(); 
      } 
     }); 

     return new CompositeDisposable(leftSubscription, rightSubscription); 
    }); 
} 
3

GroupBy आपको जो चाहिए वह कर सकता है। ऐसा लगता है कि जब आइटम "जुड़ जाते हैं" पर आपके पास कोई समय बाधा नहीं होती है, तो आपको कुछ फैशन में एक साथ रहने के लिए समान वस्तुओं की आवश्यकता होती है।

Observable.Merge(Observable.Range(1, 10), Observable.Range(5, 15)) 
.GroupBy(k => k) 
.Subscribe(go => go.Count().Where(cnt => cnt > 1) 
          .Subscribe(cnt => 
        Console.WriteLine("Key {0} has {1} matches", go.Key, cnt))); 

दो से ऊपर के बारे में नोट करने के लिए चीजें हैं, मर्ज, निम्नलिखित भार के है ताकि शामिल हो गए धाराओं के सैकड़ों करने के लिए अपने अनुरोध एक मुद्दा पेश नहीं होंगे:

Merge<TSource>(params IObservable<TSource>[] sources); 
Merge<TSource>(this IEnumerable<IObservable<TSource>> sources); 
Merge<TSource>(this IObservable<IObservable<TSource>> source); 

इसके अलावा, GroupBy रिटर्न IObservable<IGroupedObservable<TKey, TSource>> जिसका अर्थ है कि आप प्रत्येक समूह पर प्रतिक्रिया दे सकते हैं, और प्रत्येक समूह के प्रत्येक नए सदस्य के रूप में आने पर - सभी पूर्ण होने तक प्रतीक्षा करने की आवश्यकता नहीं है।

+0

एकमात्र समस्या यह है कि मुझे क्रम में मूल्यों में शामिल होने में सक्षम होना चाहिए। हालांकि अगर मैं int के बजाय इंडेक्स-वैल्यू टुपल्स पास करता हूं तो इसे हल किया जा सकता है। –

+0

"क्रम में" से आपका क्या मतलब है? –

+0

ध्यान रखें कि 'मर्ज' +' गणना 'का उपयोग करके, आपको कोई स्रोत अनुक्रम समाप्त होने तक कोई मिलान नहीं मिलेगा। उदाहरण 'रेंज' उदाहरण के लिए यह ठीक है, लेकिन यदि आपके स्रोत आउटपुट को गर्म/अनदेखा कर रहे हैं तो आप अपेक्षा नहीं कर सकते हैं। –

1

कैसे v.2838 में नए शामिल हों ऑपरेटर का उपयोग के बारे में।

var a = Observable.Range(1, 10); 
var b = Observable.Range(5, 10); 

var joinedStream = a.Join(b, _ => Observable.Never<Unit>(), _ => Observable.Never<Unit>(), 
    (aOutput, bOutput) => new Tuple<int, int>(aOutput, bOutput)) 
    .Where(tupple => tupple.Item1 == tupple.Item2); 

joinedStream.Subscribe(output => Trace.WriteLine(output)); 

यह Join पर मेरी पहली नज़र है और मुझे यकीन है कि अगर यह इस तरह Never ऑपरेटर का उपयोग करने के लिए बुद्धिमान हो जाएगा नहीं कर रहा हूँ। इनपुट की एक बड़ी मात्रा से निपटने के दौरान, क्योंकि यह एक बड़ी राशि के संचालन को कम करेगा, अधिक इनपुट पुन: प्राप्त किए गए थे। मुझे लगता है कि खिड़कियों को बंद करने के लिए काम किया जा सकता है क्योंकि मैचों को बनाया जाता है और समाधान को और अधिक कुशल बनाते हैं। उस ने कहा कि ऊपर दिया गया उदाहरण आपके प्रश्न के अनुसार काम करता है।

रिकॉर्ड के लिए मुझे लगता है कि स्कॉट की जवाब शायद इस उदाहरण में जाने का रास्ता है। मैं इसे एक संभावित विकल्प के रूप में फेंक रहा हूं।

+0

+1 शामिल होने के साथ मैंने कल एक घंटे बिताया और इसे काम नहीं कर सका। मैं प्रदर्शन के बारे में आपकी चिंताओं को साझा करता हूं। इसके अलावा, परिणामी कोड सरल LINQ जुड़ने की तुलना में अधिक गूढ़ और अनुपालन करना मुश्किल है। मुझे लगता है कि आरएक्स इस प्रकार की समस्याओं के लिए एक अच्छा समाधान नहीं है। –

+0

@ सर्गर - मुझे यकीन है कि यह अवधि मूल्यों को उत्सर्जित करके अधिक कुशल बनाया जा सकता है क्योंकि मैचों को बनाया जाता है (यानी अवलोकन करने योग्य। कुछ भी ज्यादा बुद्धिमान के साथ)। यह सब उस अवधि पर निर्भर करेगा जब आप नियमों को पूरा करना सुरक्षित रखते हैं। –

2

इस उत्तर, Rx forums से नकल सिर्फ इतना है कि यह यहाँ में संग्रहीत किया जाएगा और साथ ही है:

var xs = Observable.Range(1, 10); 
var ys = Observable.Range(5, 10); 

var joined = from x in xs 
    from y in ys 
    where x == y 
    select x + "-" + y; 

या क्वेरी भाव का उपयोग किए बिना:

var joined = 
    xs.SelectMany(x => ys, (x, y) => new {x, y}) 
    .Where(t => t.x == t.y) 
    .Select(t => t.x + "-" + t.y); 
+2

इस समाधान के साथ एकमात्र समस्या यह है कि यह आवश्यक है कि 'ys' गर्म (या' मल्टीकास्ट') है और उस परिदृश्य का समर्थन नहीं करता जहां 'ys' मान' xs' मान से पहले हो जाता है। –

संबंधित मुद्दे