2010-12-14 18 views
23

में देरी संदेश क्या कुछ देरी के साथ RabbitMQ के माध्यम से संदेश भेजना संभव है? उदाहरण के लिए मैं 30 मिनट के बाद क्लाइंट सत्र समाप्त करना चाहता हूं, और मैं एक संदेश भेजता हूं जिसे 30 मिनट के बाद संसाधित किया जाएगा।RabbitMQ

+0

आप RabbitMQ उपयोग करने की आवश्यकता है? –

+1

हां, यह सुविधा RabbitMQ-3.5.8 के बाद उपलब्ध है। https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/ – lambodar

+0

यदि आप स्प्रिंग एएमक्यूपी का उपयोग करते हैं, तो [प्लगइन के लिए समर्थन] है [https://docs.spring.io/spring-amqp/reference/htmlsingle/# देरी-संदेश-विनिमय)। – Gruber

उत्तर

5

यह वर्तमान में संभव नहीं है। आपको अपने समाप्ति टाइमस्टैम्प को किसी डेटाबेस या कुछ समान में संग्रहीत करना होगा, और फिर एक सहायक कार्यक्रम है जो उन टाइमस्टैम्प को पढ़ता है और एक संदेश कतार देता है।

विलंबित संदेश अक्सर अनुरोधित सुविधा होते हैं, क्योंकि वे कई परिस्थितियों में उपयोगी होते हैं। हालांकि, यदि आपकी ज़रूरतें क्लाइंट सत्रों की अवधि समाप्त करने के लिए हैं, तो मेरा मानना ​​है कि संदेश आपके लिए आदर्श समाधान नहीं है, और यह कि एक और दृष्टिकोण बेहतर काम कर सकता है।

9

RabbitMQ v2.8 की रिलीज के साथ, अनुसूचित वितरण अब उपलब्ध है, लेकिन एक अप्रत्यक्ष सुविधा के रूप में: http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

+0

मैंने इस दृष्टिकोण की कोशिश की लेकिन कुछ मुद्दों को मारा, सुझाव किसी को भी? http://blog.james-carr.org/2012/03/30/rabbitmq-sending-a-message-to-be-consumed-later/#comment-502703 –

+5

मैंने एक स्पाइक किया और कुछ शोस्टॉपर्स मारा: 1. संदेश केवल डीएलक्यू हैं: जब क्यू के शीर्ष पर (http://www.rabbitmq.com/ttl.html - चेतावनी अनुभाग) इसका मतलब है कि यदि मैंने पहली बार 4 घंटे में समाप्त होने के लिए msgstr 1 को सेट किया है और msgstr 1 घंटे में समाप्त होने के लिए msgstr msgstr "केवल 1 की अवधि समाप्त हो जाने के बाद ही समाप्त हो जाएगी। 2. संदेश के लिए टीटीएल खरगोश द्वारा रखा जाता है तो मान लें कि आप 10 एस के कम समय का उपयोग करते हैं। यदि उपभोक्ता समाप्त होने के 10 सेकंड बाद (0 बैकलॉग के कारण) के साथ संदेश का उपभोग करने में सक्षम नहीं है, तो इसे पर छोड़ दिया जाएगा ऊपर दिया गया है रैबिट 3.0.1 क्या आप लोग किसी भी कामकाज को देखते हैं ? –

6

मैं टिप्पणी जोड़ने के लिए पर्याप्त प्रतिष्ठा है न, एक नया उत्तर पोस्ट के रूप में। यह http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

संदेशों पर टीटीएल सेट करने के बजाय पहले से ही चर्चा की गई है, आप इसे कतार स्तर पर सेट कर सकते हैं। इसके अलावा आप संदेशों को अलग-अलग कतार में रीडायरेक्ट करने के लिए केवल एक नया एक्सचेंज बनाने से बच सकते हैं। यहाँ नमूना जावा कोड है:

निर्माता:

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import java.util.HashMap; 
import java.util.Map; 

public class DelayedProducer { 
    private final static String QUEUE_NAME = "ParkingQueue"; 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory connectionFactory = new ConnectionFactory(); 
     connectionFactory.setHost("localhost"); 
     Connection connection = connectionFactory.newConnection(); 
     Channel channel = connection.createChannel(); 

     Map<String, Object> arguments = new HashMap<String, Object>(); 
     arguments.put("x-message-ttl", 10000); 
     arguments.put("x-dead-letter-exchange", ""); 
     arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME); 
     channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); 

     for (int i=0; i<5; i++) { 
      String message = "This is a sample message " + i; 
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
      System.out.println("message "+i+" got published to the queue!"); 
      Thread.sleep(3000); 
     } 

     channel.close(); 
     connection.close(); 
    } 
} 

उपभोक्ता:

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 

public class Consumer { 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     boolean autoAck = false; 
     channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer); 

     while (true) { 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String message = new String(delivery.getBody()); 
      System.out.println(" [x] Received '" + message + "'"); 
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
     } 

    } 
} 
+0

