2016-02-03 4 views
11

मैं System.Net.WebSocket के लिए एक एक्सटेंशन विधि लिखने की कोशिश कर रहा हूं जो इसे प्रतिक्रियाशील एक्सटेंशन (Rx.NET) का उपयोग करके IObserver में बदल देगा। आप नीचे दिए गए कोड देख सकते हैं:IObserver को async/onEext/OnError/OnCompleted विधियों का इंतजार कैसे करें?

public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer) 
{ 
    // Wrap the web socket in an interface that's a little easier to manage 
    var webSocketMessageStream = new WebSocketMessageStream(webSocket); 

    // Create the output stream to the client 
    return Observer.Create<T>(
     onNext:  async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)), 
     onError:  async error => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)), 
     onCompleted: async()  => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected") 
    ); 
} 

इस कोड काम करता है, लेकिन मैं async के उपयोग के बारे में चिंतित हूं/onNext, onError के अंदर का इंतजार है, और lambdas onCompleted। मुझे पता है कि यह एसिंक शून्य लैम्ब्डा लौटाता है, जो कि (और कभी-कभी ऐसे मुद्दों का कारण बनता है जिन्हें मैंने पहले ही चलाया है)।

मैं आरएक्स.नेट दस्तावेज के साथ-साथ इंटरनेट पर ब्लॉग पोस्ट पर पढ़ रहा हूं और मुझे IObserver में एसिंक/प्रतीक्षा विधि का उपयोग करने के लिए उचित तरीका (यदि कोई है) नहीं मिल रहा है। क्या ऐसा करने का कोई सही तरीका है? यदि नहीं, तो मैं इस समस्या के आसपास कैसे काम कर सकता हूं?

+0

आपकी धारणाएं गलत हैं। असल में, आप 'Func >' (या कुछ संस्करण जो पैरामीटर लेते हैं) को 'onNext' आदि पर पास कर रहे हैं। प्रतिनिधियों के पास' शून्य 'रिटर्न प्रकार नहीं हैं। वे कार्य वापस करते हैं। – spender

+0

मुझे यकीन नहीं है कि मैं आपका अनुसरण करता हूं। मैं इस तरह की क्रिया के लिए एसिंक विधि असाइन कर सकता हूं: एक्शन टेस्ट = async() => DoSomethingAsync() का इंतजार है; यह सिर्फ ठीक है और "परीक्षण" कार्रवाई का इंतजार नहीं है। कॉलिंग टेस्ट() तुरंत लौटाता है और DoSomethingAsync() पूरा होने से पहले। Observer.Create() के लिए विधि परिभाषा क्रियाओं को आगे, ऑनर, और पूर्ण पर पैरा के रूप में स्वीकार करती है। यही कारण है कि मैंने ऊपर पोस्ट किया गया कोड काम करता है। हालांकि, जब कोई मेरे IObserver पर OnNext (x) को कॉल करता है, तो यह लौटने से पहले विधि के एसिंक्रोनस भाग को समाप्त करने की प्रतीक्षा नहीं करता है। –

+0

ओह। मेरी गलती। अब इसमें देख रहे हैं। – spender

उत्तर

12

सदस्यता लें async विधियों को नहीं लेता है। तो यहां क्या होता है कि आप async void से अग्नि-और-भूल तंत्र का उपयोग कर रहे हैं। समस्या यह है कि आगे के संदेशों को क्रमबद्ध नहीं किया जाएगा।

सदस्यता के अंदर async विधि को कॉल करने के बजाय, आपको इसे पाइपलाइन में लपेटना चाहिए ताकि आरएक्स इसके लिए प्रतीक्षा कर सके।

onError और onCompleted पर आग और भूलने का उपयोग करना ठीक है क्योंकि इन्हें Observable से बुलाया जाने वाला अंतिम चीज़ माना जाता है। ध्यान रखें कि Observable से जुड़े संसाधनों को onError और onCompleted के पूरा होने से पहले वापस कर दिया जा सकता है।

मैं एक छोटे से विस्तार विधि कि यह सब करता है लिखा है:

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted) 
    { 
     return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat() 
      .Subscribe(
      e => { }, // empty 
      onError, 
      onCompleted); 
    } 

पहले हम एक Observable में async onNext विधि परिवर्तित, दो async दुनिया ब्रिजिंग। इसका मतलब है onNext के अंदर async/प्रतीक्षा करें सम्मानित किया जाएगा। इसके बाद हम विधि को Defer में लपेटें ताकि यह बनाया जाने पर सही प्रारंभ न हो। इसके बजाय Concat अगले डिलीवर किए जाने पर केवल अगले डिलीवर किए जाने योग्य कॉल को कॉल करेगा।

Ps। मुझे उम्मीद है कि आरएक्सनेट की अगली रिलीज में async सब्सक्राइब में समर्थन होगा।

+0

"अगला संदेश अब धारावाहिक नहीं होंगे" का क्या मतलब है? –

+1

डोरस क्या कह रहा है कि एसिंक्रोनि की दो परतें यहां पेश की जा रही हैं। अगर एक संदेश प्राप्त होता है, तो यह 'WriteMessageAsync' विधि' का आह्वान करेगा। यदि कोई अन्य संदेश प्राप्त होता है, तो यह पिछले कॉल पूरा होने के बावजूद 'WriteMessageAsync' निष्पादित करेगा। इसका मतलब है कि आप ऑर्डर संदेशों से बाहर हो सकते हैं (विशेष रूप से यदि टास्कपूल शेड्यूलिंग भी शामिल हो)। यानी अगला संदेश अब क्रमबद्ध नहीं होंगे। –

+0

@ डोरस, प्रतिक्रिया के लिए धन्यवाद। क्या आप अपनी पोस्ट को संपादित करने के बारे में बताएंगे कि आपकी विस्तार विधि थोड़ा और विस्तार से क्या करती है? एक बात जो मैं सोच रहा हूं वह यह है कि क्या यह वास्तव में एसिंक/प्रतीक्षा का लाभ उठाएगा या ओननेक्स्ट को पूरा करने के लिए प्रतीक्षा करते समय यह थ्रेड को अवरुद्ध करेगा? साथ ही, मैंने देखा कि Rx.NET में NuGet पर v2.3 के बीटा संस्करण हैं, लेकिन मुझे उनके लिए कोई रिलीज नोट नहीं दिख रहा है। क्या आपको पता है कि वे कहां हो सकते हैं? –

संबंधित मुद्दे