2012-11-19 32 views
9

मैं एक सी # (.NET 4.5) एप्लिकेशन लिख रहा हूं जिसका उपयोग रिपोर्टिंग उद्देश्यों के लिए समय आधारित घटनाओं को एकत्रित करने के लिए किया जाता है। रीयलटाइम और ऐतिहासिक डेटा दोनों के लिए मेरी क्वेरी तर्क पुन: प्रयोज्य बनाने के लिए मैं प्रतिक्रियाशील एक्सटेंशन (2.0) और उनके IScheduler आधारभूत संरचना (HistoricalScheduler और दोस्तों) का उपयोग करता हूं।Observable.Generate() सिस्टम क्यों फेंक देता है। StackOverflowException?

उदाहरण के लिए, मान लेते हैं हम घटनाओं की एक सूची बना (कालक्रम के अनुसार छाँटे गए, पर वे समान हो सकते हैं!) जिसका केवल पेलोड उनका टाइमस्टैम्प IST और एक निश्चित अवधि के बफ़र्स भर में उनके वितरण जानना चाहता हूँ:

const int num = 100000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 

var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

var stream = Observable.Generate<int, DateTimeOffset>(
    0, 
    s => s < events.Count, 
    s => s + 1, 
    s => events[s], 
    s => events[s], 
    time); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 

निम्नलिखित स्टैक ट्रेस के साथ एक System.StackOverflowException में इस कोड परिणाम चल रहा है (पिछले 3 लाइनों सभी तरह है यह नीचे):

mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 

ठीक है, समस्या Observable.Generate() के अपने प्रयोग से आने के लिए सूची के आधार पर लगता है आकार (num) और शेड्यूलर की पसंद के बावजूद।

मैं क्या गलत कर रहा हूं? या अधिक आम तौर पर, IObservable को IEnumerable से अपने स्वयं के टाइमस्टैम्प प्रदान करने का पसंदीदा तरीका क्या होगा?

+1

इस त्रुटि का सामना करने से पहले कितना बड़ा हो सकता है? साथ ही, यदि आप डीबगर में सिंगल-स्टेप करते हैं, तो त्रुटि देखने से पहले निष्पादित कोड की आखिरी पंक्ति क्या है? –

+0

मेरे लिए, महत्वपूर्ण दहलीज ~ 'num = 51600' (रिलीज कॉन्फ़िगरेशन में, डीबग कॉन्फ़िगरेशन में थोड़ा कम) पर प्रतीत होता है। अवलोकन अनुक्रम पूरी तरह से बनाया गया प्रतीत होता है। मैं 'Observable.Generate()' के लिए lamdba अभिव्यक्तियों पर ब्रेकपॉइंट्स हिट कर सकता हूं। 'Console.WriteLine()' के अंतिम कॉल के बाद अपवाद फेंक दिया जाता है। –

+1

समझें, यह सिर्फ एक अनुमान है, लेकिन यह संदिग्ध रूप से लगता है कि धारा प्रत्येक तत्व का निपटान करने की कोशिश कर रही है, और प्रत्येक तत्व धारा का निपटान करने का प्रयास कर रहा है। आप 'रद्द करें' या 'निपटान' के लिए अनिवार्य रूप से रिकर्सिव कॉल के साथ समाप्त होते हैं, जो आपके स्टैक को उड़ाता है (जिसका डिफ़ॉल्ट आकार 1 मेगाबाइट है)। मैं यह कहने के लिए 'अवलोकनयोग्य' के साथ पर्याप्त परिचित नहीं हूं कि यह क्यों हो रहा है। –

उत्तर

3

(अद्यतन - महसूस किया कि मैं एक विकल्प प्रदान नहीं किया: पर देख उत्तर के नीचे)

समस्या यह है कि Observable.Generate काम करता है - इसका उपयोग तर्कों के आधार पर corecursive (लगता है कि रिकर्सन अंदरूनी) जनरेटर को प्रकट करने के लिए उपयोग किया जाता है; यदि वे तर्क बहुत नेस्टेड कोरकर्सिव जनरेटर उत्पन्न करते हैं, तो आप अपना ढेर उड़ा देंगे।

इस बिंदु से मैं इस बात से बहुत कुछ अनुमान लगा रहा हूं (मेरे सामने आरएक्स स्रोत नहीं है) (नीचे देखें), लेकिन मैं आपकी परिभाषा को शर्त लगाने के लिए तैयार हूं जैसे कुछ :

initial_state => 
generate_next(initial_state) => 
generate_next(generate_next(initial_state)) => 
generate_next(generate_next(generate_next(initial_state))) => 
generate_next(generate_next(generate_next(generate_next(initial_state)))) => ... 

