Dot-Net

等待一條帶有超時的 RabbitMQ 消息

  • April 22, 2016

我想向 RabbitMQ 伺服器發送一條消息,然後等待回复消息(在“回复”隊列上)。當然,我不想永遠等待,以防處理這些消息的應用程序出現故障 - 需要超時。這聽起來像是一項非常基本的任務,但我找不到這樣做的方法。我現在在py-amqplibRabbitMQ .NET 客戶端都遇到了這個問題。

到目前為止我得到的最好的解決方案是使用中間進行輪詢basic_getsleep但這很醜陋:

def _wait_for_message_with_timeout(channel, queue_name, timeout):
   slept = 0
   sleep_interval = 0.1

   while slept < timeout:
       reply = channel.basic_get(queue_name)
       if reply is not None:
           return reply

       time.sleep(sleep_interval)
       slept += sleep_interval

   raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)

當然有更好的方法嗎?

amqplib我剛剛添加了對in 的超時支持carrot

這是一個子類amqplib.client0_8.Connection

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multichannel.wait能夠在任意數量的頻道上接收的版本。

我想這可能會在某個時候在上游合併。

這是我最終在 .NET 客戶端中所做的事情:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
{
   var consumer = new QueueingBasicConsumer(Channel);
   var tag = Channel.BasicConsume(queueName, true, null, consumer);
   try
   {
       object result;
       if (!consumer.Queue.Dequeue(timeoutMs, out result))
           throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));

       return ((BasicDeliverEventArgs)result).Body;
   }
   finally
   {
       Channel.BasicCancel(tag);
   }
}

不幸的是,我不能對 py-amqplib 做同樣的事情,因為它的basic_consume方法不會呼叫回調,除非你呼叫channel.wait()並且channel.wait()不支持超時!這個愚蠢的限制(我一直遇到)意味著如果您再也沒有收到過消息,您的執行緒將永遠凍結。

引用自:https://stackoverflow.com/questions/2799731