2013-06-01 3 views
8

मैं इस नोट की लंबाई के लिए पहले से माफी माँगता हूं। मैंने इसे कम करने में काफी समय बिताया, और यह उतना ही छोटा था जितना मैं इसे प्राप्त कर सकता था।rxjava और clojure asynchrony रहस्य: वायदा वादे और एजेंट, ओह मेरे

मेरे पास एक रहस्य है और आपकी मदद के लिए आभारी होंगे। यह रहस्य एक आरएक्सजेवा observer के व्यवहार से आता है, मैंने क्लोजर में कुछ सरल observable एस ऑनलाइन नमूने से लिखे गए हैं।

एक अवलोकन योग्य सिंक्रनाइज़ रूप से onNext संदेशों को अपने पर्यवेक्षकों के हैंडलर भेजता है, और मेरा माना जाता है कि प्रिंसिपल पर्यवेक्षक अपेक्षित व्यवहार करता है।

क्लोजर future के माध्यम से अन्य अवलोकन असंकालिक रूप से एक और थ्रेड पर समान होता है। सटीक वही पर्यवेक्षक अपने सभी onNext पर पोस्ट की गई सभी घटनाओं को कैप्चर नहीं करता है; ऐसा लगता है कि पूंछ पर यादृच्छिक संख्या में संदेश खोना प्रतीत होता है।

वहाँ के लिए promiseonCompleted प्रतीक्षा की समाप्ति और एक agent कलेक्टर को भेजे गए सभी घटनाओं के लिए एक प्रतीक्षा की समाप्ति के बीच निम्न में एक जानबूझकर दौड़ है। यदि promise जीतता है, तो मुझे falseonCompleted और agent में संभावित रूप से छोटी कतार देखने की उम्मीद है। यदि agent जीत, मैं agent की कतार से onCompleted के लिए true और सभी संदेशों को देखने की उम्मीद। एक परिणाम जो मुझे उम्मीद नहीं है trueonCompleted और agent से एक छोटी कतार है। लेकिन, मर्फी सो नहीं है, और यह वही है जो मैं देखता हूं। मुझे नहीं पता कि कचरा-संग्रह गलती है या क्लोजर के एसटीएम, या मेरी मूर्खता, या कुछ और पूरी तरह से कुछ आंतरिक कतार में है या नहीं।

मैं, अपने आत्म निहित प्रपत्र, यहाँ के क्रम में स्रोत पेश इतना है कि यह lein repl के माध्यम से सीधे चलाया जा सकता है। पहले leiningen प्रोजेक्ट फ़ाइल, project.clj, जो नेटफ्लिक्स के rxjava की 0.9.0 संस्करण पर निर्भरता वाणी:

(defproject expt2 "0.1.0-SNAPSHOT" 
    :description "FIXME: write description" 
    :url "http://example.com/FIXME" 
    :license {:name "Eclipse Public License" 
      :url "http://www.eclipse.org/legal/epl-v10.html"} 
    :dependencies [[org.clojure/clojure    "1.5.1"] 
       [com.netflix.rxjava/rxjava-clojure "0.9.0"]] 
    :main expt2.core) 

अब, नाम स्थान और एक Clojure आवश्यकता और जावा आयात वहाँ तीन cermonials रास्ते से हट जाओ करने के लिए कर रहे हैं :

(ns expt2.core 
    (:require clojure.pprint) 
    (:refer-clojure :exclude [distinct]) 
    (:import [rx Observable subscriptions.Subscriptions])) 

अंत में, कंसोल के लिए उत्पादन के लिए एक मैक्रो:

