久久久久久久av_日韩在线中文_看一级毛片视频_日本精品二区_成人深夜福利视频_武道仙尊动漫在线观看

Spring異步MessageListener用例發生業務異常時如何讓

How to ask RabbitMQ to retry when business Exception occurs in Spring Asynchronous MessageListener use case(Spring異步MessageListener用例發生業務異常時如何讓RabbitMQ重試)
本文介紹了Spring異步MessageListener用例發生業務異常時如何讓RabbitMQ重試的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!

問題描述

限時送ChatGPT賬號..

我正在運行一個 Spring AMQP 消息偵聽器.

I have a Spring AMQP message listener running.

public class ConsumerService implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        try {
            testService.process(message); //This process method can throw Business Exception
        } catch (BusinessException e) {
           //Here we can just log the exception. How the retry attempt is made?
        } catch (Exception e) {
           //Here we can just log the exception.  How the retry attempt is made?
        }
    }
}

如您所見,在處理過程中可能會出現異常.由于 Catch 塊中的特定錯誤,我想重試.我無法通過 onMessage 中的異常.如何告訴 RabbitMQ 有異常并重試?

As you can see, there could be exception coming out during process. I want to retry because of a particular error in Catch block. I cannot through exception in onMessage. How to tell RabbitMQ to there is an exception and retry?

推薦答案

由于 onMessage() 不允許拋出已檢查的異常,您可以將異常包裝在 RuntimeException 然后重新扔.

Since onMessage() doesn't allow to throw checked exceptions you can wrap the exception in a RuntimeException and re-throw it.

try {
    testService.process(message);
} catch (BusinessException e) {
    throw new RuntimeException(e);
}

但是請注意,這可能會導致郵件無限期地重新發送.以下是它的工作原理:

Note however that this may result in the message to be re-delivered indefinitely. Here is how this works:

RabbitMQ 支持拒絕消息并要求代理重新排隊.這顯示在這里.但是 RabbitMQ 本身并沒有重試策略的機制,例如設置最大重試次數、延遲等.

RabbitMQ supports rejecting a message and asking the broker to requeue it. This is shown here. But RabbitMQ doesn't natively have a mechanism for retry policy, e.g. setting max retries, delay, etc.

使用 Spring AMQP 時,requeue on reject"是默認選項.Spring 的 SimpleMessageListenerContainer 默認情況下會在出現未處理的異常時執行此操作.所以在你的情況下,你只需要重新拋出捕獲的異常.但是請注意,如果您無法處理消息并且總是拋出異常,則該異常將無限期地重新傳遞并導致無限循環.

When using Spring AMQP, "requeue on reject" is the default option. Spring's SimpleMessageListenerContainer will by default do this when there is an unhandled exception. So in your case you just need to re-throw the caught exception. Note however that if you cannot process a message and you always throw the exception this will be re-delivered indefinitely and will result in an infinite loop.

您可以通過拋出 AmqpRejectAndDontRequeueException 異常,這種情況下消息不會被重新排隊.

You can override this behaviour per message by throwing a AmqpRejectAndDontRequeueException exception, in which case the message will not be requeued.

您也可以通過設置完全關閉 SimpleMessageListenerContainer 的requeue on reject"行為

You can also switch off the "requeue on reject" behavior of SimpleMessageListenerContainer entirely by setting

container.setDefaultRequeueRejected(false) 

當一條消息被拒絕并且沒有重新排隊時,如果在 RabbitMQ 中設置了一個,它將丟失或轉移到 DLQ.

When a message is rejected and not requeued it will either be lost or transferred to a DLQ, if one is set in RabbitMQ.

如果您需要具有最大嘗試、延遲等的重試策略,最簡單的方法是設置一個彈簧無狀態"RetryOperationsInterceptor,它將在線程內進行所有重試(使用 Thread.sleep()) 而不會在每次重試時拒絕消息(因此每次重試都不會返回 RabbitMQ).當重試用盡時,默認情況下將記錄一個警告并使用該消息.如果您想發送到 DLQ,您將需要 RepublishMessageRecoverer 或自定義 MessageRecoverer 拒絕消息無需重新排隊(在后一種情況下,您還應該在隊列中設置 RabbitMQ DLQ).使用默認消息恢復器的示例:

If you need a retry policy with max attempts, delay, etc the easiest is to setup a spring "stateless" RetryOperationsInterceptor which will do all retries within the thread (using Thread.sleep()) without rejecting the message on each retry (so without going back to RabbitMQ for each retry). When retries are exhausted, by default a warning will be logged and the message will be consumed. If you want to send to a DLQ you will need either a RepublishMessageRecoverer or a custom MessageRecoverer that rejects the message without requeuing (in that latter case you should also setup a RabbitMQ DLQ on the queue). Example with default message recoverer:

container.setAdviceChain(new Advice[] {
        org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
                .stateless()
                .maxAttempts(5)
                .backOffOptions(1000, 2, 5000)
                .build()
});

這顯然有一個缺點,即您將在整個重試期間占用線程.您還可以選擇使用有狀態的"RetryOperationsInterceptor,它會在每次重試時將消息發送回 RabbitMQ,但延遲仍將通過 Thread.sleep() 實現code> 在應用程序中,加上設置有狀態攔截器有點復雜.

