2012-11-27 25 views
7

मेरे पास एक प्रोग्राम है जहां मैं ईवेंट प्राप्त कर रहा हूं और बैचों में उन्हें संसाधित करना चाहता हूं, ताकि जब मैं वर्तमान बैच को संसाधित कर रहा हूं, तब तक आने वाले सभी आइटम अगले बैच में दिखाई देंगे ।प्रतिक्रियाशील एक्सटेंशन: बफर जब तक कि ग्राहक निष्क्रिय नहीं होता

आरएक्स में सरल टाइमस्पैन और गिनती आधारित बफर विधियां मुझे उन सभी चीजों का एक बड़ा बैच देने की बजाए आइटमों के कई बैच देगी, जो मामले में आती हैं (ऐसे मामलों में जब ग्राहक निर्दिष्ट टाइमस्पैन से अधिक समय लेता है या एन से अधिक आइटम आते हैं और एन गिनती से बड़ा है)।

मैंने Func<IObservable<TBufferClosing>> या IObservable<TBufferOpening> and Func<TBufferOpening, IObservable<TBufferClosing>> लेते हुए अधिक जटिल बफर ओवरलोड का उपयोग करने पर ध्यान दिया, लेकिन मुझे इनका उपयोग करने के उदाहरणों के बारे में उदाहरण नहीं मिल पा रहे हैं, जो मुझे करने की कोशिश कर रहे हैं, उन्हें लागू करने के तरीके के बारे में बहुत कम आंकड़े नहीं हैं।

+0

[यह पृष्ठ] (http://leecampbell.blogspot.com.au/2011/03/rx-part-9join-window-buffer-and-group.html) बफर ओवरलोड के साथ मदद कर सकता है। पूरी श्रृंखला बहुत उपयोगी है –

+0

क्या आपने टीपीएल डेटाफ्लो में 'बफरब्लॉक' की कोशिश की है? – Asti

उत्तर

1

क्या यह वही है जो आप चाहते हैं?

var xs = new Subject<int>(); 
var ys = new Subject<Unit>(); 

var zss = 
    xs.Buffer(ys); 

zss 
    .ObserveOn(Scheduler.Default) 
    .Subscribe(zs => 
    { 
     Thread.Sleep(1000); 
     Console.WriteLine(String.Join("-", zs)); 
     ys.OnNext(Unit.Default); 
    }); 

ys.OnNext(Unit.Default); 
xs.OnNext(1); 
Thread.Sleep(200); 
xs.OnNext(2); 
Thread.Sleep(600); 
xs.OnNext(3); 
Thread.Sleep(400); 
xs.OnNext(4); 
Thread.Sleep(300); 
xs.OnNext(5); 
Thread.Sleep(900); 
xs.OnNext(6); 
Thread.Sleep(100); 
xs.OnNext(7); 
Thread.Sleep(1000); 

मेरे परिणाम:

1-2-3 
4-5 
6-7 
+0

एक थ्रेड पर प्रोग्राम निष्पादित करने के लिए ObserveOn को हटाने के कारण इसे तोड़ने का कारण बनता है। –

+0

@ChrisEldredge - हाँ यह करता है। आपको इसे काम करने के लिए बहु-थ्रेडिंग की अनुमति देनी पड़ सकती है। – Enigmativity

+0

कतार खाली होने पर यह एक व्यस्त प्रतीक्षा भी करेगा (100% पर एक सीपीयू चिपकाता है)। यदि आप zs.Count == 0 पर नींद को हटाते हैं, तो आप स्पाइक देखेंगे। –

1

आपको क्या करना होगा मूल्यों बफ़र होना और फिर जब कार्यकर्ता यह वर्तमान बफर के लिए पूछता है के लिए तैयार है कुछ है और फिर इसे रीसेट करता है। यह अपने कार्यकर्ता में

var tictac = new TicTac<double>(); 

IObservable<double> source = .... 

source.Subscribe(x=>tictac.Push(x)); 

फिर

class TicTac<Stuff> { 

    private TaskCompletionSource<List<Stuff>> Items = new TaskCompletionSource<List<Stuff>>(); 

    List<Stuff> in = new List<Stuff>(); 

    public void push(Stuff stuff){ 
     lock(this){ 
      if(in == null){ 
       in = new List<Stuff>(); 
       Items.SetResult(in); 
      } 
      in.Add(stuff); 
     } 
    } 

    private void reset(){ 
     lock(this){ 
      Items = new TaskCompletionSource<List<Stuff>>(); 
      in = null; 
     } 
    } 

    public async Task<List<Stuff>> Items(){ 
     List<Stuff> list = await Items.Task; 
     reset(); 
     return list; 
    } 
} 

तो RX और कार्य के संयोजन के साथ किया जा सकता है

while(true){ 

    var items = await tictac.Items(); 

    Thread.Sleep(100); 

    for each (item in items){ 
     Console.WriteLine(item); 
    } 

} 
+1

मुझे लगता है कि यह काम करता है, लेकिन अगर मुझे सीधे टीपीएल/एपीएम के साथ कोड करना है तो प्रतिक्रियाशील एक्सटेंशन का उपयोग क्यों करें? –

+0

प्रतिक्रियाशील एक्सटेंशन एक ढांचा है। आप इसे अपनी इच्छानुसार विस्तारित कर सकते हैं। मैं अक्सर कस्टम सामान के लिए अपने स्वयं के ऑपरेटर जोड़ता हूं – bradgonesurfing

+2

@ChrisEldredge निष्पक्ष होने के लिए, आप जो पूछ रहे हैं वह आरएक्स ढांचे के लिए उपयुक्त नहीं है; ग्राहक के पास पर्यवेक्षक को सिग्नल करने का कोई तरीका नहीं है कि यह निष्क्रिय है या नहीं, आपको उस बैंड से बाहर करना है। – casperOne

1

तरह से मैं पहले यह किया है ऊपर खींचने के लिए है डॉटपीक/परावर्तक में विधि का निरीक्षण करें और उस कतार की अवधारणा को लें जो इसे हमारे आवश्यकताओं के अनुरूप है और अनुकूलित करता है। उदाहरण के लिए, तेजी से टिकने वाले डेटा (जैसे वित्त) के साथ यूआई अनुप्रयोगों में यूआई थ्रेड घटनाओं के साथ बाढ़ आ सकती है और कभी-कभी यह पर्याप्त तेज़ी से अद्यतन नहीं कर सकती है। इन मामलों में हम अंतिम घटना को छोड़कर सभी घटनाओं को छोड़ना चाहते हैं (किसी विशेष उपकरण के लिए)। इस मामले में हमने ObserveOn की आंतरिक कतार को टी के एक ही मान में बदल दिया (ObserveLatestOn (IScheduler) के लिए देखो)। आपके मामले में आप कतार चाहते हैं, हालांकि आप पूरी कतार को केवल पहले मान को धक्का नहीं देना चाहते हैं। इससे आप कार्य शुरू कर पाएंगे।

संबंधित मुद्दे