2012-07-25 21 views
12

Throttle विधि एक अवलोकन अनुक्रम से मूल्य छोड़ देता है अगर अन्य बहुत जल्दी अनुसरण करते हैं। लेकिन मुझे बस देरी करने के लिए एक विधि की जरूरत है। यही है, मुझे छोड़ने के बिना आइटमों के बीच न्यूनतम देरी सेट करने की आवश्यकता है।थ्रॉटल Rx.Observable मूल्यों को छोड़ दिए बिना

प्रैक्टिकल उदाहरण: एक वेब सेवा है जो अनुरोधों को दूसरे से एक बार तेज नहीं कर सकती है; एक ऐसा उपयोगकर्ता है जो अनुरोध, एकल या बैचों में जोड़ सकता है। आरएक्स के बिना, मैं एक सूची और एक टाइमर बनाउंगा। जब उपयोगकर्ता अनुरोध जोड़ते हैं, तो मैं उन्हें सूची में जोड़ दूंगा। टाइमर घटना में, मैं गीलेर की जांच करूंगा कि सूची खाली है। यदि ऐसा नहीं है, तो मैं एक अनुरोध भेजूंगा और संबंधित आइटम को हटा दूंगा। ताले और सभी चीजों के साथ। अब, आरएक्स के साथ, मैं Subject बना सकता हूं, जब उपयोगकर्ता अनुरोध जोड़ते हैं तो आइटम जोड़ें। लेकिन मुझे यह सुनिश्चित करने का एक तरीका चाहिए कि देरी लागू करके वेब सेवा बाढ़ न हो।

मैं आरएक्स के लिए नया हूं, तो शायद मुझे कुछ स्पष्ट याद आ रही है।

उत्तर

7

वहाँ एक काफी आसान तरीका क्या आप एक EventLoopScheduler का उपयोग कर करना चाहते हैं नहीं है।

मैंने एक अवलोकन के साथ शुरुआत की जो हर 0 से 3 सेकंड में यादृच्छिक रूप से मूल्यों का उत्पादन करेगी।

var rnd = new Random(); 

var xs = 
    Observable 
     .Generate(
      0, 
      x => x < 20, 
      x => x + 1, 
      x => x, 
      x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0)); 
अब

, इस उत्पादन मूल्यों बनाने के लिए तुरंत जब तक अंतिम मान के भीतर दूसरी पहले मैं ऐसा किया था:

var ys = 
    Observable.Create<int>(o => 
    { 
     var els = new EventLoopScheduler(); 
     return xs 
      .ObserveOn(els) 
      .Do(x => els.Schedule(() => Thread.Sleep(1000))) 
      .Subscribe(o); 
    }); 

यह प्रभावी रूप से EventLoopScheduler पर स्रोत का अवलोकन करता है और फिर कहते हैं के लिए सोने के लिए प्रत्येक OnNext के बाद 1 सेकंड ताकि यह जागने के बाद ही अगले OnNext शुरू कर सके।

मैं परीक्षण किया है कि यह इस कोड के साथ काम किया:

ys 
    .Timestamp() 
    .Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond/1000.0) 
    .Subscribe(x => Console.WriteLine(x)); 

मुझे आशा है कि इस मदद करता है।

+0

'थ्रेड नहीं है। सो जाओ 'बुरा? मैंने हमेशा उन चीजों के लिए धागा निलंबित करने का सोचा जो अनिवार्य रूप से "टाइमर" संसाधनों का अपशिष्ट है। –

+0

@ वारवरकालिनीना - यह अपने स्वयं के धागे पर हो रहा है और यह किसी और चीज को डेडलॉक नहीं कर रहा है। इस मामले में यह ठीक है। – Enigmativity

+0

मैंने यह नहीं कहा कि यह किसी चीज़ को अवरुद्ध करने या अवरुद्ध करने के बारे में था। यह संसाधन अपशिष्ट के बारे में था। आप एक थ्रेड मजबूती से लगभग हर समय कुछ भी नहीं करते हैं। यह अन्य चीजें कर सकता है अगर इसे –

0

अवरुद्ध कतार से लेने के लिए एक अवलोकन करने योग्य टाइमर का उपयोग करने के बारे में कैसे? नीचे दिए गए कोड अपरीक्षित है, लेकिन आप मुझे क्या मतलब है की एक विचार देना चाहिए ...

//assuming somewhere there is 
BlockingCollection<MyWebServiceRequestData> workQueue = ... 

Observable 
    .Timer(new TimeSpan(0,0,1), new EventLoopScheduler()) 
    .Do(i => myWebService.Send(workQueue.Take())); 

// Then just add items to the queue using workQueue.Add(...) 
+0

यह समाधान शायद काम करते हैं और सवाल से "rx से पहले" समाधान के समान है जाएगा, लेकिन वहाँ एक दोष: अनुरोध तत्काल नहीं भेजे जाएंगे, यहां तक ​​कि यह संभव है; केवल जब टाइमर टिकता है। 1 सेकंड देरी के साथ यह बहुत महत्वपूर्ण नहीं है, लेकिन यदि अनुरोधों के बीच आवश्यक देरी होती है, उदाहरण के लिए, 1 मिनट, तो यह एक मुद्दा है। – Athari

