2010-06-21 7 views
11

ध्यान दें कि मैं कई संदेश श्रोताओं समवर्ती विषय से लगातार संदेशों को संभालने के लिए चाहते हैं। इसके अलावा मैं प्रत्येक संदेश श्रोता को लेनदेन से संचालित करना चाहता हूं ताकि किसी दिए गए संदेश श्रोता में प्रसंस्करण विफलता के परिणामस्वरूप श्रोता का संदेश विषय पर शेष रहे।जावा और वसंत 3.0 के साथ एक जेएमएस विषय (कतार नहीं) से एक साथ कई संदेशों को मैं कैसे संभाल सकता हूं?

वसंत डिफॉल्ट मैसेज लिस्टरनर कंटेनर केवल जेएमएस कतारों के लिए समवर्ती समर्थन का समर्थन करता प्रतीत होता है।

क्या मुझे एकाधिक डिफॉल्ट मैसेज लिस्टनरकंटर्स को तुरंत चालू करने की आवश्यकता है?

समय ऊर्ध्वाधर अक्ष नीचे बहता हैं:

ListenerA reads msg 1  ListenerB reads msg 2  ListenerC reads msg 3 
ListenerA reads msg 4  ListenerB reads msg 5  ListenerC reads msg 6 
ListenerA reads msg 7  ListenerB reads msg 8  ListenerC reads msg 9 
ListenerA reads msg 10  ListenerB reads msg 11  ListenerC reads msg 12 
... 

अद्यतन:
आपकी प्रतिक्रिया @ T.Rob और @skaffman के लिए धन्यवाद।

क्या मैं कर रहा concurrency=1 के साथ कई DefaultMessageListenerContainers बनाने और फिर संदेश श्रोता में तर्क डाल इतना है कि केवल एक ही धागे किसी दिए गए संदेश आईडी पर कार्रवाई होगी है समाप्त हो गया

+0

क्या आप स्पष्टीकरण दे सकते हैं? जब मैं "एकाधिक संदेश श्रोताओं को समसामयिक रूप से विषय से लगातार संदेश संभालने के लिए देखता हूं" मुझे लगता है कि इसका मतलब है कि आप नहीं चाहते हैं कि श्रोताओं को प्रत्येक को एक ही संदेश की एक प्रति प्राप्त हो, बल्कि एक ही विषय पर एक दूसरे के खिलाफ संदेश के लिए प्रतिस्पर्धा करने के लिए। क्या वो सही है? –

+0

यह उपयोगी लग रहा है: http://bsnyderblog.blogspot.com/2010/05/tuning-jms-message-consumption-in.html – skaffman

उत्तर

5

आप एक से अधिक DefaultMessageListenerContainer उदाहरणों, कोई नहीं करना चाहता, लेकिन आप concurrentConsumers property का उपयोग कर, कॉन्फ़िगर करने के लिए DefaultMessageListenerContainer समवर्ती होने की जरूरत है:

समवर्ती उपभोक्ताओं को बनाने के लिए की संख्या निर्दिष्ट करें। डिफ़ॉल्ट 1.

है इस सेटिंग कार्यावधि में अनुसूचित समवर्ती उपभोक्ताओं के मानक स्तर में वृद्धि होगी के लिए एक उच्च मूल्य निर्दिष्ट करना: इस है प्रभावी रूप से समवर्ती उपभोक्ताओं की न्यूनतम संख्या जो किसी भी समय पर अनुसूचित किया जाएगा । यह स्थैतिक सेटिंग है; गतिशील स्केलिंग के लिए, "maxConcurrentConsumers" सेटिंग को निर्दिष्ट करने पर विचार करें।

समवर्ती उपभोक्ताओं की संख्या बढ़ाने से पैमाने को संदेश एक कतार से आ रही की खपत क्रम में recommendable है। हालांकि, पर ध्यान दें कि कोई भी ऑर्डर गारंटी खो जाती है जब एक बार कई उपभोक्ता पंजीकृत होते हैं। आम तौर पर, कम मात्रा वाली कतारों के लिए 1 उपभोक्ता के साथ चिपके रहें।

हालांकि, वहाँ तल पर बड़ा चेतावनी है:

