2011-03-08 19 views
8

हमारे पास एक जेएमएस कतार है जो बहुत बड़ी संख्या में संदेश प्राप्त करती है।प्रभावी जेएमएस प्रोसेसिंग

श्रोता को डेटाबेस लेनदेन का उपयोग करके डेटाबेस में संदेश सहेजना है और फिर जेएमएस लेनदेन करना है।

तो मैं इसे और अधिक प्रभावी ढंग से कैसे कर सकता हूं जहां मुझे डेटाबेस & प्रत्येक संदेश पर जेएमएस प्रतिबद्ध करने की आवश्यकता नहीं है।

उत्तर

6

एसिंक संदेश के पीछे आधार, विशेष रूप से जब एमडीबी का उपयोग करते हैं, यह है कि प्रत्येक संदेश परमाणु है। ऐसा कहने के लिए, किसी एक संदेश को संसाधित करने का नतीजा किसी अन्य संदेश को संसाधित करने के नतीजे से स्वतंत्र होना चाहिए। आपकी समस्या का आदर्श समाधान संदेशों की इस परमाणु को संरक्षित रखेगा।

यदि आप काम की एक ही इकाई में एकाधिक संदेशों को संसाधित करना चाहते हैं, तो आप इस परमाणु को खो देंगे। उदाहरण के लिए, मान लें कि आपने प्रत्येक 25 संदेशों में सिंकपॉइंट करने का निर्णय लिया है। यदि 25 वें संदेश में कोई त्रुटि हुई, जैसे कोड पेज रूपांतरण समस्या जिसने इसे कतार से पुनर्प्राप्त करने से रोका, तो संदेशों के पूरे बैच का समर्थन किया जाएगा। तब वे सभी को फिर से बनाया जाएगा। संदेशों के लिए पुनर्विक्रेता गिनती प्रत्येक पढ़ने/बैकआउट चक्र के साथ बढ़ेगी। एक बार जब आपके एप सर्वर में रीडेलवरी गिनती थ्रेसहोल्ड सेट से अधिक हो गई हो, तो आपके कॉन्फ़िगरेशन के आधार पर सभी 25 संदेशों को त्याग दिया जाएगा या अपेक्षित किया जाएगा। बैच जितना बड़ा होगा, अधिक संदेश किसी त्रुटि स्थिति में संभावित रूप से प्रभावित होंगे क्योंकि संपूर्ण बैच एक साथ रहता है या मर जाता है। एकल बैजन संदेश की स्थिति में अपने बैच आकार को 100 और 100 संदेशों पर जोखिम में डाल दिया जाएगा।

एक वैकल्पिक समाधान आपके एमडीबी में कई प्रसंस्करण धागे की अनुमति देना है। जेएमएस के साथ आप एक ही कनेक्शन के तहत कई सत्रों को जन्म दे सकते हैं। प्रत्येक सत्र कार्य की अपनी इकाई का प्रबंधन कर सकता है, इसलिए प्रत्येक सत्र स्वतंत्र रूप से एक्सए लेनदेन शुरू कर सकता है, एक संदेश प्राप्त कर सकता है, डेटाबेस अपडेट कर सकता है और फिर लेनदेन कर सकता है। यदि एक संदेश खराब है, तो केवल वह संदेश और डेटाबेस अपडेट प्रभावित होता है।

इसमें अपवाद हैं। उदाहरण के लिए, यदि एक बड़े बैच को प्रोसेस करना और संदेश सभी एक ही उत्पादक से उत्पन्न होते हैं, तो एमडीबी के अलावा कुछ संदेश लाने और काम की एक ही इकाई के तहत कई पंक्तियों को अपडेट करना आम बात है। इसी प्रकार, यदि संदेश अनुक्रम-निर्भर हैं तो समांतर प्रसंस्करण संभव नहीं है क्योंकि यह अनुक्रम को संरक्षित नहीं करेगा। लेकिन फिर, अनुक्रम निर्भर संदेश परमाणु नहीं हैं। फिर, इस मामले में एक एमडीबी आदर्श समाधान नहीं है।

आपके परिवहन प्रदाता के आधार पर, समर्थित धागे की संख्या केवल स्मृति भंडारण द्वारा ही सीमित हो सकती है। उदाहरण के लिए वेबस्फेयर एमक्यू एक कतार पर सैकड़ों एक साथ गेटर धागे को आसानी से संभाल सकता है। अपने ऐप सर्वर के एमडीबी कॉन्फ़िगरेशन के लिए ट्यूनिंग की जांच करें ताकि आप देख सकें कि आप कितने धागे स्पिन कर सकते हैं और फिर सत्यापित कर सकते हैं कि आपका परिवहन लोड को संभाल सकता है। फिर थ्रेड की इष्टतम संख्या को खोजने के लिए थोड़ा सा खेलते हैं। प्रदर्शन नाटकीय रूप से बढ़ेगा क्योंकि धागे एक से बढ़ते हैं, लेकिन केवल एक बिंदु तक। पिछली बात यह है कि आप आम तौर पर एक पठार देखते हैं, फिर थ्रेड मैनेजमेंट ओवरहेड प्रदर्शन लाभ को ऑफ़सेट करता है। जहां swe3et स्पॉट झूठ बोलता है इस पर निर्भर करता है कि मैसेजिंग ब्रोकर कितना भारी है और क्या यह सीपीयू, मेमोरी, डिस्क या नेटवर्क से सबसे अधिक बाधित है।

