2012-07-01 16 views
6

मेरे पास आने वाली घटनाओं का एक गुच्छा है और मुझे बिना किसी नुकसान के उन्हें निष्पादित करना है, लेकिन मैं यह सुनिश्चित करना चाहता हूं कि वे उपयुक्त समय स्लॉट पर बफर और उपभोग कर रहे हों। किसी के पास कोई समाधान है?एक अवलोकन के उपभोग करने की "दर सीमा" का सबसे अच्छा तरीका क्या है?

मुझे आरएक्स में कोई भी ऑपरेटर नहीं मिल रहा है जो घटनाओं के नुकसान के बिना ऐसा कर सकता है (थ्रोटल - घटनाओं को खो देता है)। मैंने बुफर्ड, देरी इत्यादि भी माना है ... एक अच्छा समाधान नहीं मिल रहा है।

मैं बीच में एक टाइमर डाल करने की कोशिश की है, लेकिन किसी भी तरह यह बिल्कुल काम नहीं करता:

GetInitSequence() 
      .IntervalThrottle(TimeSpan.FromSeconds(5)) 
      .Subscribe(
       item => 
        { 
         Console.WriteLine(DateTime.Now); 
         // Process item 
        } 
      ); 

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime) 
    { 
     return Observable.Create<T>(o => 
      { 
       return source.Subscribe(x => 
        { 
         new Timer(state => 
          o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1)); 
        }, o.OnError, o.OnCompleted); 
     }); 
    } 
+0

आप एक संगमरमर जोड़ सकते हैं नहीं है digram दिखा रहा है कि आपके पास क्या है, और आप क्या चाहते हैं? दूसरों की तरह, मुझे यकीन नहीं है कि आप क्या हासिल करने की कोशिश कर रहे हैं, क्योंकि मुझे लगता है कि बफर ठीक है। –

+0

आप क्या सीमित कर रहे हैं? – Fredrick

उत्तर

10

सवाल नहीं 100% स्पष्ट है तो मैं कुछ अनुमान बना रहा हूं।

Observable.Delay वह नहीं है जो आप चाहते हैं क्योंकि इससे प्रत्येक घटना आने पर देरी होगी, बल्कि प्रसंस्करण के लिए समय अंतराल बनाने के बजाय।

Observable.Buffer वह नहीं है जो आप चाहते हैं क्योंकि इससे एक समय में एक के बजाय प्रत्येक दिए गए अंतराल में सभी घटनाएं आपको पारित की जाएंगी।

तो मेरा मानना ​​है कि आप ऐसे समाधान की तलाश में हैं जो मेट्रोनोम बनाता है जो दूर टिकता है, और आपको प्रति टिक इवेंट देता है। यह भोलेपन से यह अपने स्रोत को जोड़ने के लिए metronome के लिए Observable.Interval और Zip का प्रयोग कर बनाया जा सकता है:

var source = GetInitSequence(); 
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));  
var triggeredSource = source.Zip(trigger, (s,_) => s); 
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now)); 

यह हर 5 सेकंड (उपरोक्त उदाहरण में) को गति प्रदान करेगा, और आप अनुक्रम में मूल आइटम दे।

इस समाधान के साथ एकमात्र समस्या यह है कि यदि आपके पास 10 सेकंड के लिए कोई और स्रोत तत्व नहीं है, तो जब स्रोत तत्व आते हैं तो उन्हें तत्काल भेजा जाएगा क्योंकि कुछ 'ट्रिगर' घटनाएं बैठे हैं उनके लिए इंतज़ार कर रहे हैं। उस परिदृश्य के लिए संगमरमर चित्र:

source: -a-b-c----------------------d-e-f-g 
trigger: ----o----o----o----o----o----o----o 
result: ----a----b----c-------------d-e-f-g 

यह एक बहुत ही उचित मुद्दा है।वहाँ दो सवाल यहाँ पहले से ही है कि यह से निपटने के हैं:

Rx IObservable buffering to smooth out bursts of events

A way to push buffered events in even intervals

समाधान प्रदान एक मुख्य Drain विस्तार विधि और माध्यमिक Buffered विस्तार है। मैंने इन्हें बहुत सरल बनाने के लिए संशोधित किया है (Drain की आवश्यकता नहीं है, बस Concat का उपयोग करें)। प्रयोग है:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5)); 

