2011-08-27 8 views
12

मैं जेएमएस और एक्टिवएमक्यू के साथ काम कर रहा हूं। सब कुछ अद्भुत काम कर रहा है। मैं वसंत का उपयोग नहीं कर रहा हूं, न ही मैं कर सकता हूं।किसी जेएमएस संदेशसूची से रोलबैक सिग्नल करें

इंटरफ़ेस javax.jms.MessageListener में केवल एक विधि है, onMessage। एक कार्यान्वयन के भीतर से एक अपवाद फेंक दिया जाएगा। यदि वास्तव में एक अपवाद फेंक दिया जाता है, तो मैं कहता हूं कि संदेश ठीक से संसाधित नहीं किया गया था और फिर से प्रयास करने की आवश्यकता है। इसलिए, मुझे थोड़ी देर इंतजार करने के लिए ActiveMQ की आवश्यकता है और फिर, पुनः प्रयास करें। यानी मुझे जेएमएस लेनदेन को रोलबैक करने के लिए फेंक दिया गया अपवाद चाहिए।

मैं इस तरह के व्यवहार को कैसे पूरा कर सकता हूं?

शायद ActiveMQ में कुछ कॉन्फ़िगरेशन है जो मैं नहीं ढूंढ पा रहा था।

या ... शायद उपभोक्ताओं के लिए MessageListener रों पंजीकरण के साथ भाग कर और संदेश अपने आप उपभोग की तरह एक एक पाश में, हो सकता है:, बजाय श्रोता के पंजीकरण की

while (true) { 
    // ... some administrative stuff like ... 
    session = connection.createSesstion(true, SESSION_TRANSACTED) 
    try { 
     Message m = receiver.receive(queue, 1000L); 
     theMessageListener.onMessage(m); 
     session.commit(); 
    } catch (Exception e) { 
     session.rollback(); 
     Thread.sleep(someTimeDefinedSomewhereElse); 
    } 
    // ... some more administrative stuff 
} 
धागे के एक जोड़े में

या ... मैं इसे स्वयं करने के लिए MessageListener एस को किसी भी तरह सजा/एओपी/बाइट-मैनिपुलेट कर सकता हूं।

आप कौन सी मार्ग लेंगे और क्यों?

नोट: मेरे पास MessageListener एस कोड पर पूरा नियंत्रण नहीं है।

संपादित अवधारणा के सबूत के लिए एक परीक्षण:

@Test 
@Ignore("Interactive test, just a proof of concept") 
public void transaccionConListener() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     @Override 
     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         message.acknowledge(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         message.acknowledge(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    brokerService.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 
+0

उत्तर के लिए आपको बहुत अधिक व्हेली और @ अमार धन्यवाद। मैं दोनों को उखाड़ फेंक रहा हूं क्योंकि आप दोनों ने मुझे सही रास्ते में इंगित किया था। लेकिन अभी तक सही जवाब नहीं चुन रहा है। क्योंकि अधिक परीक्षण की आवश्यकता है। –

उत्तर

10

आप अपने रसीद मोड के रूप में SESSION_TRANSACTED का उपयोग करना चाहते हैं, तो आप सेटअप एक RedeliveryPolicy on your Connection/ConnectionFactory की जरूरत है। This page on ActiveMQ's website में आपको कुछ भी करने की आवश्यकता हो सकती है, इसके लिए कुछ अच्छी जानकारी भी शामिल है।

जब से तुम स्प्रिंग उपयोग नहीं कर रहे, तो आपको निम्न कोड (ऊपर दिए गए लिंक में से एक से लिया गया) करने के लिए कुछ इसी तरह के साथ एक RedeliveryPolicy सेटअप कर सकते हैं:

RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 
policy.setInitialRedeliveryDelay(500); 
policy.setBackOffMultiplier(2); 
policy.setUseExponentialBackOff(true); 
policy.setMaximumRedeliveries(2); 

संपादित लेते हुए अपने कोड स्निपेट उत्तर में जोड़ा गया, निम्नलिखित दिखाता है कि यह लेनदेन के साथ कैसे काम करता है। सत्र के साथ इस कोड को आज़माएं .rollback() विधि ने टिप्पणी की और आप देखेंगे कि SESION_TRANSACTED और सत्र का उपयोग कर।प्रतिबद्ध/रोलबैक की उम्मीद के रूप में काम करता है:

@Test 
public void test() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         session.commit(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         session.rollback(); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         session.commit(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 

}

+0

यह काम नहीं किया। लेकिन मुझे सही दिशा में इंगित किया। मैं DUPS_OK_ACKNOWLEDGE छोड़ दूंगा क्योंकि ऐसा लगता है कि मुझे कम से कम काम करना है। –

+0

आपको अपने कोड की संपूर्णता पेस्ट करने की आवश्यकता है, क्योंकि आप अपने सत्र के साथ कुछ सही तरीके से नहीं कर रहे हैं। DUPS_OK_ACKNOWLEDGE केवल काम कर रहा है क्योंकि क्लाइंट पावती आलसी है और ब्रोकर केवल तब तक संदेश भेजता रहेगा जब तक ग्राहक अंततः एके नहीं करता। – whaley

+0

मैंने अवधारणा का सबूत चिपकाया। मैं इसे केवल DUPS_OK_ACKNOWLEDGE और message.acknowledgement के साथ काम करने में कोई फर्क नहीं पड़ता। –

2

आप Session.CLIENT_ACKNOWLEDGE को पावती मोड सेट करने की जरूरत है, ग्राहक संदेश के स्वीकार करते हैं विधि को फोन करके एक भस्म संदेश को स्वीकार करता है।

QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

फिर, संदेश संसाधित करने के बाद उस संदेश को हटाने के लिए Message.acknowledge() विधि कॉल करने की आवश्यकता है।

Message message = ...; 
// Processing message 

message.acknowledge(); 
+0

यह काम नहीं करता है। _onMessage_ अभी भी एक बार बुलाया जाता है, भले ही _message.acknowledge() _ कभी नहीं बुलाया जाता है। –

+0

क्या आपने पावती मोड को सही तरीके से सेट किया है? इसे सत्र में सेट किया जाना चाहिए। CLIENT_ACKNOWLEDGE! – Ammar

+0

लेकिन यह (झूठी, सत्र .DUPS_OK_ACKNOWLEDGE) के साथ काम करता है ... message.acknowledge() चाल नहीं करता है। –

0

अपने सत्र लेनदेन किया जाता है, तो "acknowledgeMode" anyways..So नजरअंदाज कर दिया है, बस छोड़ अपने सत्र लेन-देन और session.rollback और session.commit का उपयोग अपने लेनदेन को प्रतिबद्ध या रोलबैक करने के लिए।

+1

मुझे लगता है कि (मेरी) समस्या यह है कि सत्र MessageListener.onMessage (संदेश) के भीतर पहुंच योग्य नहीं है। –

संबंधित मुद्दे