久久久久久久av_日韩在线中文_看一级毛片视频_日本精品二区_成人深夜福利视频_武道仙尊动漫在线观看

RabbitMQ:快速生產者和慢消費者

RabbitMQ: fast producer and slow consumer(RabbitMQ:快速生產者和慢消費者)
本文介紹了RabbitMQ:快速生產者和慢消費者的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!

問題描述

限時送ChatGPT賬號..

我有一個應用程序,它使用 RabbitMQ 作為消息隊列在兩個組件之間發送/接收消息:發送者和接收者.發件人以非常快的方式發送消息.接收方接收到消息,然后執行一些非常耗時的任務(主要是為非常大的數據量寫入數據庫).由于接收者需要很長時間才能完成任務然后檢索隊列中的下一條消息,因此發送者將繼續快速填滿隊列.所以我的問題是:這會導致消息隊列溢出嗎?

消息消費者如下所示:

public void onMessage() throws IOException, InterruptedException {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");QueueingConsumer 消費者 = 新 QueueingConsumer(channel);channel.basicConsume(queueName, true, consumer);而(真){QueueingConsumer.Delivery 交付 = consumer.nextDelivery();字符串消息 = new String(delivery.getBody());System.out.println(" [x] 收到'" + 消息 + "'");JSONObject json = new JSONObject(message);字符串 caseID = json.getString("caseID");//跟隨需要很長時間dao.saveToDB(caseID);}}

消費者收到的每條消息都包含一個 caseID.對于每個caseID,都會將大量的數據保存到數據庫中,這需要很長時間.目前只為 RabbitMQ 設置了一個消費者,因為生產者/消費者使用相同的隊列來發布/訂閱 caseID.那么如何加快消費者的吞吐量,讓消費者趕上生產者,避免隊列中的消息溢出呢?是否應該在消費者部分使用多線程來加快消費速度?或者我應該使用多個消費者同時消費傳入的消息?或者有什么異步方式讓消費者異步消費消息而不等待它完成?歡迎任何建議.

解決方案

這會導致消息隊列溢出嗎?"

是的.RabbitMQ 會進入流控"狀態,以防止隨著隊列長度的增加而過度消耗內存.它還將開始將消息持久化到磁盤,而不是將它們保存在內存中.

<塊引用>

"那么如何才能加快消費者的吞吐量,讓消費者可以趕上生產者,避免消息溢出排隊"

你有兩個選擇:

  1. 添加更多消費者.請記住,如果您選擇此選項,您的數據庫現在將由多個并發進程操作.確保數據庫能夠承受額外的壓力.
  2. 提高消費渠道的QOS值.這將從隊列中提取更多消息并將它們緩沖在消費者上.這將增加整體處理時間;如果緩沖了 5 條消息,則第 5 條消息將花費消息 1...5 的處理時間來完成.

<塊引用>

我應該在消費者部分使用多線程來加速消費率?"

除非您有精心設計的解決方案,否則不會.向應用程序添加并行性將在消費者端增加大量開銷.您最終可能會耗盡 ThreadPool 或限制內存使用.

在處理 AMQP 時,您確實需要考慮每個流程的業務需求,以便設計出最優的解決方案.您收到的消息對時間有多敏感?它們是否需要盡快持久化到數據庫中,或者這些數據是否立即可用對您的用戶來說很重要?

如果數據不需要立即持久化,您可以修改您的應用程序,以便消費者只需從隊列中刪除消息并將它們保存到緩存集合中,例如在 Redis 中.引入第二個進程,然后依次讀取和處理緩存的消息.這將確保您的隊列長度不會增長到足以導致流控制,同時防止您的數據庫被寫入請求轟炸,這些寫入請求通常比讀取請求更昂貴.您的消費者現在只需從隊列中刪除消息,稍后由另一個進程處理.

I have an application that uses RabbitMQ as the message queue to send/receive message between two components: sender and receiver. The sender sends message in a very fast way. The receiver receives the message and then does some very time-consuming task (mainly database writing for very large data size). Since the receiver takes a very long time to finish the task and then retrieve the next message in the queue, the sender will keep filling up the queue quickly. So my question is: Will this cause the message queue to overflow?

The message consumer looks like the following:

public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}