एक विषय के लिए समवर्ती उपभोक्ताओं की संख्या न बढ़ाएं। यह एक ही संदेश है, जो शायद ही कभी वांछनीय है के समवर्ती खपत करने के लिए नेतृत्व करेंगे।

यह दिलचस्प है, और जब आप इसके बारे में सोचते हैं तो समझ में आता है। वही होगा यदि आपके पास DefaultMessageListenerContainer उदाहरण थे।

मुझे लगता है कि शायद आपको अपने डिजाइन पर पुनर्विचार करने की आवश्यकता है, हालांकि मुझे यकीन नहीं है कि मैं क्या सुझाव दूंगा। पब/उप संदेशों की समवर्ती खपत पूरी तरह से उचित चीज की तरह लगती है, लेकिन एक ही संदेश को एक ही समय में आपके सभी उपभोक्ताओं को वितरित करने से कैसे बचें?

1

यह उन अवसरों में से एक है जहां परिवहन प्रदाताओं में अंतर जेएमएस के अमूर्तकरण के माध्यम से बुलबुला हुआ है। जेएमएस एक विषय पर प्रत्येक ग्राहक के लिए संदेश की एक प्रति प्रदान करना चाहता है। लेकिन जो व्यवहार आप चाहते हैं वह वाकई कतार का है। मुझे संदेह है कि एक पब/उप समाधान के लिए इसे चलाने की अन्य आवश्यकताएं हैं जिन्हें वर्णित नहीं किया गया था - उदाहरण के लिए अन्य चीजों को आपके ऐप से स्वतंत्र विषय पर सब्सक्राइब करने की आवश्यकता है।

यदि मैं इसे वेबस्पेयर एमक्यू में करना चाहता हूं तो समाधान एक प्रशासनिक सदस्यता बनाना होगा जिसके परिणामस्वरूप दिए गए विषय पर प्रत्येक संदेश की एक प्रति एक कतार में रखी जाएगी। फिर आपके कई ग्राहक उस कतार पर संदेशों के लिए प्रतिस्पर्धा कर सकते हैं। इस तरह आपके ऐप में कई धागे हो सकते हैं जिनमें संदेश वितरित किए जाते हैं, और साथ ही साथ इस एप्लिकेशन से स्वतंत्र अन्य ग्राहक गतिशील रूप से (एक) उसी विषय की सदस्यता ले सकते हैं।

दुर्भाग्यवश, ऐसा करने का कोई सामान्य जेएमएस-पोर्टेबल तरीका नहीं है। आप एक महान डिग्री के लिए परिवहन प्रदाता के कार्यान्वयन पर निर्भर हैं। इनमें से केवल एक ही मैं वेबस्पेयर एमक्यू से बात कर सकता हूं लेकिन मुझे यकीन है कि अन्य ट्रांसपोर्ट इसे एक तरफ या दूसरे तरीके से समर्थन करते हैं और यदि आप रचनात्मक हैं तो अलग-अलग डिग्री के लिए।

+0

मुझे आपका विचार पसंद है। मुझे लगता है कि हम इसे एक विशिष्ट प्रदाता को लिखने के लिए लागू कर सकते हैं। हम इसके लिए एक विषय और केवल एक ग्राहक बनाते हैं। वह ग्राहक विषय से संदेश को कतार में रखता है और अब कई कतार उपभोक्ता इसके लिए प्रतिस्पर्धा कर सकते हैं। यह संकेत का एक स्तर जोड़ता है, लेकिन डीएमएलसी में विषय के लिए सहमति की समस्या हल करता है। – shrini1000

0

मैंने एक ही समस्या में भाग लिया है। मैं वर्तमान में खरगोश एमक्यू की जांच कर रहा हूं, जो एक डिजाइन पैटर्न में एक आदर्श समाधान प्रदान करता है जिसे वे "कार्य कतार" कहते हैं। यहां अधिक जानकारी: http://www.rabbitmq.com/tutorials/tutorial-two-java.html

यदि आप पूरी तरह से जेएमएस से बंधे नहीं हैं तो आप इसे देख सकते हैं। एएमक्यूपी पुल के लिए एक जेएमएस भी हो सकता है, लेकिन यह हैकी दिखना शुरू कर सकता है।

