2012-10-04 20 views
5

मेरे पास एक छोटा क्लोजर उपभोक्ता/प्रकाशक संदेश प्राप्त कर रहा है, उन्हें प्रोसेस कर रहा है और उन्हें अन्य उपभोक्ताओं को भेज रहा है, सब कुछ RabbitMQ के माध्यम से।क्लोजर संदेश हैंडलिंग/एसिंक, मल्टीथ्रेडेड

मैंने एक संदेश-हैंडलर परिभाषित किया है जो संदेशों को एक अलग थ्रेड (मुख्य धागे से अलग) में संभालता है। जैसा कि नीचे दिए गए कोड में देखा जा सकता है, थ्रेड सिंक्रनाइज़ रूप से संदेशों को प्राप्त करता है और भेजता है, एलसीएम/सब्सक्राइब फ़ंक्शन द्वारा शुरू किए गए ईवेंट लूप में हो रहा है।

तो सवाल यह है कि इन सिंक्रोनस संदेश-हैंडलर के एन-आकार वाले थ्रेड पूल बनाने के लिए "क्लोजर मार्ग" क्या होगा? मुझे लगता है कि गैर-क्लोजर तरीका जावा इंटरऑप के माध्यम से कई धागे मैन्युअल रूप से उत्पन्न करना होगा।

इसके अलावा, क्या यह संदेश प्रोसेसिंग चीजों को गति देगा, क्योंकि यह प्रसंस्करण बहुत सीपीयू गहन नहीं है? क्या इन संदेश-हैंडलर एसिंक को बेहतर बनाना बेहतर होगा - फिर, इस बात पर विचार करते हुए कि प्रसंस्करण की तुलना में अधिक समय व्यतीत किया जा रहा है?

और आखिरकार, मैं इन आकस्मिक दृष्टिकोणों के प्रदर्शन को मापने के बारे में कैसे जाऊं (मैं रूबी/जावास्क्रिप्ट दुनिया से आया हूं, और वहां कोई बहुमत नहीं है)?

नोट: मैं जानता हूँ कि यह सब सिर्फ क्षैतिज स्केलिंग और संदेश बस को सुन अधिक JVM प्रक्रियाओं को उत्पन्न करने से बचा जा सकता है, लेकिन जब से एप्लिकेशन Heroku पर तैनात किया जा रहा है, मैं के रूप में उपयोग करना चाहते हैं प्रत्येक dyno/प्रक्रिया में जितना संभव हो उतना संसाधन।

(defn message-handler 
    [ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch "e.events" "" processed-message))) 

(defn -main 
    [& args] 
    (let [conn   (rmq/connect {:uri (System/getenv "MSGQ")}) 
     ch   (lch/open conn) 
     q-name  "q.events.tagger" 
     e-sub-name "e.events.preproc" 
     e-pub-name "e.events" 
     routing-key "tasks.taggify"] 
    (lq/declare ch q-name :exclusive false :auto-delete false) 
    (le/declare ch e-pub-name "fanout" :durable false) 
    (lq/bind ch q-name e-sub-name :routing-key routing-key) 
    (.start (Thread. (fn [] 
         (lcm/subscribe ch q-name message-handler :auto-ack true)))))) 

एक और अधिक बुनियादी टिप्पणी पर ... कैसे मैं इस कोड पुनर्रचना एक अतिरिक्त तर्क के साथ संदेश-हैंडलर कॉलबैक पंजीकरण के समर्थन करने के लिए, इस तरह के बारे में जाना होगा:

(.start (Thread. (fn [] 
         (lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true)))))) 

और फिर एक संदर्भ के साथ प्रकाशित:

(lb/publish ch pub-name "" processed-message))) 
एक शाब्दिक के बजाय

:

(lb/publish ch "e.events" "" processed-message))) 

उत्तर

2

प्रश्न के दूसरे भाग के लिए, आप आंशिक आवेदन के रूप में नीचे दिखाया गया है का उपयोग कर सकते हैं:

(defn message-handler 
    [pub-name ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch pub-name "" processed-message))) 



(.start 
    (Thread. 
    (fn [] 
     (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true)))))) 
1

यह एक बहुत बड़ा विषय है, और आप इस प्रश्न को कई अलग-अलग प्रश्नों में तोड़ने पर विचार कर सकते हैं, लेकिन संक्षिप्त उत्तर यह है: use agents

+0

टिप के लिए धन्यवाद, करेंगे। – neektza

संबंधित मुद्दे