8

प्रत्येक संदेश पर ऐसा न करें, बैच में करें। जेएमएस आपके डीबी की तरह लेनदेन का समर्थन करता है; एक जेएमएस लेनदेन शुरू करें, एन संदेश पढ़ें। डीबी लेनदेन शुरू करें, एन संदेश डालें। जेएमएस को प्रतिबद्ध, डीबी को प्रतिबद्ध।

यह स्पष्ट रूप से होने वाली दौड़ के लिए एक खिड़की पेश करता है (दो कामों के बीच दुर्घटना होती है)। आपके पास अब यह है, लेकिन केवल एक संदेश के लिए। यदि आप उस समस्या को हल करना चाहते हैं तो आपको या तो एक्सए लेनदेन (दो चरणबद्ध प्रतिबद्धता) या कम से कम कुछ प्रकार की डुप्लिकेट पहचान योजना को देखकर सामना करना पड़ता है। इसके लिए कुछ परिचय के लिए, एक नज़र डालें: http://activemq.apache.org/should-i-use-xa.html

+0

कॉल बैक विधि "ऑन मैसेज" एक समय में केवल एक संदेश देता है, तो मैं एन संदेशों को कैसे प्राप्त कर सकता हूं। – changed

+3

मैं MessageListener इंटरफ़ेस का उपयोग नहीं करता और केवल संदेश प्राप्त करता हूं जब मुझे संदेश प्राप्त होता है। आप ऐसा कर सकते हैं (एक सदस्य चर के माध्यम से आपको कितने संदेश प्राप्त हुए हैं, लेनदेन शुरू करें और प्रतिबद्ध करें, आदि) का ट्रैक रखें, लेकिन आप अपनी रेस कंडीशन विंडो बढ़ा रहे हैं क्योंकि आप किसी भी कार्रवाई को ट्रिगर करने के लिए किसी संदेश पर निर्भर हैं। यह वास्तव में सबसे अच्छा तरीका नहीं है। आप पारंपरिक लूप करने से कहीं बेहतर हैं जहां आप कतार से संदेश पढ़ते हैं (टाइमआउट या गैर-ब्लॉक के साथ कॉल अवरुद्ध करते हैं) और जब आपके पास एन संदेश होते हैं या वाई समय बीत जाता है तो आपका काम करता है। –

+1

क्षमा करें - विशेष रूप से यह संदेश संदेशसमूह इंटरफ़ेस विधियों का उपयोग कर रहा है (टाइमआउट) प्राप्त करें और setMessageListener() के साथ अपना संदेशसूची पंजीकृत करने के बजाय NoWait() प्राप्त करें। –

0

यहां एक जेएम प्रोसेसर है जो एक कतार से संदेश लेगा, उन्हें एक सूची में जोड़ देगा, और दूसरी कतार में वापस धक्का देगा। आप समायोजित कर सकते हैं कि कैसे मान पढ़ सकते हैं और संबंधित तरीकों में एकत्रित कर रहे हैं:

public class JmsBatcher<T> { 
    final Session session; 
    private final MessageConsumer consumer; 
    private final MessageProducer producer; 
    private final int batchSize; 


    public JmsBatcher(final Connection connection, 
         final String sourceQueue, 
         final String destQueue, 
         final int batchSize) throws JMSException { 
     this.batchSize = batchSize; 
     session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 
     final Queue source = session.createQueue(sourceQueue); 
     final Queue dest = session.createQueue(destQueue); 
     consumer = session.createConsumer(source); 
     producer = session.createProducer(dest); 
    } 

    public void processBatch() { 
     final List<T> values = new ArrayList<>(); 
     try { 
      while (values.size() < batchSize) { 
       final Message message = consumer.receive(); 
       values.add(readObject(message)); 
       message.acknowledge(); 
      } 
      producer.send(createAggregate(values)); 
      session.commit(); 
     } catch (Exception e) { 
      // Log the exception 
      try { 
       session.rollback(); 
      } catch (JMSException re) { 
       // Rollback failed, so something fataly wrong. 
       throw new IllegalStateException(re); 
      } 
     } 
    } 

    private Message createAggregate(final List<T> values) throws JMSException { 
     return session.createObjectMessage((Serializable) values); 
    } 

    private T readObject(final Message message) throws JMSException { 
     return (T) ((ObjectMessage) message).getObject(); 
    } 
} 

यह एक अलग थ्रेड में शुरू किया जा सकता है, और बस हमेशा के लिए चलाएँ:

final JmsBatcher jmsBatcher = 
    new JmsBatcher(connection, "single", "batch", 25); 
new Thread(() -> { 
    while (true) { 
     jmsBatcher.processBatch(); 
    } 
}).start(); 

फिर आप डेटाबेस के लिए प्रतिबद्ध कर सकते हैं बैच किए गए परिणामों से बैचों में। यदि कोई असफलता है, तो लेनदेन का पुन: प्रयास किया जाएगा।

संबंधित मुद्दे