2016-09-16 9 views
5

में एक या नाक का उपयोग कैसे करें मैं वसंत AMQP के लिए नया हूं। मेरे पास एक ऐसा एप्लिकेशन है जो एक निर्माता है जो अन्य उपभोक्ता को संदेश भेज रहा है जो एक उपभोक्ता है।वसंत AMQP

एक बार उपभोक्ता संदेश प्राप्त करता है, हम डेटा के सत्यापन करेंगे।

डेटा उचित है अगर हम एसीके के लिए और संदेश कतार से हटाया जाना चाहिए है। यदि डेटा अनुचित है तो हमें डेटा को नकारात्मक (नकारात्मक स्वीकार) करना होगा ताकि इसे RabbitMQ में फिर से पंक्तिबद्ध किया जा सके।

मैं (यह बिल्कुल संदेश requeue नहीं होगा)

**factory.setDefaultRequeueRejected(true);** (जब अपवाद तब होता है यह संदेश requeue जाएगा)

लेकिन मेरे मामले में मैं संदेश को स्वीकार करेगा आधारित

**factory.setDefaultRequeueRejected(false);** में आए सत्यापन पर फिर इसे संदेश हटा देना चाहिए। अगर नाक तो संदेश की आवश्यकता है।

मैं RabbitMQ वेबसाइट में पढ़ा है

AMQP विनिर्देश को परिभाषित करता है basic.reject विधि है कि ग्राहकों को अलग-अलग अस्वीकार करने के लिए, संदेश दिए, दलाल को निर्देश या तो उन्हें त्यागने या उन्हें

कैसे requeue लिए अनुमति देता है उपरोक्त परिदृश्य को प्राप्त करने के लिए? कृपया मुझे कुछ उदाहरण दें। 46: 38,854 त्रुटि [stderr] (SimpleAsyncTaskExecutor

मैं एक छोटे से कार्यक्रम

 logger.info("Job Queue Handler::::::::::" + new Date()); 
     try { 

     }catch(Exception e){ 

      logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::"); 

     } 

     factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{ 
      return cause instanceof XMLException; 
     })); 

संदेश factory.setDefaultRequeueRejected (सच)

09 विभिन्न अपवाद के लिए कतार फिर से नहीं कर रहा है की कोशिश की -1) org.activiti.engine.ActivitiObjectNotFoundException: कुंजी 'WF89012'

012 के साथ तैनात कोई प्रक्रिया नहीं

09: 46: 39,102 जानकारी [com.example.bip.rabbitmq.handler.ErrorQueueHandler] (SimpleAsyncTaskExecutor -1) त्रुटि कतार से प्राप्त: {त्रुटि = नहीं जेपीए लेन-देन के लिए प्रतिबद्ध किया जा सका; नेस्टेड अपवाद javax.persistence.RollbackException है: लेन-देन के रूप में चिह्नित rollbackOnly}

उत्तर

4

देखें the documentation

डिफॉल्ट रूप से, (defaultRequeueRejected=true के साथ) कंटेनर संदेश को अचंभित कर देगा (इसे हटाया जा सकता है) यदि श्रोता सामान्य रूप से बाहर निकलता है या अस्वीकार करता है (और अपेक्षित) यदि श्रोता एक अपवाद फेंकता है।

श्रोता (या त्रुटि हैंडलर) फेंकता है तो एक AmqpRejectAndDontRequeueException, डिफ़ॉल्ट व्यवहार ओवरराइड की गई है और संदेश छोड़ दिया है (या कराई एक DLX/DLQ को यदि ऐसा है तो कॉन्फ़िगर किया गया) - कंटेनर basicReject(false) बजाय basicReject(true) कहता है।

इसलिए, यदि आपका सत्यापन विफल हो जाता है, एक AmqpRejectAndDontRequeueException फेंक देते हैं। या, किसी कस्टम त्रुटि हैंडलर के साथ अपने श्रोता कॉन्फ़िगर एक AmqpRejectAndDontRequeueException करने के लिए अपने अपवाद कन्वर्ट करने के लिए।

this answer में वर्णन किया गया है।

तुम सच में अपने आप को acking के लिए जिम्मेदारी लेने के लिए चाहते हैं, MANUAL को पावती मोड सेट और अगर आप एक @RabbitListener उपयोग कर रहे हैं एक ChannelAwareMessageListener या this technique का उपयोग करें।

लेकिन अधिकांश लोग बस कंटेनर बातों का ख्याल रखना (एक बार वे समझते क्या हो रहा है) करते हैं। आम तौर पर, मैन्युअल एके का उपयोग विशेष उपयोग के मामलों के लिए होता है, जैसे कि एक्स को डिफेंस करना, या शुरुआती एकिंग।

संपादित

जवाब मैं करने के लिए (अब तय हो गई) आप इशारा में कोई गलती नहीं था; आपको ListenerExecutionFailedException के कारण को देखना होगा। मैं सिर्फ यह परीक्षण किया है और यह के रूप में की उम्मीद काम करता है ...

@SpringBootApplication 
public class So39530787Application { 

    private static final String QUEUE = "So39530787"; 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args); 
     RabbitTemplate template = context.getBean(RabbitTemplate.class); 
     template.convertAndSend(QUEUE, "foo"); 
     template.convertAndSend(QUEUE, "bar"); 
     template.convertAndSend(QUEUE, "baz"); 
     So39530787Application bean = context.getBean(So39530787Application.class); 
     bean.latch.await(10, TimeUnit.SECONDS); 
     System.out.println("Expect 1 foo:" + bean.fooCount); 
     System.out.println("Expect 3 bar:" + bean.barCount); 
     System.out.println("Expect 1 baz:" + bean.bazCount); 
     context.close(); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setErrorHandler(new ConditionalRejectingErrorHandler(
       t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException)); 
     return factory; 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue(QUEUE, false, false, true); 
    } 
    private int fooCount; 

    private int barCount; 

    private int bazCount; 

    private final CountDownLatch latch = new CountDownLatch(5); 

    @RabbitListener(queues = QUEUE) 
    public void handle(String in) throws Exception { 
     System.out.println(in); 
     latch.countDown(); 
     if ("foo".equals(in) && ++this.fooCount < 3) { 
      throw new FooException(); 
     } 
     else if ("bar".equals(in) && ++this.barCount < 3) { 
      throw new BarException(); 
     } 
     else if ("baz".equals(in)) { 
      this.bazCount++; 
     } 
    } 

    @SuppressWarnings("serial") 
    public static class FooException extends Exception { } 

    @SuppressWarnings("serial") 
    public static class BarException extends Exception { } 

} 

परिणाम:

Expect 1 foo:1 
Expect 3 bar:3 
Expect 1 baz:1 
+0

स्पष्टीकरण के लिए धन्यवाद। मैंने कुछ कार्यक्रम की कोशिश की। क्या आप गलती को सुधार सकते हैं? – Chandan

+0

यह मतलब नहीं है हमेशा त्रुटि हैंडलर से एक 'AmqpRejectAndDontRequeueException' फेंकने के लिए; इसके लिए, बस 'डिफ़ॉल्ट RequeueRejected = false' सेट करें। [इस उत्तर] में तकनीक का उपयोग करें (http://stackoverflow.com/questions/39509745/negative-acknowledgement-to-rabbitmq-queue-to-re-queue-the-message-using-spring/39511357#39511357) से केवल इसे 'XMLException' के लिए फेंक दें - आपके श्रोता को इसे फिर से फेंकना होगा। –

+0

मेरे उत्तर में संपादन देखें। –

संबंधित मुद्दे