2014-10-28 6 views
13

मेरे पास एक ऐसा एप्लिकेशन है जो दो घटकों के बीच संदेश भेजने/प्राप्त करने के लिए संदेश कतार के रूप में RabbitMQ का उपयोग करता है: प्रेषक और रिसीवर। प्रेषक संदेश को बहुत तेज़ तरीके से भेजता है। रिसीवर संदेश प्राप्त करता है और फिर कुछ बहुत समय लेने वाला कार्य करता है (मुख्य रूप से बहुत बड़े डेटा आकार के लिए डेटाबेस लेखन)। चूंकि रिसीवर को कार्य को पूरा करने में बहुत लंबा समय लगता है और फिर कतार में अगला संदेश पुनर्प्राप्त करता है, प्रेषक कतार को जल्दी भरना जारी रखेगा। तो मेरा सवाल यह है: क्या इससे संदेश कतार बहती है?खरगोश एमक्यू: तेजी से निर्माता और धीमी उपभोक्ता

संदेश उपभोक्ता ऐसा दिखाई देता है:

public void onMessage() throws IOException, InterruptedException { 
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue(); 
    channel.queueBind(queueName, EXCHANGE_NAME, ""); 

    QueueingConsumer consumer = new QueueingConsumer(channel); 
    channel.basicConsume(queueName, true, consumer); 

    while (true) { 
     QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
     String message = new String(delivery.getBody()); 
     System.out.println(" [x] Received '" + message + "'"); 

     JSONObject json = new JSONObject(message); 
     String caseID = json.getString("caseID"); 
     //following takes very long time    
     dao.saveToDB(caseID); 
    } 
} 

उपभोक्ता द्वारा प्राप्त प्रत्येक संदेश एक caseID शामिल हैं। प्रत्येक केस आईडी के लिए, यह डेटाबेस में बड़ी मात्रा में डेटा बचाएगा, जो बहुत लंबा समय लेता है। वर्तमान में केवल एक उपभोक्ता रब्बीएमक्यू के लिए स्थापित किया गया है क्योंकि उत्पादक/उपभोक्ता केस आईडी के प्रकाशित/सदस्यता के लिए एक ही कतार का उपयोग करता है। तो मैं उपभोक्ता थ्रूपुट को कैसे बढ़ा सकता हूं ताकि उपभोक्ता निर्माता के साथ पकड़ सके और कतार में संदेश ओवरफ्लो से बच सके? क्या मुझे खपत दर में तेजी लाने के लिए उपभोक्ता हिस्से में मल्टीथ्रेडिंग का उपयोग करना चाहिए? या मुझे आने वाले संदेश को एक साथ उपभोग करने के लिए एकाधिक उपभोक्ताओं का उपयोग करना चाहिए? या उपभोक्ता को संदेश को असीमित रूप से पूरा करने के इंतजार किए बिना उपभोग करने का कोई असीमित तरीका है? किसी भी सुझाव का स्वागत है।

उत्तर

1

"तो मैं उपभोक्ता थ्रुपुट को कैसे बढ़ा सकता हूं ताकि उपभोक्ता निर्माता के साथ पकड़ सके और कतार में संदेश ओवरफ्लो से बच सके?" यह जवाब है "आने वाले संदेश को एक साथ उपभोग करने के लिए कई उपभोक्ताओं का उपयोग करें", इन उपभोक्ताओं को सिद्धांत साझा करने के समानांतर में चलाने के लिए बहु-थ्रेडिंग का उपयोग करें, http://www.eaipatterns.com/CompetingConsumers.html

+0

