के लिए देरी कतार का कार्यान्वयन, मैंने निर्माता/उपभोक्ता कतार प्रणाली पर त्वरित कार्यान्वयन किया।हाल ही में PHP AMQP
<?php
namespace Queue;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Amqp
{
private $connection;
private $queueName;
private $delayedQueueName;
private $channel;
private $callback;
public function __construct($host, $port, $login, $password, $queueName)
{
$this->connection = new AMQPStreamConnection($host, $port, $login, $password);
$this->queueName = $queueName;
$this->delayedQueueName = null;
$this->channel = $this->connection->channel();
// First, we need to make sure that RabbitMQ will never lose our queue.
// In order to do so, we need to declare it as durable. To do so we pass
// the third parameter to queue_declare as true.
$this->channel->queue_declare($queueName, false, true, false, false);
}
public function __destruct()
{
$this->close();
}
// Just in case : http://stackoverflow.com/questions/151660/can-i-trust-php-destruct-method-to-be-called
// We should call close explicitly if possible.
public function close()
{
if (!is_null($this->channel)) {
$this->channel->close();
$this->channel = null;
}
if (!is_null($this->connection)) {
$this->connection->close();
$this->connection = null;
}
}
public function produceWithDelay($data, $delay)
{
if (is_null($this->delayedQueueName))
{
$delayedQueueName = $this->queueName . '.delayed';
// First, we need to make sure that RabbitMQ will never lose our queue.
// In order to do so, we need to declare it as durable. To do so we pass
// the third parameter to queue_declare as true.
$this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
new AMQPTable(array(
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => $this->queueName
))
);
$this->delayedQueueName = $delayedQueueName;
}
$msg = new AMQPMessage(
$data,
array(
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => $delay
)
);
$this->channel->basic_publish($msg, '', $this->delayedQueueName);
}
public function produce($data)
{
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$this->channel->basic_publish($msg, '', $this->queueName);
}
public function consume($callback)
{
$this->callback = $callback;
// This tells RabbitMQ not to give more than one message to a worker at
// a time.
$this->channel->basic_qos(null, 1, null);
// Requires ack.
$this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback'));
while(count($this->channel->callbacks)) {
$this->channel->wait();
}
}
public function consumeCallback($msg)
{
call_user_func_array(
$this->callback,
array($msg)
);
// Very important to ack, in order to remove msg from queue. Ack after
// callback, as exception might happen in callback.
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
public function getQueueSize()
{
// three tuple containing (<queue name>, <message count>, <consumer count>)
$tuple = $this->channel->queue_declare($this->queueName, false, true, false, false);
if ($tuple != null && isset($tuple[1])) {
return $tuple[1];
}
return -1;
}
}
public function produce
और public function consume
जोड़ी के रूप में उम्मीद काम करता है।
हालांकि, जब यह देरी कतार सिस्टम के साथ आता
public function produceWithDelay
और public function consume
जोड़ी अपेक्षा के अनुरूप काम नहीं करता है। उपभोक्ता जो consume
पर कॉल करता है, किसी भी आइटम को प्राप्त करने में सक्षम नहीं है, यहां तक कि कुछ समय के लिए भी इंतजार कर रहा है।
मुझे विश्वास है कि मेरे produceWithDelay
कार्यान्वयन के साथ कुछ सही नहीं है। क्या मुझे पता है कि क्या गलत है?
कोशिश के रूप में' $ चैनल> queue_declare ("नाम", झूठी, झूठे, झूठे, सच, सच, सरणी()) अपने कतार घोषित करने के लिए ] (https://gist.github.com/tairov/11289983) – Vardius
इसे स्क्रैच से लागू करने की कोई आवश्यकता नहीं है। इस तरह आपको इसे करना चाहिए https://stackoverflow.com/a/45549182/579025 –