+0

@Athari सच नहीं है। पहला टाइमर टिक टेक() पर अवरुद्ध होगा, और उस पल को निष्पादित करेगा जिसे एक आइटम दबाया गया है। आप मेरा मतलब क्या देखने के लिए टाइमआउट को 1 मिनट तक सेट करने का प्रयास कर सकते हैं। –

+0

मैंने कोशिश की है और वास्तव में एक और समस्या है: कभी-कभी दो आइटम एक देरी के बिना एक के बाद संसाधित होते हैं। ऐसा लगता है कि ऐसा होता है क्योंकि, टेक पर इंतजार करते समय, एक और टाइमर इवेंट कतारबद्ध होता है, बिना किसी पिछले प्रसंस्करण को पूरा करने के इंतजार के। मेरा कोड: http://pastebin.com/RRZ0ffBB – Athari

0

एक बफर का उपयोग करने पर विचार करें।

http://msdn.microsoft.com/en-us/library/hh212130(v=vs.103).aspx

नाम इंगित करता है, यह "गिर" उनमें से किसी के बिना ईवेंट के प्रत्यक्ष धारा में तत्वों बफ़र्स।

जी

+2

अवरुद्ध नहीं किया गया था 'बफर' प्रत्येक पुनरावृत्ति पर एक सूची तैयार करता है, लेकिन मुझे एक ही आइटम की आवश्यकता है। प्रश्न से उदाहरण के संदर्भ में, वेब सेवा बैच अनुरोधों का समर्थन नहीं करती है। – Athari

2

कैसे एक सरल विस्तार विधि के बारे में:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay) 
{ 
    return source.Select(x => 
     Observable.Empty<T>() 
      .Delay(minDelay) 
      .StartWith(x) 
    ).Concat(); 
} 

उपयोग:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1)); 
0
.Buffer(TimeSpan.FromSeconds(0.2)).Where(i => i.Any()) 
.Subscribe(buffer => 
{ 
    foreach(var item in buffer) Console.WriteLine(item) 
}); 
+0

यहां एक ही समस्या है: जब आपको बिल्कुल 1 आइटम की आवश्यकता होती है, तो कुछ अनुक्रम लौटने का ओवरहेड (प्रतीक्षा करें और फिर अगला आइटम लौटाएं, और इसी तरह) – abatishchev

3

मैं Observable.Zip उपयोग करने के साथ एक दृष्टिकोण का सुझाव देना चाहते:

// Incoming requests 
var requests = new[] {1, 2, 3, 4, 5}.ToObservable(); 

// defines the frequency of the incoming requests 
// This is the way to emulate flood of incoming requests. 
// Which, by the way, uses the same approach that will be used in the solution 
var requestsTimer = Observable.Interval(TimeSpan.FromSeconds(0.1)); 
var incomingRequests = Observable.Zip(requests, requestsTimer, (number, time) => {return number;}); 
incomingRequests.Subscribe((number) => 
{ 
    Console.WriteLine($"Request received: {number}"); 
}); 

// This the minimum interval at which we want to process the incoming requests 
var processingTimeInterval = Observable.Interval(TimeSpan.FromSeconds(1)); 

// Zipping incoming requests with the interval 
var requestsToProcess = Observable.Zip(incomingRequests, processingTimeInterval, (data, time) => {return data;}); 

requestsToProcess.Subscribe((number) => 
{ 
    Console.WriteLine($"Request processed: {number}"); 
}); 
+0

मैं आरएक्सजे के लिए एक समाधान की तलाश में था और यह अभी भी वहां काम करता था । आपका बहुत बहुत धन्यवाद! :) – Sammi

0

मैं इसके साथ खेल रहा था और पाया।पिन (जैसा कि पहले उल्लेख किया है) सबसे सरल विधि के लिए:

var stream = "ThisFastObservable".ToObservable(); 
var slowStream = 
    stream.Zip(
     Observable.Interval(TimeSpan.FromSeconds(1)), //Time delay 
     (x, y) => x); // We just care about the original stream value (x), not the interval ticks (y) 

slowStream.TimeInterval().Subscribe(x => Console.WriteLine($"{x.Value} arrived after {x.Interval}")); 

उत्पादन:

T arrived after 00:00:01.0393840 
h arrived after 00:00:00.9787150 
i arrived after 00:00:01.0080400 
s arrived after 00:00:00.9963000 
F arrived after 00:00:01.0002530 
a arrived after 00:00:01.0003770 
s arrived after 00:00:00.9963710 
t arrived after 00:00:01.0026450 
O arrived after 00:00:00.9995360 
b arrived after 00:00:01.0014620 
s arrived after 00:00:00.9993100 
e arrived after 00:00:00.9972710 
r arrived after 00:00:01.0001240 
v arrived after 00:00:01.0016600 
a arrived after 00:00:00.9981140 
b arrived after 00:00:01.0033980 
l arrived after 00:00:00.9992570 
e arrived after 00:00:01.0003520 
संबंधित मुद्दे