2017-03-24 27 views
14

के लिए देरी कतार का कार्यान्वयन, मैंने निर्माता/उपभोक्ता कतार प्रणाली पर त्वरित कार्यान्वयन किया।हाल ही में PHP AMQP

<?php 
namespace Queue; 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable;  

class Amqp 
{ 
    private $connection; 
    private $queueName; 
    private $delayedQueueName; 
    private $channel; 
    private $callback; 

    public function __construct($host, $port, $login, $password, $queueName) 
    { 
     $this->connection = new AMQPStreamConnection($host, $port, $login, $password); 
     $this->queueName = $queueName; 
     $this->delayedQueueName = null; 
     $this->channel = $this->connection->channel(); 
     // First, we need to make sure that RabbitMQ will never lose our queue. 
     // In order to do so, we need to declare it as durable. To do so we pass 
     // the third parameter to queue_declare as true. 
     $this->channel->queue_declare($queueName, false, true, false, false); 
    } 

    public function __destruct() 
    { 
     $this->close(); 
    } 

    // Just in case : http://stackoverflow.com/questions/151660/can-i-trust-php-destruct-method-to-be-called 
    // We should call close explicitly if possible. 
    public function close() 
    { 
     if (!is_null($this->channel)) { 
      $this->channel->close(); 
      $this->channel = null; 
     } 

     if (!is_null($this->connection)) { 
      $this->connection->close(); 
      $this->connection = null; 
     } 
    } 

    public function produceWithDelay($data, $delay) 
    { 
     if (is_null($this->delayedQueueName)) 
     { 
      $delayedQueueName = $this->queueName . '.delayed'; 

      // First, we need to make sure that RabbitMQ will never lose our queue. 
      // In order to do so, we need to declare it as durable. To do so we pass 
      // the third parameter to queue_declare as true. 
      $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false, 
       new AMQPTable(array(
        'x-dead-letter-exchange' => '', 
        'x-dead-letter-routing-key' => $this->queueName 
       )) 
      ); 

      $this->delayedQueueName = $delayedQueueName; 
     } 

     $msg = new AMQPMessage(
      $data, 
      array(
       'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 
       'expiration' => $delay 
      ) 
     ); 

     $this->channel->basic_publish($msg, '', $this->delayedQueueName); 
    } 

    public function produce($data) 
    { 
     $msg = new AMQPMessage(
      $data, 
      array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) 
     ); 

     $this->channel->basic_publish($msg, '', $this->queueName); 
    } 

    public function consume($callback) 
    { 
     $this->callback = $callback; 

     // This tells RabbitMQ not to give more than one message to a worker at 
     // a time. 
     $this->channel->basic_qos(null, 1, null); 

     // Requires ack. 
     $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback')); 

     while(count($this->channel->callbacks)) { 
      $this->channel->wait(); 
     } 
    } 

    public function consumeCallback($msg) 
    { 
     call_user_func_array(
      $this->callback, 
      array($msg) 
     ); 

     // Very important to ack, in order to remove msg from queue. Ack after 
     // callback, as exception might happen in callback. 
     $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 
    } 

    public function getQueueSize() 
    { 
     // three tuple containing (<queue name>, <message count>, <consumer count>) 
     $tuple = $this->channel->queue_declare($this->queueName, false, true, false, false); 
     if ($tuple != null && isset($tuple[1])) { 
      return $tuple[1]; 
     } 
     return -1; 
    } 
} 

public function produce और public function consume जोड़ी के रूप में उम्मीद काम करता है।

हालांकि, जब यह देरी कतार सिस्टम के साथ आता

public function produceWithDelay और public function consume जोड़ी अपेक्षा के अनुरूप काम नहीं करता है। उपभोक्ता जो consume पर कॉल करता है, किसी भी आइटम को प्राप्त करने में सक्षम नहीं है, यहां तक ​​कि कुछ समय के लिए भी इंतजार कर रहा है।

मुझे विश्वास है कि मेरे produceWithDelay कार्यान्वयन के साथ कुछ सही नहीं है। क्या मुझे पता है कि क्या गलत है?

+0

कोशिश के रूप में' $ चैनल> queue_declare ("नाम", झूठी, झूठे, झूठे, सच, सच, सरणी()) अपने कतार घोषित करने के लिए ] (https://gist.github.com/tairov/11289983) – Vardius

+0

इसे स्क्रैच से लागू करने की कोई आवश्यकता नहीं है। इस तरह आपको इसे करना चाहिए https://stackoverflow.com/a/45549182/579025 –

उत्तर

1

साइड नोट के लिए।

मुझे पता चला कि यह मेरी अपनी बग के कारण है।

if (is_null($this->delayedQueueName)) 
    { 
     $delayedQueueName = $this->queueName . '.delayed'; 

     $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false, 
     ... 

     $this->delayedQueueName = $delayedQueueName; 
    } 

के बजाय मैं

