問題描述
我有一個應用程序,它使用 RabbitMQ 作為消息隊列在兩個組件之間發送/接收消息:發送者和接收者.發件人以非常快的方式發送消息.接收方接收到消息,然后執行一些非常耗時的任務(主要是為非常大的數據量寫入數據庫).由于接收者需要很長時間才能完成任務然后檢索隊列中的下一條消息,因此發送者將繼續快速填滿隊列.所以我的問題是:這會導致消息隊列溢出嗎?
消息消費者如下所示:
public void onMessage() throws IOException, InterruptedException {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");QueueingConsumer 消費者 = 新 QueueingConsumer(channel);channel.basicConsume(queueName, true, consumer);而(真){QueueingConsumer.Delivery 交付 = consumer.nextDelivery();字符串消息 = new String(delivery.getBody());System.out.println(" [x] 收到'" + 消息 + "'");JSONObject json = new JSONObject(message);字符串 caseID = json.getString("caseID");//跟隨需要很長時間dao.saveToDB(caseID);}}
消費者收到的每條消息都包含一個 caseID.對于每個caseID,都會將大量的數據保存到數據庫中,這需要很長時間.目前只為 RabbitMQ 設置了一個消費者,因為生產者/消費者使用相同的隊列來發布/訂閱 caseID.那么如何加快消費者的吞吐量,讓消費者趕上生產者,避免隊列中的消息溢出呢?是否應該在消費者部分使用多線程來加快消費速度?或者我應該使用多個消費者同時消費傳入的消息?或者有什么異步方式讓消費者異步消費消息而不等待它完成?歡迎任何建議.
這會導致消息隊列溢出嗎?"
是的.RabbitMQ 會進入流控"狀態,以防止隨著隊列長度的增加而過度消耗內存.它還將開始將消息持久化到磁盤,而不是將它們保存在內存中.
<塊引用>"那么如何才能加快消費者的吞吐量,讓消費者可以趕上生產者,避免消息溢出排隊"
你有兩個選擇:
- 添加更多消費者.請記住,如果您選擇此選項,您的數據庫現在將由多個并發進程操作.確保數據庫能夠承受額外的壓力.
- 提高消費渠道的QOS值.這將從隊列中提取更多消息并將它們緩沖在消費者上.這將增加整體處理時間;如果緩沖了 5 條消息,則第 5 條消息將花費消息 1...5 的處理時間來完成.
<塊引用>
我應該在消費者部分使用多線程來加速消費率?"
除非您有精心設計的解決方案,否則不會.向應用程序添加并行性將在消費者端增加大量開銷.您最終可能會耗盡 ThreadPool 或限制內存使用.
在處理 AMQP 時,您確實需要考慮每個流程的業務需求,以便設計出最優的解決方案.您收到的消息對時間有多敏感?它們是否需要盡快持久化到數據庫中,或者這些數據是否立即可用對您的用戶來說很重要?
如果數據不需要立即持久化,您可以修改您的應用程序,以便消費者只需從隊列中刪除消息并將它們保存到緩存集合中,例如在 Redis 中.引入第二個進程,然后依次讀取和處理緩存的消息.這將確保您的隊列長度不會增長到足以導致流控制,同時防止您的數據庫被寫入請求轟炸,這些寫入請求通常比讀取請求更昂貴.您的消費者現在只需從隊列中刪除消息,稍后由另一個進程處理.
I have an application that uses RabbitMQ as the message queue to send/receive message between two components: sender and receiver. The sender sends message in a very fast way. The receiver receives the message and then does some very time-consuming task (mainly database writing for very large data size). Since the receiver takes a very long time to finish the task and then retrieve the next message in the queue, the sender will keep filling up the queue quickly. So my question is: Will this cause the message queue to overflow?
The message consumer looks like the following:
public void onMessage() throws IOException, InterruptedException {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
JSONObject json = new JSONObject(message);
String caseID = json.getString("caseID");
//following takes very long time
dao.saveToDB(caseID);
}
}
Each message received by the consumer contains a caseID. For each caseID, it will save large amount of data to the database, which takes very long time. Currently only one consumer is set up for the RabbitMQ since producer/consumer use the same queue for the publish/subscribe of caseID. So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue? Should I use multithreading in the consumer part to speed up the consumption rate? Or should I use multiple consumers to consume the incoming message simutaneously? Or is there any asynchronous way to let the consumer consume the message asynchronously without waiting it to finish? Any suggestions are welcome.
"Will this cause the message queue to overflow?"
Yes. RabbitMQ will enter a state of "flow control" to prevent excessive memory consumption as the queue length increases. It will also start persisting messages to disk, rather than hold them in memory.
"So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue"
You have 2 options:
- Add more consumers. Bear in mind that your DB will now be manipulated by multiple concurrent processes if you choose this option. Ensure that the DB can withstand the extra pressure.
- Increase the QOS value of the consuming channel. This will pull more messages from the queue and buffer them on the consumer. This will increase the overall processing time; if 5 messages are buffered, the 5th message will take the processing time of messages 1...5 to complete.
"Should I use multithreading in the consumer part to speed up the consumption rate?"
Not unless you have a well-designed solution. Adding parallelism to an application is going to add a lot of overhead on the consumer-side. You may end up exhausting the ThreadPool or throttling memory-usage.
When dealing with AMQP, you really need to consider the business requirement for each process in order to design the optimal solution. How time-sensitive are your incoming messages? Do they need to be persisted to DB ASAP, or does it matter to your users whether or not that data is available immediately?
If the data does not need to be persisted immediately, you could modify your application so that the consumer(s) simply remove messages from the queue and save them to a cached collection, in Redis, for example. Introduce a second process which then reads and processes the cached messages sequentially. This will ensure that your queue-length does not grow sufficiently to result in flow-control, while preventing your DB from being bombarded with write requests, which are typically more expensive than read requests. Your consumer(s) now simply remove messages from the queue, to be dealt with by another process later.
這篇關于RabbitMQ:快速生產者和慢消費者的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!