2015-06-03 2 views
13

को देखते हुए:दो सॉर्ट किए गए ऑब्जर्वेबल को एक क्रमबद्ध अवलोकन में कैसे विलय करें?

Integer[] arr1 = {1, 5, 9, 17}; 
Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15}; 
Observable<Integer> o1 = Observable.from(arr1); 
Observable<Integer> o2 = Observable.from(arr2); 

कैसे एक नमूदार कि 1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17 शामिल पाने के लिए?

+1

बहुत दिलचस्प सवाल है, मुझे यह पसंद है देख सकते हैं। :) –

उत्तर

4

संपादित करें: यदि आप इसका उपयोग करने जा रहे हैं तो कृपया _joric की टिप्पणी देखें। एक एज केस है जिसे संभाला नहीं जाता है, मुझे इसे ठीक करने का एक त्वरित तरीका नहीं दिख रहा है, और इसलिए मेरे पास अभी इसे ठीक करने का समय नहीं है।

यहां सी # में एक समाधान है, क्योंकि आपके पास system.reactive टैग है।

static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b) 
{ 
    var source = Observable.Merge(
     a.Select(x => Tuple.Create('a', x)), 
     b.Select(y => Tuple.Create('b', y))); 
    return source.Publish(o => 
    { 
     var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2); 
     var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2); 
     return Observable.Merge(
      published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)), 
      published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x))); 
    }); 
} 

विचार निम्नानुसार संक्षेप में है।

  • a मूल्य x का उत्सर्जन करता है, तो हम उसे देरी जब तक b एक मूल्य y ऐसी है कि x <= y उत्सर्जन करता है।

  • b मूल्य y का उत्सर्जन करता है, तो हम उसे देरी जब तक a एक मूल्य x ऐसी है कि y <= x उत्सर्जन करता है।

यदि आपके पास केवल गर्म अवलोकन हैं, तो आप निम्न कार्य कर सकते हैं। लेकिन मिश्रण में कोई ठंडा अवलोकन करने पर निम्नलिखित काम नहीं करेंगे। मैं हमेशा उस संस्करण का उपयोग करने की सलाह दूंगा जो गर्म और ठंडे अवलोकन दोनों के लिए काम करता है।

static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b) 
{ 
    return Observable.Merge(
     a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)), 
     b.Delay(y => a.FirstOrDefaultAsync(x => y <= x))); 
} 
+1

यह प्रतीत नहीं होता है जब 'Observable.Create' का उपयोग करके अवलोकनों में से एक बनाया जा रहा है। मैंने उदाहरण के लिए उदाहरण दिया है: https://gist.github.com/skalinets/89f21662a619f685bd6a –

+1

@the_joric हां ऐसा लगता है कि यह काम नहीं करता है जब एक अवलोकन करने योग्य दूसरे के सामने भी समाप्त हो जाता है। मेरे पास अब इसे ठीक करने का समय नहीं है, हालांकि, मैं बस आपकी टिप्पणी को इंगित करने जा रहा हूं। –

+0

इस 'देरी' अधिभार के बारे में नहीं पता था, धन्यवाद! – ionoy

3

आप प्रकार मर्ज कर सकते हैं दृश्यों समतल, लेकिन यह एक महत्वपूर्ण ओवरहेड होगा:

o1.mergeWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...) 

या

o1.concatWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...) 

अन्यथा, आप एक काफी जटिल ऑपरेटर लिखने की ज़रूरत ।

संपादित करें 2015/04/06:

Here is एक ऑपरेटर है कि इस आधार पर छाँटे मर्ज करता है और अधिक कुशलता से।

1

इस समय कुछ समय पहले RxJava mailing list पर चर्चा की गई थी, आपको उस धागे में संभावित समाधानों के कुछ लिंक मिलेंगे।

3

मैं एक मर्ज सॉर्ट समाधान की भी तलाश कर रहा था जो बैकप्रेसर का समर्थन करता है और उसे नहीं मिला। इसलिए मौजूदा ज़िप ऑपरेटर के आधार पर इसे अपने आप कम से कम लागू करने का फैसला किया।

इसी ज़िप करने के लिए, हल कर मर्ज ऑपरेटर प्रत्येक स्रोत नमूदार पहले से एक आइटम एकत्र करता है, लेकिन फिर उन्हें एक प्राथमिकता कतार, जिसमें से उन्हें एक एक करके उनके प्राकृतिक आदेश या निर्दिष्ट तुलनित्र के अनुसार उत्सर्जन करता है में डालता है।

आप लाइब्रेरी का उपयोग करने के लिए तैयार के रूप में GitHub से प्राप्त कर सकते हैं या बस कॉपी/पेस्ट कोड: उपयोग के लिए

https://github.com/ybayk/rxjava-recipes

देखें इकाई परीक्षण।

+0

आपके रीडमे में यह कहता है "आप सॉर्ट को बहुत बड़े या अनंत क्रमबद्ध अनुक्रमों में विलय कर सकते हैं"। मुझे लगता है कि यह अन्य समाधानों पर मुख्य लाभ है। आपको शायद इसे हाइलाइट करना चाहिए। – Luciano

0

बस विलय और क्रमबद्ध करने के बारे में क्या?

@Test 
public void testMergeChains() { 
    Observable.merge(Observable.from(Arrays.asList(1, 2, 13, 11, 5)), Observable.from(Arrays.asList(10, 4, 12, 3, 14, 15))) 
       .collect(ArrayList<Integer>::new, ArrayList::add) 
      .doOnNext(Collections::sort) 
      .subscribe(System.out::println); 

} 

आप अधिक उदाहरण यहाँ

https://github.com/politrons/reactive

+0

यह समाधान काम करता है लेकिन दुर्भाग्य से डेटा की एक बड़ी मात्रा के मामले में अक्षम है: इसे सभी धाराओं को उपभोग करने और सॉर्ट() के कारण स्मृति में संग्रहीत करने की आवश्यकता होती है। – Christophe

संबंधित मुद्दे