और जब तक आपका कॉल स्टैक ओवरफ़्लो तक पर्याप्त नहीं हो जाता तब तक और आगे।कहें, एक विधि हस्ताक्षर + आपका इंट काउंटर, जो कि प्रति सेकंड 8-16 बाइट्स की तरह कुछ होगा (राज्य मशीन जनरेटर को कैसे लागू किया जाता है, इस पर निर्भर करता है), इसलिए 60,000 सही के बारे में लगता है (1 एम/16 ~ 62500 अधिकतम गहराई)

संपादित करें: स्रोत कर लिया जाता - पुष्टि की: "रन" इस तरह दिखता है उत्पन्न की विधि - Generate को नेस्टेड कॉल को ध्यान में रखना:

protected override IDisposable Run(
    IObserver<TResult> observer, 
    IDisposable cancel, 
    Action<IDisposable> setSink) 
{ 
    if (this._timeSelectorA != null) 
    { 
     Generate<TState, TResult>.α α = 
       new Generate<TState, TResult>.α(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(α); 
     return α.Run(); 
    } 
    if (this._timeSelectorR != null) 
    { 
     Generate<TState, TResult>.δ δ = 
       new Generate<TState, TResult>.δ(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(δ); 
     return δ.Run(); 
    } 
    Generate<TState, TResult>._ _ = 
      new Generate<TState, TResult>._(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
    setSink(_); 
    return _.Run(); 
} 

संपादित करें: Derp, प्रस्ताव नहीं था कोई विकल्प ... यहां एक ऐसा काम कर सकता है जो काम कर सकता है:

(संपादित करें: निश्चित Enumerable.Range, इसलिए स्ट्रीम आकारसे गुणा नहीं किया गया)

const int num = 160000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 
var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

    // Size too big? Fine, we'll chunk it up! 
const int chunkSize = 10000; 
var numberOfChunks = events.Count/chunkSize; 

    // Generate a whole mess of streams based on start/end indices 
var streams = 
    from chunkIndex in Enumerable.Range(0, (int)Math.Ceiling((double)events.Count/chunkSize) - 1) 
    let startIdx = chunkIndex * chunkSize 
    let endIdx = Math.Min(events.Count, startIdx + chunkSize) 
    select Observable.Generate<int, DateTimeOffset>(
     startIdx, 
     s => s < endIdx, 
     s => s + 1, 
     s => events[s], 
     s => events[s], 
     time); 

    // E pluribus streamum 
var stream = Observable.Concat(streams); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 
+0

धन्यवाद, यह सही है! यह भी मेरे अपने कामकाज से अधिक कुशल लगता है। मुझे आपके अंकगणित में एक छोटी सी त्रुटि को ठीक करना पड़ा, हालांकि (संपादन देखें)। मैं अभी भी नहीं देखता कि क्यों आरएक्स के अंदर रिकर्सिव कार्यान्वयन की आवश्यकता है। आखिरकार, ऐसा लगता है कि आरएक्स v1.0 (60,000 के आकार से परे) के साथ काम करता है। फिर भी, अच्छी जांच, चालाक समाधान। एक बार फिर धन्यवाद! –

+0

कोई समस्या नहीं! हे - मैं वास्तव में प्रभावित हूं मैं केवल * एक * गणित त्रुटि था ...;) – JerKimball

3

ठीक है, मैंने एक अलग फैक्ट्री विधि ली है जिसे राज्य संक्रमण के रूप में lamdba अभिव्यक्तियों की आवश्यकता नहीं है और अब मुझे कोई ढेर ओवरफ्लो दिखाई नहीं दे रहा है। मैं अभी तक यकीन नहीं है कि अगर यह मेरे सवाल का एक सही जवाब के रूप में योग्य होगा, लेकिन यह काम करता है और मैं I'd इसे यहाँ साझा सोचा:

var stream = Observable.Create<DateTimeOffset>(o => 
    { 
     foreach (var e in events) 
     { 
      time.Schedule(e,() => o.OnNext(e)); 
     } 

     time.Schedule(events[events.Count - 1],() => o.OnCompleted()); 

     return Disposable.Empty; 
    }); 

मैन्युअल से पहले की घटनाओं का समय निर्धारण सदस्यता लौटने लगता है (!) मेरे लिए अजीब, लेकिन इस मामले में यह लैम्ब्डा अभिव्यक्ति के अंदर किया जा सकता है।

यदि इस दृष्टिकोण के बारे में कुछ भी गलत है, तो कृपया मुझे सही करें। साथ ही, मुझे यह भी सुनकर खुशी होगी कि System.Reactive द्वारा अंतर्निहित धारणाओं के बारे में मैंने अपने मूल कोड का उल्लंघन किया है।

(हे, मैं जाँच की जाना चाहिए था पहले कि: RX v1.0 के साथ, मूल Observable.Generate() है वास्तव में काम करने के लिए लग रहे हैं!)

संबंधित मुद्दे