久久久久久久av_日韩在线中文_看一级毛片视频_日本精品二区_成人深夜福利视频_武道仙尊动漫在线观看

在 RabbitMQ 中分組接收消息,最好使用 Spring AMQP

Group received messages in RabbitMQ, preferably using Spring AMQP?(在 RabbitMQ 中分組接收消息,最好使用 Spring AMQP?)
本文介紹了在 RabbitMQ 中分組接收消息,最好使用 Spring AMQP?的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!

問題描述

限時送ChatGPT賬號..

我從服務 (S) 接收消息,該服務將每個單獨的屬性更改作為單獨的消息發布到實體.一個人為的例子是這樣的實體:

I'm receiving messages from a service (S) that publishes each individual property change to an entity as a separate message. A contrived example would be an entity like this:

Person {
    id: 123
    name: "Something",
    address: {...}
}

如果姓名和地址在同一事務中更新,則 (S) 將發布兩條消息,PersonNameCorrectedPersonMoved.問題出在接收端,我在其中存儲此 Person 實體的投影,并且每個屬性更改都會導致寫入數據庫.所以在這個例子中,會有兩次對數據庫的寫入,但是如果我可以在短時間內批量處理消息并按 id 分組,那么我只需要對數據庫進行一次寫入.

If name and address are updated in the same transaction then (S) will publish two messages, PersonNameCorrected and PersonMoved. The problem is on the receiving side where I'm storing a projection of this Person entity and each property change causes a write to the database. So in this example there would be two writes to the database but if I could batch messages for a short period of time and group them by id then I would only have to make a single write to the database.

在 RabbitMQ 中通常如何處理這個問題?Spring AMQP 是否提供了更簡單的抽象?

How does one typically handle this in RabbitMQ? Does Spring AMQP provide an easier abstraction?

請注意,我已經簡要查看了 prefetch 但我不確定這是否是要走的路.如果我理解正確,預取也是基于每個連接的.我試圖在 per-queue 的基礎上實現這一點,因為如果批處理(從而增加延遲)是可行的方法,我不想將此延遲添加到我使用的所有隊列中服務(但僅限于那些需要group-by-id"功能的人).

Note that I have looked briefly at prefetch but I'm not sure if this is the way to go. Also prefetch, if I understand it correctly, is per connection basis. I'm trying to achieve this on a per-queue basis, because if batching (and thus added latency) is the way to go I wouldn't like to add this latency to ALL queues consumed by my service (but only to those that need the "group-by-id" features).

推薦答案

對于這種情況,Prefetch 無濟于事.

Prefetch won't help for a case like this.

考慮使用 Spring Integration,它的適配器位于 Spring 之上AMQP;它還提供了一個聚合器,可用于在將消息發送到管道中的下一個階段之前將它們組合在一起.

Consider using Spring Integration which has adapters that sit on top of Spring AMQP; it also provides an aggregrator which can be used to group messages together before sending them on to the next stage in the pipeline.

編輯

這是一個用于演示的快速啟動應用...

Here's a quick boot app to demostrate...

@SpringBootApplication
public class So42969130Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So42969130Application.class, args)
            .close();
    }

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Handler handler;

    @Override
    public void run(String... args) throws Exception {
        this.template.convertAndSend("so9130", new PersonNameChanged(123));
        this.template.convertAndSend("so9130", new PersonMoved(123));
        this.handler.latch.await(10, TimeUnit.SECONDS);
    }

    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130")
                        .messageConverter(converter()))
                .aggregate(a -> a
                        .correlationExpression("payload.id")
                        .releaseExpression("false") // open-ended release, timeout only
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(2000))
                .handle(handler())
                .get();
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Handler handler() {
        return new Handler();
    }

    @Bean
    public Queue queue() {
        return new Queue("so9130", false, false, true);
    }

    public static class Handler {

        private final CountDownLatch latch = new CountDownLatch(1);

        @ServiceActivator
        public void handle(Collection<?> aggregatedData) {
            System.out.println(aggregatedData);
            this.latch.countDown();
        }

    }

    public static class PersonNameChanged {

        private int id;

        PersonNameChanged() {
        }

        PersonNameChanged(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonNameChanged [id=" + this.id + "]";
        }

    }

    public static class PersonMoved {

        private int id;

        PersonMoved() {
        }

        PersonMoved(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonMoved [id=" + this.id + "]";
        }

    }

}

波姆:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>so42969130</artifactId>
    <version>2.0.0-BUILD-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>so42969130</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

結果:

2017-03-23 09:56:57.501  INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler : 
    Expiring MessageGroup with correlationKey[123]
[PersonNameChanged [id=123], PersonMoved [id=123]]

這篇關于在 RabbitMQ 中分組接收消息,最好使用 Spring AMQP?的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!

【網站聲明】本站部分內容來源于互聯網,旨在幫助大家更快的解決問題,如果有圖片或者內容侵犯了您的權益,請聯系我們刪除處理,感謝您的支持!

相關文檔推薦

Parsing an ISO 8601 string local date-time as if in UTC(解析 ISO 8601 字符串本地日期時間,就像在 UTC 中一樣)
How to convert Gregorian string to Gregorian Calendar?(如何將公歷字符串轉換為公歷?)
Java: What/where are the maximum and minimum values of a GregorianCalendar?(Java:GregorianCalendar 的最大值和最小值是什么/在哪里?)
Calendar to Date conversion for dates before 15 Oct 1582. Gregorian to Julian calendar switch(1582 年 10 月 15 日之前日期的日歷到日期轉換.公歷到儒略歷切換)
java Calendar setFirstDayOfWeek not working(java日歷setFirstDayOfWeek不起作用)
Java: getting current Day of the Week value(Java:獲取當前星期幾的值)
主站蜘蛛池模板: 久久蜜桃av一区二区天堂 | 久久精品国产99国产精品 | 91视频大全| 免费观看的av毛片的网站 | 国产羞羞视频在线观看 | 久久91精品国产一区二区三区 | 狠狠狠干 | 成人a免费| 亚洲国产精品久久久久久 | 亚洲欧美日韩精品久久亚洲区 | 天天人人精品 | 成人亚洲片 | 国产一级在线观看 | 97人人超碰 | 精品国产99 | 日韩一区二区三区四区五区六区 | 午夜精品久久久久久久久久久久 | 午夜噜噜噜 | 欧美福利精品 | 欧美日韩精品久久久免费观看 | 亚洲精品一区二区三区中文字幕 | 欧美男人天堂 | 久久国内精品 | 亚洲国产成人精品久久久国产成人一区 | 久久精品国产a三级三级三级 | 亚洲欧美在线一区 | 久久人体视频 | 久久久综合网 | 99久久精品免费看国产四区 | 国产三级日本三级 | 成人片免费看 | 91精品国产高清久久久久久久久 | 日韩一区在线观看视频 | 一级毛片在线视频 | 久久综合九九 | 国产精品一区二区三区免费观看 | 天天曰天天干 | 91在线成人 | 中文无吗 | 亚洲高清在线观看 | 成人h电影在线观看 |