Each message received by the consumer contains a caseID. For each caseID, it will save large amount of data to the database, which takes very long time. Currently only one consumer is set up for the RabbitMQ since producer/consumer use the same queue for the publish/subscribe of caseID. So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue? Should I use multithreading in the consumer part to speed up the consumption rate? Or should I use multiple consumers to consume the incoming message simutaneously? Or is there any asynchronous way to let the consumer consume the message asynchronously without waiting it to finish? Any suggestions are welcome.

解決方案

"Will this cause the message queue to overflow?"

Yes. RabbitMQ will enter a state of "flow control" to prevent excessive memory consumption as the queue length increases. It will also start persisting messages to disk, rather than hold them in memory.

"So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue"

You have 2 options:

  1. Add more consumers. Bear in mind that your DB will now be manipulated by multiple concurrent processes if you choose this option. Ensure that the DB can withstand the extra pressure.
  2. Increase the QOS value of the consuming channel. This will pull more messages from the queue and buffer them on the consumer. This will increase the overall processing time; if 5 messages are buffered, the 5th message will take the processing time of messages 1...5 to complete.

"Should I use multithreading in the consumer part to speed up the consumption rate?"

Not unless you have a well-designed solution. Adding parallelism to an application is going to add a lot of overhead on the consumer-side. You may end up exhausting the ThreadPool or throttling memory-usage.

When dealing with AMQP, you really need to consider the business requirement for each process in order to design the optimal solution. How time-sensitive are your incoming messages? Do they need to be persisted to DB ASAP, or does it matter to your users whether or not that data is available immediately?

If the data does not need to be persisted immediately, you could modify your application so that the consumer(s) simply remove messages from the queue and save them to a cached collection, in Redis, for example. Introduce a second process which then reads and processes the cached messages sequentially. This will ensure that your queue-length does not grow sufficiently to result in flow-control, while preventing your DB from being bombarded with write requests, which are typically more expensive than read requests. Your consumer(s) now simply remove messages from the queue, to be dealt with by another process later.

這篇關于RabbitMQ:快速生產者和慢消費者的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!

【網站聲明】本站部分內容來源于互聯網,旨在幫助大家更快的解決問題,如果有圖片或者內容侵犯了您的權益,請聯系我們刪除處理,感謝您的支持!

相關文檔推薦

Parsing an ISO 8601 string local date-time as if in UTC(解析 ISO 8601 字符串本地日期時間,就像在 UTC 中一樣)
How to convert Gregorian string to Gregorian Calendar?(如何將公歷字符串轉換為公歷?)
Java: What/where are the maximum and minimum values of a GregorianCalendar?(Java:GregorianCalendar 的最大值和最小值是什么/在哪里?)
Calendar to Date conversion for dates before 15 Oct 1582. Gregorian to Julian calendar switch(1582 年 10 月 15 日之前日期的日歷到日期轉換.公歷到儒略歷切換)
java Calendar setFirstDayOfWeek not working(java日歷setFirstDayOfWeek不起作用)
Java: getting current Day of the Week value(Java:獲取當前星期幾的值)
主站蜘蛛池模板: 亚洲一区二区三区观看 | 性网址| 国产精品久久国产精品 | 日韩视频观看 | 草草视频在线免费观看 | 91精品国产综合久久久动漫日韩 | 美女福利视频一区 | 国产91精品久久久久久久网曝门 | 资源首页二三区 | 91在线影院 | 91视频国产区 | 理论片午午伦夜理片影院 | 日本不卡一区 | 久久精品av | 五月综合久久 | 欧美日韩在线一区二区三区 | 亚洲精品国产第一综合99久久 | 色婷婷综合久久久久中文一区二区 | 久久久久国产精品午夜一区 | 日本福利一区 | 男女羞羞视频大全 | av片免费 | 精品三级在线观看 | 黄色三级毛片 | 亚洲精品一区二区三区丝袜 | www.国产精 | 午夜精品在线观看 | 一区二区日韩精品 | 久久精品亚洲精品国产欧美kt∨ | 欧美成人h版在线观看 | 在线看日韩 | 91精品国模一区二区三区 | 毛片免费观看 | 精品久久国产老人久久综合 | 九九免费观看视频 | 亚洲精品免费观看 | 久久中文字幕av | 91日韩 | 国产小视频在线 | 影音先锋欧美资源 | 91伊人 |