मुझे कुछ मज़ा आता है (पढ़ना: कठिनाइयों) RabbitMQ स्थापित हो रहा है और मेरे मैक पर चल रहा है लेकिन लगता है कि मैं इसे काम करने के करीब हूं, अगर मैं इसे हल करने में सक्षम हूं तो मैं वापस पोस्ट करूंगा।

+0

कोशिश की और RabbitMQ एक आकर्षण की तरह काम करता है। यह जेएमएस नहीं है, लेकिन मैं वसंत और खरगोश/एएमक्यूपी समर्थन का उपयोग कर रहा हूं मेरे लिए काफी अच्छा है। – cobbzilla

+0

वैसे भी मेरे अनुभव में rabbitmq में क्लस्टरकृत पारिस्थितिक तंत्र – deFreitas

-2

इस प्रश्न में आया था। मेरी कॉन्फ़िगरेशन है:

id="DefaultListenerContainer" के साथ एक बीन बनाएं, संपत्ति name="concurrentConsumers" value="10" और संपत्ति name="maxConcurrentConsumers" value ="50" जोड़ें।

ठीक काम करता है, अभी तक। मैंने थ्रेड आईडी मुद्रित की और सत्यापित किया कि एकाधिक धागे बनाए जाते हैं और पुन: उपयोग किए जाते हैं।

1) केवल एक DMLC सेम और आने वाली संदेश को संभालने के लिए विधि के साथ विन्यस्त बनाएँ:

+0

में संदेशों को खोने में समस्याएं हैं जो उपरोक्त उनके उत्तर में वर्णित स्कैमैन की चेतावनी की जांच करें। – shrini1000

+0

इस उत्तर में प्रदर्शन परीक्षण जोड़ने का वादा था, लेकिन यह कभी भी जोड़ा नहीं गया! मैंने उस पाठ को हटा दिया है, लेकिन यदि आप इसे किसी बिंदु पर जोड़ना चाहते हैं, तो नि: शुल्क महसूस करें। – halfer

1

यहाँ एक संभावना है। इसकी समरूपता को 0

2) एक कार्य निष्पादक को अपनी इच्छानुसार समेकन के बराबर # थ्रेड के साथ कॉन्फ़िगर करें। ऑब्जेक्ट्स के लिए ऑब्जेक्ट पूल बनाएं जो वास्तव में किसी संदेश को संसाधित करना चाहते हैं। # 1 में कॉन्फ़िगर किए गए बीन में कार्य निष्पादक और ऑब्जेक्ट पूल का संदर्भ दें। ऑब्जेक्ट पूल उपयोगी है यदि वास्तविक संदेश प्रसंस्करण बीन थ्रेड-सुरक्षित नहीं है।

3) किसी आवक संदेश के लिए, DMLC में सेम, एक कस्टम Runnable बनाता है संदेश और वस्तु पूल को यह बताते हैं, और यह देता है निष्पादक कार्य करने।

4) Runnable की रन विधि वस्तु पूल से एक सेम हो जाता है और संदेश दिया साथ अपनी 'प्रक्रिया' प्रणाली को बुलाती है।

# 4 प्रॉक्सी और ऑब्जेक्ट पूल के साथ इसे आसान बनाने के लिए प्रबंधित किया जा सकता है।

मैंने अभी तक इस समाधान की कोशिश नहीं की है, लेकिन ऐसा लगता है कि बिल बिलकुल ठीक है। ध्यान दें कि यह समाधान ईजेबी एमडीबी के रूप में मजबूत नहीं है। वसंत उदा। पूल से ऑब्जेक्ट को त्याग नहीं देगा अगर यह रनटाइम अपवाद फेंकता है।

+2

आप कैसे सुनिश्चित करते हैं कि आने वाले जेएमएस संदेशों को तब तक नहीं रखा जाता जब तक कि रननेबल सफलतापूर्वक पूर्ण नहीं हो जाता? –

1

कम से कम ActiveMQ में क्या आप चाहते हैं पूरी तरह से समर्थित है, उसका नाम VirtualTopic