if (is_null($this->delayedQueueName)) 
    { 
     $delayedQueueName = $this->queueName . '.delayed'; 

     $this->channel->queue_declare(delayedQueueName, false, true, false, false, false, 
     ... 

     $this->delayedQueueName = $delayedQueueName; 
    } 

में यह लिखना चाहिए मेरे सदस्य चर अभी तक ठीक से प्रारंभ नहीं किया गया है।

आपके संदर्भ उद्देश्य के लिए एक पूरी तरह से काम करने योग्य कोड का पालन करना है।

<?php 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable; 

class Amqp 
{ 
    private $connection; 
    private $queueName; 
    private $delayedQueueName; 
    private $channel; 
    private $callback; 

    public function __construct($host, $port, $login, $password, $queueName) 
    { 
     $this->connection = new AMQPStreamConnection($host, $port, $login, $password); 
     $this->queueName = $queueName; 
     $this->delayedQueueName = null; 
     $this->channel = $this->connection->channel(); 
     $this->channel->queue_declare($queueName, false, true, false, false); 
    } 

    public function __destruct() 
    { 
     $this->close(); 
    } 

    public function close() 
    { 
     if (!is_null($this->channel)) { 
      $this->channel->close(); 
      $this->channel = null; 
     } 

     if (!is_null($this->connection)) { 
      $this->connection->close(); 
      $this->connection = null; 
     } 
    } 

    public function produceWithDelay($data, $delay) 
    { 
     if (is_null($this->delayedQueueName)) 
     { 
      $delayedQueueName = $this->queueName . '.delayed'; 

      $this->channel->queue_declare($delayedQueueName, false, true, false, false, false, 
       new AMQPTable(array(
        'x-dead-letter-exchange' => '', 
        'x-dead-letter-routing-key' => $this->queueName 
       )) 
      ); 

      $this->delayedQueueName = $delayedQueueName; 
     } 

     $msg = new AMQPMessage(
      $data, 
      array(
       'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 
       'expiration' => $delay 
      ) 
     ); 

     $this->channel->basic_publish($msg, '', $this->delayedQueueName); 
    } 

    public function produce($data) 
    { 
     $msg = new AMQPMessage(
      $data, 
      array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) 
     ); 

     $this->channel->basic_publish($msg, '', $this->queueName); 
    } 

    public function consume($callback) 
    { 
     $this->callback = $callback; 

     $this->channel->basic_qos(null, 1, null); 

     $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'callback')); 

     while (count($this->channel->callbacks)) { 
      $this->channel->wait(); 
     } 
    } 

    public function callback($msg) 
    { 
     call_user_func_array(
      $this->callback, 
      array($msg) 
     ); 

     $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 
    } 
} 
3

सभी की मुट्ठी यह सत्यापित करें कि आपकी प्लगइन rabbitmq_delayed_message_exchange कमांड चलाकर सक्षम है: rabbitmq-plugins list, यदि नहीं - तो अधिक जानकारी here पढ़ें।

और आपको अपनी __construct विधि को अपडेट करना होगा क्योंकि आपको क्यूई को थोड़ा और तरीके से घोषित करने की आवश्यकता है। मैं अपने निर्माण को अद्यतन करने के लिए बहाना नहीं है, लेकिन मेरी सरल उदाहरण प्रदान करना चाहते हैं:

प्रचार कतार:

<?php 

require_once __DIR__ . '/../vendor/autoload.php'; 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable; 

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 
$channel = $connection->channel(); 
$args = new AMQPTable(['x-delayed-type' => 'fanout']); 
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args); 
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']); 
$channel->queue_declare('delayed_queue', false, true, false, false, false, $args); 
$channel->queue_bind('delayed_queue', 'delayed_exchange'); 

संदेश भेजें:

$data = 'Hello World at ' . date('Y-m-d H:i:s'); 
$delay = 7000; 
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); 
$headers = new AMQPTable(['x-delay' => $delay]); 
$message->set('application_headers', $headers); 
$channel->basic_publish($message, 'delayed_exchange'); 
printf(' [x] Message sent: %s %s', $data, PHP_EOL); 
$channel->close(); 
$connection->close(); 

प्राप्त संदेश:

$callback = function (AMQPMessage $message) { 
    printf(' [x] Message received: %s %s', $message->body, PHP_EOL); 
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); 
}; 
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback); 
while(count($channel->callbacks)) { 
    $channel->wait(); 
} 
$channel->close(); 
$connection->close(); 

इसके अलावा आप स्रोत फ़ाइलों को here पा सकते हैं।
उम्मीद है कि यह आपकी मदद करेगा!

0

यदि आप कतार इंटरऑप चुनते हैं तो देरी से संदेश भेजने में कुछ पंक्तियां होती हैं। टीटीएल प्लस डेड लेटर एक्सचेंज दृष्टिकोण के साथ-साथ देरी वाली प्लगइन पर आधारित एक समाधान है। शायद `और विनिमय निम्नलिखित इस [सार अगले;

https://blog.forma-pro.com/rabbitmq-delayed-messaging-da802e3a0aa9

संबंधित मुद्दे