आपको बहुत बहुत धन्यवाद। मुझे लगता है कि उपभोक्ता कतार में आपको एक छोटी सी गलती है। Channel.queueDeclare (QUEUE_NAME, झूठी, झूठी, झूठी, शून्य) घोषित करें; इसमें "QUEUE_NAME" के बजाय "DESTINATION_QUEUE_NAME" होना चाहिए। वास्तव में बहुत बहुत धन्यवाद –

5

ऐसा लगता है कि this blog post मृत पत्र मुद्रा और संदेश टीटीएल का उपयोग कर कुछ इसी तरह करने के लिए का वर्णन करता है।

नीचे दिया गया कोड कॉफीस्क्रिप्ट और नोड.जेएस का उपयोग खरगोश तक पहुंचने और कुछ इसी तरह लागू करने के लिए करता है।

amqp = require 'amqp' 
events = require 'events' 
em  = new events.EventEmitter() 
conn = amqp.createConnection() 

key = "send.later.#{new Date().getTime()}" 
conn.on 'ready', -> 
    conn.queue key, { 
    arguments:{ 
     "x-dead-letter-exchange":"immediate" 
    , "x-message-ttl": 5000 
    , "x-expires": 6000 
    } 
    }, -> 
    conn.publish key, {v:1}, {contentType:'application/json'} 

    conn.exchange 'immediate' 

    conn.queue 'right.now.queue', { 
     autoDelete: false 
    , durable: true 
    }, (q) -> 
    q.bind('immediate', 'right.now.queue') 
    q.subscribe (msg, headers, deliveryInfo) -> 
     console.log msg 
     console.log headers 
0

मान लीजिए आप उपभोक्ता पर नियंत्रण था, तो आप इस ?? जैसे उपभोक्ता पर देरी को प्राप्त कर सकते हैं:

हम यह सुनिश्चित करें कि कतार में nth संदेश हमेशा एक छोटे देरी है कर रहे हैं एन + 1 वें संदेश की तुलना में (यह कई उपयोग मामलों के लिए सच हो सकता है): निर्माता उस कार्य को समय-समय पर भेजता है जिस पर इस नौकरी को निष्पादित करने की आवश्यकता होती है (वर्तमान समय + देरी)। उपभोक्ता:

1) कार्य

2) से scheduledTime पढ़ता है अगर मौजूदा समय> scheduledTime आगे बढ़ो।

वरना देरी = scheduledTime - मौजूदा समय देरी

उपभोक्ता ने संकेत समय के लिए

नींद हमेशा एक संगामिति पैरामीटर के साथ कॉन्फ़िगर किया गया है। इसलिए, अन्य संदेश कतार में तब तक इंतजार करेंगे जब तक कोई उपभोक्ता नौकरी खत्म नहीं कर लेता है। इसलिए, यह समाधान सिर्फ अच्छी तरह से काम कर सकता है हालांकि यह विशेष रूप से बड़ी समय में देरी के लिए अजीब लग रहा है।

5

नॉर्मन के उत्तर के लिए धन्यवाद, मैं इसे नोडजेएस में लागू कर सकता हूं।

सबकुछ कोड से बहुत स्पष्ट है। आशा है कि यह किसी के समय को बचाएगा।

पुराने दृष्टिकोण:: टीटीएल (जीने के लिए समय) शीर्षक प्रत्येक संदेश/कतार में (नीति) सेट और फिर इसे संभालने के लिए एक DLQ परिचय

var ch = channel; 
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false}); 
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false}); 

// setup intermediate queue which will never be listened. 
// all messages are TTLed so when they are "dead", they come to another exchange 
ch.assertQueue("my_intermediate_queue", { 
     deadLetterExchange: "my_final_delayed_exchange", 
     messageTtl: 5000, // 5sec 
}, function (err, q) { 
     ch.bindQueue(q.queue, "my_intermediate_exchange", ''); 
}); 

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) { 
     ch.bindQueue(q.queue, "my_final_delayed_exchange", ''); 

     ch.consume(q.queue, function (msg) { 
      console.log("delayed - [x] %s", msg.content.toString()); 
     }, {noAck: true}); 
}); 
4

दो दृष्टिकोण तुम कोशिश कर सकते हैं। एक बार टीटीएल की समयसीमा समाप्त हो जाने के बाद आपके संदेश डीएलक्यू से मुख्य कतार में चले जाएंगे ताकि आपका श्रोता इसे संसाधित कर सके।

नवीनतम दृष्टिकोण: हाल ही में RabbitMQ साथ RabbitMQ विलंबित प्लग संदेश आया था, जिसका उपयोग करके आप एक ही और RabbitMQ-3.5.8 के बाद से इस प्लगइन समर्थन उपलब्ध हासिल कर सकते हैं।

आप एक्स-देरी वाले संदेश के साथ एक एक्सचेंज घोषित कर सकते हैं और फिर संदेश के लिए देरी समय में मिलीसेकंड में व्यक्त कस्टम हेडर एक्स-देरी के साथ संदेश प्रकाशित कर सकते हैं। संदेश एक्स-विलंब को मिलीसेकंड

अधिक यहाँ के बाद संबंधित कतारों को वितरित किया जाएगा: git

संबंधित मुद्दे