[RabbitMQ के दस्तावेज़ीकरण] से (http://www.rabbitmq.com/tutorials/tutorial-three-python.html), यहां दो दृष्टिकोण हैं: कार्यकर्ता कतार, प्रकाशित/सदस्यता लें। मैं अभी पब/उप मॉडल का उपयोग कर रहा हूँ। क्या मुझे कई उपभोक्ताओं के लिए कार्यकर्ता कतार का उपयोग करना चाहिए? – tonga

+0

आपको जो चाहिए वह इसके लिए कार्यकर्ता कतार होना चाहिए। इस प्रकार इसे कार्यान्वित किया जा सकता है https://github.com/victorpictor/Hotel/blob/master/Infrastructure/MessageTransport/Receivers/Subscriber.cs#L29 – voutrin

+0

लेकिन यदि मैं विभिन्न उद्देश्यों के लिए कई कतारों का उपयोग करना चाहता हूं तो क्या होगा? अभी केससीड संदेश के लिए केवल एक कतार है। केसआईडी के अलावा अन्य डेटा हो सकता है। तो मुझे एकाधिक कतारों के लिए प्रकाशित/सब्सक्राइब मॉडल का उपयोग करने की आवश्यकता हो सकती है। – tonga

0

उत्तर के रूप में मैं सुझाव देता हूं: दोनों।

आप एकाधिक रिसीवर रखने के साथ-साथ प्रत्येक रिसीवर को एक अलग थ्रेड में निष्पादित करने के लिए लाभ उठा सकते हैं, इस प्रकार रिसीवर को अगले संदेश को कतार में स्वीकार करने की अनुमति मिलती है।

बेशक यह दृष्टिकोण मानता है कि प्रत्येक ऑपरेशन का परिणाम (डीबी पर लेखन, अगर मैं सही ढंग से समझता हूं) किसी अन्य तरीके से अन्य संदेशों से प्रतिक्रिया में आने वाले परिचालनों के परिणाम को प्रभावित नहीं करता है।

1

आपके प्रदर्शन को बढ़ाने के आपके कई तरीके हैं।

  1. आप अधिक उत्पादकों के साथ एक कार्यकर्ता कतार बना सकते हैं, इस तरह आप एक साधारण लोड-बैलेंस सिस्टम बनाते हैं। एक्सचेंज ---> कतार का उपयोग न करें, लेकिन केवल कतार। इस पोस्ट को पढ़ें RabbitMQ Non-Round Robin Dispatching

  2. जब आपको कोई संदेश मिलता है तो आप अपने डेटाबेस पर डेटा डालने के लिए पूलथ्रेड बना सकते हैं, लेकिन इस मामले में आपको विफलता का प्रबंधन करना होगा।

लेकिन मुझे लगता है कि मुख्य समस्या डेटाबेस है और खरगोश एमक्यू नहीं है। एक अच्छी ट्यूनिंग, बहु-थ्रेडिंग और कार्यकर्ता कतार के साथ आप एक स्केलेबल और तेज़ समाधान प्राप्त कर सकते हैं।

मुझे पता

14

दें "इस कारण विल संदेश कतार से ऊपर जाने का?"

हां। कतार लंबाई बढ़ने के साथ अत्यधिक स्मृति खपत को रोकने के लिए RabbitMQ "प्रवाह नियंत्रण" की स्थिति में प्रवेश करेगा।यह स्मृति में उन्हें पकड़ने के बजाए डिस्क पर लगातार संदेशों को भी शुरू करेगा।

"तो मैं कैसे उपभोक्ता प्रवाह में तेजी लाने के कर सकते हैं ताकि उपभोक्ता निर्माता के साथ पकड़ने और कतार में संदेश अतिप्रवाह से बच सकते हैं"

आप 2 विकल्प हैं:

  1. अधिक उपभोक्ता जोड़ें। ध्यान रखें कि यदि आप इस विकल्प को चुनते हैं तो आपके डीबी को अब कई समवर्ती प्रक्रियाओं में छेड़छाड़ की जाएगी। सुनिश्चित करें कि डीबी अतिरिक्त दबाव का सामना कर सकता है।
  2. क्यूओएस उपभोग करने वाले चैनल का मूल्य बढ़ाएं। यह कतार से अधिक संदेश खींच देगा और उन्हें उपभोक्ता पर बफर करेगा। यह समग्र प्रसंस्करण समय में वृद्धि करेगा; अगर 5 संदेश buffered हैं, तो 5 वां संदेश पूरा करने के लिए संदेश 1 ... 5 के प्रसंस्करण समय ले जाएगा।

"मैं उपभोक्ता भाग में बहु सूत्रण का उपयोग करना चाहिए खपत दर में तेजी लाने के लिए?"

जब तक कि आपके पास एक अच्छी तरह से डिज़ाइन किया गया समाधान न हो। किसी एप्लिकेशन में समांतरता जोड़ने से उपभोक्ता-पक्ष पर बहुत अधिक ओवरहेड जोड़ना होगा। आप ThreadPool थ्रॉटलिंग या थ्रॉटलिंग मेमोरी-उपयोग को समाप्त कर सकते हैं।

एएमक्यूपी से निपटने पर, आपको इष्टतम समाधान को डिजाइन करने के लिए प्रत्येक प्रक्रिया के लिए वास्तव में व्यवसाय आवश्यकता पर विचार करने की आवश्यकता है। आपके आने वाले संदेश कितने समय-संवेदनशील हैं? क्या उन्हें डीबी ASAP पर बने रहने की आवश्यकता है, या इससे आपके उपयोगकर्ताओं के लिए कोई फर्क पड़ता है कि डेटा तुरंत उपलब्ध है या नहीं?

यदि डेटा को तुरंत जारी रखने की आवश्यकता नहीं है, तो आप अपने आवेदन को संशोधित कर सकते हैं ताकि उपभोक्ता कतार से संदेश हटा सकें और उन्हें रेडिस में कैश किए गए संग्रह में सहेज सकें। एक दूसरी प्रक्रिया का परिचय दें जो अनुक्रमिक रूप से कैश किए गए संदेशों को पढ़ और संसाधित करता है। यह सुनिश्चित करेगा कि आपकी कतार-लंबाई पर्याप्त रूप से प्रवाह-नियंत्रण में न हो, जबकि आपके डीबी को लिखने के अनुरोधों के साथ बमबारी से रोका जा सके, जो आम तौर पर पढ़ने के अनुरोधों से अधिक महंगा होते हैं। आपके उपभोक्ता अब कतार से संदेश हटाते हैं, जिसे बाद में दूसरी प्रक्रिया के साथ निपटाया जा सकता है।

+0

धन्यवाद पॉल। यह वास्तव में एक अच्छा सुझाव है। मेरे डेटा को तत्काल डीबी में जारी रखने की आवश्यकता नहीं है। डीबी लगातार भाग में बहुत लंबा समय लगता है क्योंकि इसमें प्रत्येक मामले के लिए डेटा पार्सिंग शामिल है और फिर एक डीबी डालने में बड़ी मात्रा में डेटा (~ 10000 पंक्तियां) को सहेजना शामिल है। तो रेडिस का उपयोग करना एक अच्छा विचार है क्योंकि यह एक मेमोरी कैश है। लेकिन अंत में मुझे अभी भी डीबी को डेटा जारी रखने की जरूरत है। तो संदेश उपभोक्ता संदेश लेता है और Redis को सहेजने के बाद मैं डीबी लिखने के कार्य को पूरा करने के लिए रेडिस का उपयोग कैसे कर सकता हूं? यदि डीबी सम्मिलन बहुत धीमा है, तो क्या उपभोक्ता रेडिस कैश आकार सीमा को ओवरफ्लो करेगा? – tonga

+0

मैं प्रत्येक संदेश को एकल, या एकाधिक प्रक्रियाओं से उपभोग करता हूं, डीडी में आने के बाद रेडिस से संदेश को शुद्ध करता हूं। रेडिस में कोई कैश सीमा नहीं है - आप मेजबान मशीन पर रैम की मात्रा से सीमित हैं। 1,000,000 अपेक्षाकृत छोटी चाबियाँ लगभग 200 एमबी है। यदि आप स्मृति से बाहर निकलने के बारे में चिंतित हैं, तो इसे देखें: http://redis.io/topics/memory-optimization –

+0

मैंने एक पोस्ट जोड़ा है, एएमक्यूपी को स्केल करने के दृष्टिकोणों को रेखांकित किया है, और संबंधित पुरस्कार और कमियां : http://insidethecpu.com/2014/11/11/rabbitmq-qos-vs-competing-consumers/ –

1

हालांकि यह सच है कि अधिक उपभोक्ताओं को जोड़ने से वास्तविक समस्याएं हो सकती हैं, असली समस्या डेटाबेस को सहेजी जाएगी।

यहां पहले से ही कई उत्तर हैं जो उपभोक्ताओं (धागे, और मशीनों) को जोड़ने और क्यूओएस को बदलने के बारे में बात करते हैं, इसलिए मैं इसे दोहराने वाला नहीं हूं। इसके बजाय आपको संदेशों के समूह में संदेशों को एकत्रित करने के लिए Aggregator पैटर्न का उपयोग करने पर गंभीरता से विचार करना चाहिए और फिर समूह को एक शॉट में अपने डेटाबेस में डालें।

प्रत्येक संदेश के लिए आपका वर्तमान कोड शायद एक कनेक्शन खोलता है, डेटा डालता है, और उस कनेक्शन को बंद करता है (या पूल पर वापस)। इससे भी बदतर यह लेनदेन का उपयोग भी कर सकता है।

एग्रीगेटर पैटर्न का उपयोग करके आप अनिवार्य रूप से फ्लश करने से पहले डेटा को बफर कर रहे हैं।

अब एक अच्छा एग्रीगेटर लिखना मुश्किल है। आपको यह तय करने की आवश्यकता होगी कि आप कैसे बफर करना चाहते हैं (यानी प्रत्येक कार्यकर्ता का अपना बफर या रेडिस जैसे केंद्रीय बफर होता है)। स्प्रिंग एकीकरण में एक एग्रीगेटर है जो मेरा मानना ​​है।

संबंधित मुद्दे