मैं जेएमएस और एक्टिवएमक्यू के साथ काम कर रहा हूं। सब कुछ अद्भुत काम कर रहा है। मैं वसंत का उपयोग नहीं कर रहा हूं, न ही मैं कर सकता हूं।किसी जेएमएस संदेशसूची से रोलबैक सिग्नल करें
इंटरफ़ेस javax.jms.MessageListener
में केवल एक विधि है, onMessage
। एक कार्यान्वयन के भीतर से एक अपवाद फेंक दिया जाएगा। यदि वास्तव में एक अपवाद फेंक दिया जाता है, तो मैं कहता हूं कि संदेश ठीक से संसाधित नहीं किया गया था और फिर से प्रयास करने की आवश्यकता है। इसलिए, मुझे थोड़ी देर इंतजार करने के लिए ActiveMQ की आवश्यकता है और फिर, पुनः प्रयास करें। यानी मुझे जेएमएस लेनदेन को रोलबैक करने के लिए फेंक दिया गया अपवाद चाहिए।
मैं इस तरह के व्यवहार को कैसे पूरा कर सकता हूं?
शायद ActiveMQ में कुछ कॉन्फ़िगरेशन है जो मैं नहीं ढूंढ पा रहा था।
या ... शायद उपभोक्ताओं के लिए MessageListener
रों पंजीकरण के साथ भाग कर और संदेश अपने आप उपभोग की तरह एक एक पाश में, हो सकता है:, बजाय श्रोता के पंजीकरण की
while (true) {
// ... some administrative stuff like ...
session = connection.createSesstion(true, SESSION_TRANSACTED)
try {
Message m = receiver.receive(queue, 1000L);
theMessageListener.onMessage(m);
session.commit();
} catch (Exception e) {
session.rollback();
Thread.sleep(someTimeDefinedSomewhereElse);
}
// ... some more administrative stuff
}
धागे के एक जोड़े में
।
या ... मैं इसे स्वयं करने के लिए MessageListener
एस को किसी भी तरह सजा/एओपी/बाइट-मैनिपुलेट कर सकता हूं।
आप कौन सी मार्ग लेंगे और क्यों?
नोट: मेरे पास MessageListener
एस कोड पर पूरा नियंत्रण नहीं है।
संपादित अवधारणा के सबूत के लिए एक परीक्षण:
@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(0);
BrokerService brokerService = new BrokerService();
String bindAddress = "vm://localhost";
brokerService.addConnector(bindAddress);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService.setUseJmx(false);
brokerService.start();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setUseRetroactiveConsumer(true);
activeMQConnectionFactory.setClientIDPrefix("ID");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
pooledConnectionFactory.start();
Connection connection = pooledConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue helloQueue = session.createQueue("Hello");
MessageConsumer consumer = session.createConsumer(helloQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
switch (atomicInteger.getAndIncrement()) {
case 0:
System.out.println("OK, first message received " + textMessage.getText());
message.acknowledge();
break;
case 1:
System.out.println("NOPE, second must be retried " + textMessage.getText());
throw new RuntimeException("I failed, aaaaah");
case 2:
System.out.println("OK, second message received " + textMessage.getText());
message.acknowledge();
}
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
});
connection.start();
{
// A client sends two messages...
Connection connection1 = pooledConnectionFactory.createConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
MessageProducer producer = session1.createProducer(helloQueue);
producer.send(session1.createTextMessage("Hello World 1"));
producer.send(session1.createTextMessage("Hello World 2"));
producer.close();
session1.close();
connection1.stop();
connection1.close();
}
JOptionPane.showInputDialog("I will wait, you watch the log...");
consumer.close();
session.close();
connection.stop();
connection.close();
pooledConnectionFactory.stop();
brokerService.stop();
assertEquals(3, atomicInteger.get());
}
उत्तर के लिए आपको बहुत अधिक व्हेली और @ अमार धन्यवाद। मैं दोनों को उखाड़ फेंक रहा हूं क्योंकि आप दोनों ने मुझे सही रास्ते में इंगित किया था। लेकिन अभी तक सही जवाब नहीं चुन रहा है। क्योंकि अधिक परीक्षण की आवश्यकता है। –