है अवधारणा है:

  1. आप बनाने के एक VirtualTopic (सीधे शब्दों में उपसर्ग VirtualTopic. का उपयोग कर एक विषय बनाने) जैसे । VirtualTopic.Color
  2. इस वर्चुअलटॉपिक इस पैटर्न से मेल खाने वाला उपभोक्ता Consumer.<clientName>.VirtualTopic.<topicName> उदाहरण के लिए सदस्यता लेने वाला उपभोक्ता बनाएं। Consumer.client1.VirtualTopic.Color, यह कर रही है, ActiveMQ उस नाम का कोई कतार पैदा करेगा और उस कतार, VirtualTopic.Color तो इस आभासी विषय पर प्रकाशित वाले प्रत्येक संदेश में client1 कतार वितरित किया जाएगा करने के लिए सदस्यता ध्यान दें कि यह RabbitMQ एक्सचेंजों की तरह काम करता होगा।
  3. आप इस बिंदु पर, किया जाता है अब आप उपभोग कर सकते हैं client1 कतार हर पंक्ति की तरह कई उपभोक्ताओं, DLQ, अनुकूलित पुनर्वितरण नीति, आदि
  4. साथ, मुझे लगता है कि आप समझ गया कि आप बना सकते हैं client2, client3 और कितने ग्राहकों आप चाहते हैं, उन सभी को संदेश VirtualTopic.Color

यहाँ कोड को प्रकाशित की एक प्रति प्राप्त करेंगे

+०१२३५१६४१०६१
@Component 
public class ColorReceiver { 

    private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class); 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    // simply generating data to the topic 
    long id=0; 
    @Scheduled(fixedDelay = 500) 
    public void postMail() throws JMSException, IOException { 

     final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)]; 
     final Color color = new Color(++id, colorName.getName()); 
     final ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 
     message.setObject(color); 
     message.setProperty("color", color.getName()); 
     LOGGER.info("status=color-post, color={}", color); 
     jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message); 
    } 

    /** 
    * Listen all colors messages 
    */ 
    @JmsListener(
     destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer" 
     selector = "color <> 'RED'" 
    ) 
    public void genericReceiveMessage(Color color) throws InterruptedException { 
     LOGGER.info("status=GEN-color-receiver, color={}", color); 
    } 

    /** 
    * Listen only red colors messages 
    * 
    * the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that 
    * the containers clientId need to be different between each other 
    */ 
    @JmsListener(
//  destination = "Consumer.redColorContainer.VirtualTopic.color", 
     destination = "Consumer.client1.VirtualTopic.color", 
     containerFactory = "redColorContainer", selector = "color='RED'" 
    ) 
    public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException { 
     LOGGER.info("status=RED-color-receiver, color={}", message.getObject()); 
    } 

    /** 
    * Listen all colors messages 
    */ 
    @JmsListener(
     destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer" 
    ) 
    public void genericReceiveMessage2(Color color) throws InterruptedException { 
     LOGGER.info("status=GEN-color-receiver-2, color={}", color); 
    } 

} 

@SpringBootApplication 
@EnableJms 
@EnableScheduling 
@Configuration 
public class Config { 

    /** 
    * Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different 
    * clientIds per consumer pool (as two @JmsListener above, or two application instances) 
    * 
    */ 
    @Bean 
    public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory, 
     DefaultJmsListenerContainerFactoryConfigurer configurer) { 

     final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setConcurrency("1-5"); 
     configurer.configure(factory, connectionFactory); 
     // container.setClientId("aId..."); lets spring generate a random ID 
     return factory; 
    } 

    @Bean 
    public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory, 
     DefaultJmsListenerContainerFactoryConfigurer configurer) { 

     // necessary when post serializable objects (you can set it at application.properties) 
     connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName())); 

     final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setConcurrency("1-2"); 
     configurer.configure(factory, connectionFactory); 
     return factory; 
    } 

} 

public class Color implements Serializable { 

    public static final Color WHITE = new Color("WHITE"); 
    public static final Color BLUE = new Color("BLUE"); 
    public static final Color RED = new Color("RED"); 

    private String name; 
    private long id; 

    // CONSTRUCTORS, GETTERS AND SETTERS 
} 
संबंधित मुद्दे

 संबंधित मुद्दे