Dot-Net
等待一條帶有超時的 RabbitMQ 消息
我想向 RabbitMQ 伺服器發送一條消息,然後等待回复消息(在“回复”隊列上)。當然,我不想永遠等待,以防處理這些消息的應用程序出現故障 - 需要超時。這聽起來像是一項非常基本的任務,但我找不到這樣做的方法。我現在在py-amqplib和RabbitMQ .NET 客戶端都遇到了這個問題。
到目前為止我得到的最好的解決方案是使用中間進行輪詢
basic_get,sleep但這很醜陋:def _wait_for_message_with_timeout(channel, queue_name, timeout): slept = 0 sleep_interval = 0.1 while slept < timeout: reply = channel.basic_get(queue_name) if reply is not None: return reply time.sleep(sleep_interval) slept += sleep_interval raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)當然有更好的方法嗎?
amqplib我剛剛添加了對in 的超時支持carrot。這是一個子類
amqplib.client0_8.Connection:http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97
wait_multi是channel.wait能夠在任意數量的頻道上接收的版本。我想這可能會在某個時候在上游合併。
這是我最終在 .NET 客戶端中所做的事情:
protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs) { var consumer = new QueueingBasicConsumer(Channel); var tag = Channel.BasicConsume(queueName, true, null, consumer); try { object result; if (!consumer.Queue.Dequeue(timeoutMs, out result)) throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0)); return ((BasicDeliverEventArgs)result).Body; } finally { Channel.BasicCancel(tag); } }不幸的是,我不能對 py-amqplib 做同樣的事情,因為它的
basic_consume方法不會呼叫回調,除非你呼叫channel.wait()並且channel.wait()不支持超時!這個愚蠢的限制(我一直遇到)意味著如果您再也沒有收到過消息,您的執行緒將永遠凍結。