(defmacro pdump [x] 
    `(let [x# ~x] 
    (do (println "----------------") 
     (clojure.pprint/pprint '~x) 
     (println "~~>") 
     (clojure.pprint/pprint x#) 
     (println "----------------") 
     x#))) 

अंत में, मेरी पर्यवेक्षक के। मैं एक agent का उपयोग किसी भी नमूदार के onNext द्वारा भेजे गए संदेशों को इकट्ठा करने के लिए। मैं संभावित onError एकत्र करने के लिए atom का उपयोग करता हूं। मैं onCompleted के लिए एक promise उपयोग करती हैं इसलिए है कि उपभोक्ताओं को पर्यवेक्षक के बाहरी उस पर इंतजार कर सकते हैं।

(defn- subscribe-collectors [obl] 
    (let [;; Keep a sequence of all values sent: 
     onNextCollector  (agent []) 
     ;; Only need one value if the observable errors out: 
     onErrorCollector  (atom nil) 
     ;; Use a promise for 'completed' so we can wait for it on 
     ;; another thread: 
     onCompletedCollector (promise)] 
    (letfn [;; When observable sends a value, relay it to our agent" 
      (collect-next  [item] (send onNextCollector (fn [state] (conj state item)))) 
      ;; If observable errors out, just set our exception; 
      (collect-error  [excp] (reset! onErrorCollector  excp)) 
      ;; When observable completes, deliver on the promise: 
      (collect-completed [ ] (deliver onCompletedCollector true)) 
      ;; In all cases, report out the back end with this: 
      (report-collectors [ ] 
       (pdump 
       ;; Wait for everything that has been sent to the agent 
       ;; to drain (presumably internal message queues): 
       {:onNext  (do (await-for 1000 onNextCollector) 
           ;; Then produce the results: 
           @onNextCollector) 
       ;; If we ever saw an error, here it is: 
       :onError  @onErrorCollector 
       ;; Wait at most 1 second for the promise to complete; 
       ;; if it does not complete, then produce 'false'. 
       ;; I expect if this times out before the agent 
       ;; times out to see an 'onCompleted' of 'false'. 
       :onCompleted (deref onCompletedCollector 1000 false) 
       }))] 
     ;; Recognize that the observable 'obl' may run on another thread: 
     (-> obl 
      (.subscribe collect-next collect-error collect-completed)) 
     ;; Therefore, produce results that wait, with timeouts, on both 
     ;; the completion event and on the draining of the (presumed) 
     ;; message queue to the agent. 
     (report-collectors)))) 

अब, यहां एक तुल्यकालिक देखने योग्य है। यह अपने पर्यवेक्षकों के onNext गले के नीचे 25 संदेश पंप, तो कॉल उनके onCompleted रों।

(defn- customObservableBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method. 
     ;; Send 25 strings to the observer's onNext: 
     (doseq [x (range 25)] 
     (-> observer (.onNext (str "SynchedValue_" x)))) 
     ; After sending all values, complete the sequence: 
     (-> observer .onCompleted) 
     ; return a NoOpSubsription since this blocks and thus 
     ; can't be unsubscribed (disposed): 
     (Subscriptions/empty)))) 

हम इस नमूदार के लिए हमारी पर्यवेक्षक सदस्यता ली है:

;;; The value of the following is the list of all 25 events: 
(-> (customObservableBlocking) 
    (subscribe-collectors)) 

यह रूप में की उम्मीद काम करता है, और हम सांत्वना

{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["SynchedValue_0" 
    "SynchedValue_1" 
    "SynchedValue_2" 
    "SynchedValue_3" 
    "SynchedValue_4" 
    "SynchedValue_5" 
    "SynchedValue_6" 
    "SynchedValue_7" 
    "SynchedValue_8" 
    "SynchedValue_9" 
    "SynchedValue_10" 
    "SynchedValue_11" 
    "SynchedValue_12" 
    "SynchedValue_13" 
    "SynchedValue_14" 
    "SynchedValue_15" 
    "SynchedValue_16" 
    "SynchedValue_17" 
    "SynchedValue_18" 
    "SynchedValue_19" 
    "SynchedValue_20" 
    "SynchedValue_21" 
    "SynchedValue_22" 
    "SynchedValue_23" 
    "SynchedValue_24"], 
:onError nil, 
:onCompleted true} 
---------------- 

यहाँ पर निम्नलिखित परिणाम को देखने के एक अतुल्यकालिक नमूदार करता है बिल्कुल वही बात, केवल future के थ्रेड पर:

(defn- customObservableNonBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method 
     (let [f (future 
       ;; On another thread, send 25 strings: 
       (doseq [x (range 25)] 
        (-> observer (.onNext (str "AsynchValue_" x)))) 
       ; After sending all values, complete the sequence: 
       (-> observer .onCompleted))] 
     ; Return a disposable (unsubscribe) that cancels the future: 
     (Subscriptions/create #(future-cancel f)))))) 

;;; For unknown reasons, the following does not produce all 25 events: 
(-> (customObservableNonBlocking) 
    (subscribe-collectors)) 

लेकिन, आश्चर्य है, हम कंसोल पर देखते हैं: trueonCompleted के लिए, जिसका अर्थ यह है कि promise समय-समय पर नहीं था; लेकिन केवल कुछ asynch संदेश। हमारे द्वारा देखे जाने वाले संदेशों की वास्तविक संख्या रन से चलने के लिए भिन्न होती है, जिसका मतलब यह है कि खेल में कुछ समरूपता घटना होती है। सुराग की सराहना की।

---------------- 
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["AsynchValue_0" 
    "AsynchValue_1" 
    "AsynchValue_2" 
    "AsynchValue_3" 
    "AsynchValue_4" 
    "AsynchValue_5" 
    "AsynchValue_6"], 
:onError nil, 
:onCompleted true} 
---------------- 

उत्तर

7

await-for एजेंट पर इसका मतलब ब्लाकों वर्तमान धागा जब तक सभी कार्यों इस प्रकार भेजा दूर एजेंटों हुई है, जिसका अर्थ है कि यह है कि आपके इंतजार के बाद हो सकता है कि (इस सूत्र या एजेंट से) अभी भी कुछ अन्य धागा है जो एजेंट को संदेश भेज सकता है और यही आपके मामले में हो रहा है। एजेंट पर आपकी प्रतीक्षा खत्म हो जाने के बाद और आपने मानचित्र में :onNext कुंजी में अपना मूल्य खराब कर दिया है, तो आप प्रतीक्षा के बाद पूरा होने वाले पूर्ण वादे का इंतजार कर रहे हैं, लेकिन उस समय के दौरान कुछ अन्य संदेश भेजे गए थे एजेंट वेक्टर में एकत्र किया जाना है।

आप :onCompleted कुंजी को मानचित्र में पहली कुंजी के रूप में हल कर सकते हैं जिसका मूल रूप से पूरा होने का इंतजार है और उसके बाद एजेंटों के लिए इंतजार करना उस समय तक send कॉल के बाद हो सकता है पहले से ही प्राप्त हो चुका है।

{:onCompleted (deref onCompletedCollector 1000 false) 
:onNext  (do (await-for 0 onNextCollector) 
           @onNextCollector) 
:onError  @onErrorCollector 
} 
+0

सत्यापित और परीक्षण किया गया। –

संबंधित मुद्दे