2016-06-21 10 views
9

मैंने क्लोजर में कुछ कोर.एसिंक कोड लिखा और जब मैंने इसे चलाया तो यह सभी उपलब्ध स्मृति का उपभोग करता था और एक त्रुटि के साथ विफल रहा। ऐसा लगता है कि कोर.एसिंक पाइपलाइन में mapcat का उपयोग करके वापस दबाव टूट जाता है। (जो इस सवाल के दायरे से परे कारणों के लिए दुर्भाग्य की बात है।)मैपकैट core.async में बैकप्रेसर तोड़ने पर मेमोरी रिसाव कहां है?

यहाँ कुछ कोड है कि अंदर और बाहर :x रों की गणना के द्वारा समस्या को दर्शाता है है एक mapcat ing ट्रांसड्यूसर की:

(ns mapcat.core 
    (:require [clojure.core.async :as async])) 

(defn test-backpressure [n length] 
    (let [message (repeat length :x) 
     input (async/chan) 
     transform (async/chan 1 (mapcat seq)) 
     output (async/chan) 
     sent (atom 0)] 
    (async/pipe input transform) 
    (async/pipe transform output) 
    (async/go 
     (dotimes [_ n] 
     (async/>! input message) 
     (swap! sent inc)) 
     (async/close! input)) 
    (async/go-loop [x 0] 
     (when (= 0 (mod x (/ (* n length) 10))) 
     (println "in:" (* @sent length) "out:" x)) 
     (when-let [_ (async/<! output)] 
     (recur (inc x)))))) 

=> (test-backpressure 1000 10) 
in: 10 out: 0 
in: 2680 out: 1000 
in: 7410 out: 2000 
in: 10000 out: 3000 ; Where are the other 7000 characters? 
in: 10000 out: 4000 
in: 10000 out: 5000 
in: 10000 out: 6000 
in: 10000 out: 7000 
in: 10000 out: 8000 
in: 10000 out: 9000 
in: 10000 out: 10000 

निर्माता दौड़ दूर उपभोक्ता से आगे

ऐसा प्रतीत होता है कि मैं इसे खोजने वाला पहला व्यक्ति नहीं हूं। लेकिन here दिए गए स्पष्टीकरण को इसमें शामिल नहीं किया गया है। (हालांकि यह पर्याप्त कामकाज प्रदान करता है।) संकल्पनात्मक रूप से, मैं उम्मीद करता हूं कि निर्माता आगे बढ़ेगा, लेकिन केवल कुछ संदेशों की लंबाई से जो चैनलों में बफर किए जा सकते हैं।

मेरा सवाल है, अन्य सभी संदेश कहां हैं? आउटपुट की चौथी लाइन 7000 :x एस के लिए अनचाहे हैं।

+0

आपके द्वारा दिए गए लिंक में, एलेक्स ने उल्लेख किया कि यह गलत परिणाम और बफर-सीमा-उल्लंघन के बीच एक दुविधा है। स्पष्ट रूप से [ASYNC-124] (http://dev.clojure.org/jira/browse/ASYNC-124) एक सही उत्तर – Davyzhu

+0

पसंद करता है तो, आपके प्रश्न के संबंध में, अन्य संदेश संदर्भित 'लेकर्स' में होल्डिंग हो सकते हैं [यहां ] (https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L86)। इसके बारे में इतना यकीन नहीं है तो आइए अधिक आत्मविश्वास के उत्तर की प्रतीक्षा करें। – Davyzhu

उत्तर

2

प्रश्न की दो संभावित व्याख्याएं हैं "स्मृति रिसाव कहां है?"

सबसे पहले, डेटा कहां रखा जाता है? यह जवाब चैनल बफर में विस्तारित परिवर्तन के तुरंत नीचे की ओर लगता है।

डिफ़ॉल्ट रूप से चैनल FixedBuffer (clojure.core.async.impl.buffers/FixedBuffer) का उपयोग करते हैं जो बता सकते हैं कि यह पूर्ण है लेकिन यह अधिक होने के लिए ऑब्जेक्ट नहीं करता है।

दूसरा, कोड का कौन सा मार्ग बफर को ओवरफ्लू का कारण बनता है? यह (अगर मैं गलत हूं तो मुझे सही करें) ManyToManyChannel (clojure.core.async.impl.channels/ManyToManyChannel) के the take! method में प्रतीत होता है जहां बफर पर first call to add! किसी भी calls to full? से पहले होता है।

ऐसा लगता है कि take! मानता है कि यह प्रत्येक आइटम को हटाने के लिए बफर में कम से कम एक आइटम जोड़ सकता है। लंबे समय तक चलने वाले ट्रांसड्यूसर जैसे mapcat के मामले में यह हमेशा एक सुरक्षित धारणा नहीं है।

core.async की स्थानीय प्रतिलिपि में this line(when (and (.hasNext iter) (not (impl/full? buf))) को बदलकर मैं प्रश्न में कोड को अपेक्षित व्यवहार कर सकता हूं। (नायब core.async की मेरी समझ मुझे गारंटी करने के लिए के लिए है कि इस अपने उपयोग के मामले के लिए एक मजबूत समाधान है अपर्याप्त है।)

अद्यतन 2016/09/17: http://dev.clojure.org/jira/browse/ASYNC-178

: वहाँ अब इस के लिए एक मुद्दा है
संबंधित मुद्दे