This obviously has the drawback that you will occupy the Thread for the entire duration of the retries. You also have the option to use a "stateful" RetryOperationsInterceptor, which will send the message back to RabbitMQ for each retry, but the delay will still be implemented with Thread.sleep() within the application, plus setting up a stateful interceptor is a bit more complicated.

因此,如果您想在不占用 Thread 的情況下延遲重試,您將需要一個在 RabbitMQ 隊列上使用 TTL 的更復雜的自定義解決方案.如果您不想要指數退避(因此每次重試時延遲不會增加),它會更簡單一些.要實現這樣的解決方案,您基本上在 rabbitMQ 上創建另一個帶有參數的隊列: "x-message-ttl": <delay time in milliseconds> and "x-dead-letter-exchange":"<原始隊列的名稱>".然后在主隊列上設置 "x-dead-letter-exchange":"".所以現在當你拒絕并且不重新排隊消息時,RabbitMQ 會將它重定向到第二個隊列.當 TTL 過期時,它將被重定向到原始隊列,從而重新傳遞給應用程序.所以現在你需要一個重試攔截器,它在每次失敗后拒絕發送給 RabbitMQ 的消息,并跟蹤重試計數.為了避免需要在應用程序中保持狀態(因為如果您的應用程序是集群的,則需要復制狀態)您可以從 RabbitMQ 設置的 x-death 標頭計算重試次數.在此處查看有關此標頭的更多信息.因此,此時實現自定義攔截器比使用這種行為自定義 Spring 有狀態攔截器更容易.

Therefore, if you want retries with delays without occupying a Thread you will need a much more involved custom solution using TTL on RabbitMQ queues. If you don't want exponential backoff (so delay doesn't increase on each retry) it's a bit simpler. To implement such a solution you basically create another queue on rabbitMQ with arguments: "x-message-ttl": <delay time in milliseconds> and "x-dead-letter-exchange":"<name of the original queue>". Then on the main queue you set "x-dead-letter-exchange":"<name of the queue with the TTL>". So now when you reject and don't requeue a message RabbitMQ will redirect it to the second queue. When TTL expires it will be redirected to the original queue and thus redelivered to the application. So now you need a retry interceptor that rejects the message to RabbitMQ after each failure and also keeps track of the retry count. To avoid the need to keep state in the application (because if your application is clustered you need to replicate state) you can calculate the retry count from the x-death header that RabbitMQ sets. See more info about this header here. So at that point implementing a custom interceptor is easier than customising the Spring stateful interceptor with this behaviour.

還要檢查 Spring AMQP 參考中關于重試的部分.

這篇關于Spring異步MessageListener用例發生業務異常時如何讓RabbitMQ重試的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!

【網站聲明】本站部分內容來源于互聯網,旨在幫助大家更快的解決問題,如果有圖片或者內容侵犯了您的權益,請聯系我們刪除處理,感謝您的支持!

相關文檔推薦

Parsing an ISO 8601 string local date-time as if in UTC(解析 ISO 8601 字符串本地日期時間,就像在 UTC 中一樣)
How to convert Gregorian string to Gregorian Calendar?(如何將公歷字符串轉換為公歷?)
Java: What/where are the maximum and minimum values of a GregorianCalendar?(Java:GregorianCalendar 的最大值和最小值是什么/在哪里?)
Calendar to Date conversion for dates before 15 Oct 1582. Gregorian to Julian calendar switch(1582 年 10 月 15 日之前日期的日歷到日期轉換.公歷到儒略歷切換)
java Calendar setFirstDayOfWeek not working(java日歷setFirstDayOfWeek不起作用)
Java: getting current Day of the Week value(Java:獲取當前星期幾的值)
主站蜘蛛池模板: 91国产视频在线观看 | 成人深夜福利 | 久久久久久久久久久蜜桃 | 日韩区| 日韩精品在线观看免费 | 午夜寂寞网站 | 国产一二三区在线 | 成人黄色a | 一区二区三区不卡视频 | 一区二区三区成人 | 91色网站| 欧美成人手机视频 | 中文字幕在线观看视频网站 | 午夜寂寞影院在线观看 | 欧美成人a∨高清免费观看 91伊人 | 国产精品99| 精品一二区 | 亚洲视频免费观看 | 91国在线 | 久久99精品视频 | 中文字幕人成乱码在线观看 | 国内av在线| 国产精品久久久久久一区二区三区 | 亚洲精品二三区 | 一区二区三区在线观看视频 | www.久久久久久久久久久 | 在线看91 | 在线观看国产视频 | 精品日韩在线观看 | 国产精品一卡二卡三卡 | 日日操日日干 | 亚洲国产一区二区三区 | 日韩一区二区三区在线视频 | 国产人成精品一区二区三 | 欧美日韩不卡在线 | 九九精品在线 | 日韩精品视频在线 | 国产精品不卡一区二区三区 | 国产精品一区二区在线免费观看 | 中文字幕 在线观看 | 久久久国产网站 |