var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
var zip = a.Zip(b, (x, y) => x + "-" + y);
zip.Subscribe(Console.WriteLine);
प्रिंटों
0 - 5
1 - 6
2 - 5
6 - - 6 ...एकाधिक IObservable अनुक्रमों में कैसे शामिल हों?
इसके बजाय, मैं समान मूल्यों
5 में शामिल होने के लिए करना चाहते हैं 7 - 7
8 - 8
...
यह आदेश का एक सरलीकृत उदाहरण है जो क्रमबद्ध एसिंक्रोनस अनुक्रमों के 100s विलय कर रहा है। दो आईनेमेरेबल में शामिल होना बहुत आसान है, लेकिन मुझे आरएक्स में ऐसा कुछ करने का कोई तरीका नहीं मिला। कोई विचार?
इनपुट और जो मैं प्राप्त करने की कोशिश कर रहा हूं, उसके बारे में अधिक जानकारी। असल में, पूरी प्रणाली फोर्क-जॉइन पैटर्न से जुड़े कई राज्य मशीनों (एग्रीगेटर, बफर, स्मूथिंग फ़िल्टर इत्यादि) के साथ एक वास्तविक समय पाइपलाइन है। क्या आरएक्स ऐसी चीजों को लागू करने के लिए एक अच्छा फिट है? प्रत्येक इनपुट
public struct DataPoint
{
public double Value;
public DateTimeOffset Timestamp;
}
डेटा के प्रत्येक इनपुट बिट आगमन पर टाइमस्टैंप है के रूप में प्रतिनिधित्व किया जा सकता है, इस प्रकार सभी घटनाओं स्वाभाविक रूप से उनके शामिल होने कुंजी (टाइमस्टैम्प) द्वारा आदेश दिया गया है। जैसे-जैसे घटनाएं पाइपलाइन के माध्यम से यात्रा करती हैं, वे फोर्क हो जाते हैं और जुड़ जाते हैं। जुड़ने के लिए टाइमस्टैम्प से सहसंबंधित होना आवश्यक है और पूर्वनिर्धारित क्रम में लागू किया जाना चाहिए। उदाहरण के लिए, शामिल हों (ए, बी, सी, डी) => शामिल हों (शामिल हों (शामिल हों (ए, बी), सी), डी)।
संपादित नीचे क्या मैं जल्दी में साथ आने कर सकता है। उम्मीद है कि मौजूदा आरएक्स ऑपरेटरों के आधार पर एक आसान समाधान है।
static void Test()
{
var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
//var zip = a.Zip(b, (x, y) => x + "-" + y);
//zip.Subscribe(Console.WriteLine);
var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
joined.Subscribe(Console.WriteLine);
}
static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
return Observable.CreateWithDisposable<string>(o =>
{
Queue<int> a = new Queue<int>();
Queue<int> b = new Queue<int>();
object gate = new object();
left.Subscribe(x =>
{
lock (gate)
{
if (a.Count == 0 || a.Peek() < x)
a.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
right.Subscribe(x =>
{
lock (gate)
{
if (b.Count == 0 || b.Peek() < x)
b.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
return Disposable.Empty;
});
[rx मंच] पर एक ही सवाल (पूछे जाने वाले http://social.msdn.microsoft.com/Forums/en-US/rx/thread/adbcd963- 0c83-4968-a1b2-1317d5e31ae5) –