विस्तार विधि StepInterval:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay) 
{ 
    return source.Select(x => 
     Observable.Empty<T>() 
      .Delay(minDelay) 
      .StartWith(x) 
    ).Concat(); 
} 
+0

धन्यवाद। अभी भी वास्तव में जो मैं खोज रहा था, लेकिन यह मुझे कुछ विचार दिखाता है। मैं सिर्फ आरएक्स के साथ निराश हूं - यह इतना जटिल क्यों होना चाहिए और उचित दस्तावेज नहीं होना चाहिए। सीखने की अवस्था खड़ी है और कुछ मूल्यवान प्राप्त करने के लिए विषय का विस्तृत ज्ञान आवश्यक है। #fail – IgorM

+1

सहमत हुए। यही कारण है कि मैंने आपकी स्थिति में लोगों की सहायता के लिए IntroToRx.com लिखने में काफी समय लगाया। यह कठिन है, और सीखने के लिए बहुत कुछ है। –

+0

मुझे वास्तव में इन आरएक्स ऑपरेटरों को पढ़ने और तर्क के लिए मुश्किल लगता है। मुझे लगता है कि यह मेरी सीमा है - शायद यह इसलिए है क्योंकि मेरे पास एक दृश्य दिमाग है और मैं परिणाम को कल्पना नहीं कर सकता। क्या इस जवाब में कोड के लिए संगमरमर आरेख प्राप्त करने का कोई मौका है? –

0

के बारे में कैसे Observable.Buffer? यह एक ही घटना के रूप में 1 एस विंडो में सभी घटनाओं को वापस करना चाहिए।

var xs = Observable.Interval(TimeSpan.FromMilliseconds(100)); 
var bufferdStream = xs.Buffer(TimeSpan.FromSeconds(5)); 
bufferdStream.Subscribe(item => { Console.WriteLine("Number of events in window: {0}", item.Count); }); 

यह हो सकता है कि आप जो पूछ रहे हैं वह स्पष्ट नहीं है। आपका कोड क्या करना चाहिए? ऐसा लगता है कि आप प्रत्येक घटना के लिए टाइमर बनाकर बस देरी कर रहे हैं। यह अवलोकन के अर्थशास्त्र को भी तोड़ता है क्योंकि अगले और पूर्ण होने से पहले पूरा हो सकता है।

नोट यह भी टाइमर पर सटीक के रूप में सटीक है। आम तौर पर टाइमर अधिकतम 16ms पर सटीक होते हैं।

संपादित करें:

अपने उदाहरण बन जाता है, और आइटम विंडो में सभी घटनाओं में शामिल हैं:

GetInitSequence() 
      .Buffer(TimeSpan.FromSeconds(5)) 
      .Subscribe(
       item => 
        { 
         Console.WriteLine(DateTime.Now); 
         // Process item 
        } 
      ); 
1

मैं जानता हूँ कि यह अभी भी सरल किया जा सकता है, लेकिन यह काम करेंगे?

var intervaled = source.Do(x => { Thread.Sleep(100); }); 

असल में यह मूल्यों के बीच न्यूनतम देरी डालता है। बहुत सरल है?

+0

ओपीएस इंटरवल ट्रॉटल के व्यवहार से यह मेल खाता है और यह ठीक करता है क्या यह वास्तव में समझदार है? –

+1

ईक ... धागे अवरुद्ध !? आरएक्स प्रधानाचार्यों के चेहरे में उस तरह की उड़ानें सही हैं? –

+0

हां, इसमें इसकी शुद्ध अर्थ यह आरएक्स के खिलाफ है, लेकिन आवश्यकता को अवरुद्ध करना है –

1
Enigmativity के जवाब की पंक्तियों के साथ

, अगर सब आप क्या करना चाहते सिर्फ देरी एक TimeSpan द्वारा सभी मान है, मैं देख नहीं कर सकते क्यों Delay ऑपरेटर आप चाहते हैं

GetInitSequence() 
     .Delay(TimeSpan.FromSeconds(5)) //ideally pass an IScheduler here 
     .Subscribe(
      item => 
       { 
        Console.WriteLine(DateTime.Now); 
        // Process item 
       } 
     ); 
संबंधित मुद्दे

 संबंधित मुद्दे