Problem description
I'm receiving messages from a service (S) that publishes each individual property change to an entity as a separate message. A contrived example would be an entity like this:
Person {
    id: 123,
    name: "Something",
    address: {...}
}
If name and address are updated in the same transaction, (S) will publish two messages, PersonNameCorrected and PersonMoved. The problem is on the receiving side, where I store a projection of this Person entity and each property change causes a write to the database. In this example there would be two writes to the database, but if I could batch messages for a short period of time and group them by id, I would only have to make a single write.
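To make the goal concrete, here is a minimal sketch of the "group by id" step in plain Java. It assumes the messages received during a short window have already been collected into a list. The names `GroupByIdSketch`, `PropertyChange`, and `groupById` are hypothetical and exist only for illustration; they are not part of RabbitMQ or Spring AMQP.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: these names are illustrative, not a RabbitMQ or Spring AMQP API.
final class GroupByIdSketch {

    // A single property change for one entity, as (S) would publish it.
    static final class PropertyChange {
        final int entityId;
        final String property;
        final Object newValue;

        PropertyChange(int entityId, String property, Object newValue) {
            this.entityId = entityId;
            this.property = property;
            this.newValue = newValue;
        }
    }

    // Collapses a batch into one map of property -> latest value per entity id.
    static Map<Integer, Map<String, Object>> groupById(List<PropertyChange> batch) {
        Map<Integer, Map<String, Object>> updates = new LinkedHashMap<>();
        for (PropertyChange change : batch) {
            updates.computeIfAbsent(change.entityId, id -> new LinkedHashMap<>())
                    .put(change.property, change.newValue);
        }
        return updates;
    }

    public static void main(String[] args) {
        List<PropertyChange> batch = new ArrayList<>();
        batch.add(new PropertyChange(123, "name", "Something Else"));
        batch.add(new PropertyChange(123, "address", "New Street 1"));

        // Each map entry stands for one database write instead of one per message.
        groupById(batch).forEach((id, fields) ->
                System.out.println("UPDATE person " + id + " SET " + fields));
    }
}
```

With the two messages above, the batch collapses into a single entry for id 123, so the projection would be written once.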
How does one typically handle this in RabbitMQ? Does Spring AMQP provide an easier abstraction?
Note that I have looked briefly at prefetch, but I'm not sure it is the way to go. Also, if I understand it correctly, prefetch applies per connection. I'm trying to achieve this on a per-queue basis: if batching (and thus added latency) is the way to go, I don't want to add that latency to ALL queues consumed by my service, only to those that need the "group-by-id" feature.
Recommended answer
Prefetch won't help for a case like this.
Consider using Spring Integration, which has adapters that sit on top of Spring AMQP; it also provides an aggregator that can be used to group messages together before sending them on to the next stage in the pipeline.
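To show what such an aggregator does conceptually, here is a simplified sketch in plain Java: messages are grouped by a correlation key, and each group is handed on once a timeout has elapsed. `TimeoutAggregator` is a hypothetical class written for this illustration and is not Spring Integration's implementation. In particular, it starts the timer when the first message of a group arrives and never resets it, which may differ from how the real aggregator schedules its group timeout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified aggregator; not Spring Integration's implementation.
final class TimeoutAggregator<K, T> {

    private final Map<K, List<T>> groups = new HashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Function<T, K> correlationKey;
    private final long groupTimeoutMillis;
    private final Consumer<List<T>> output;

    TimeoutAggregator(Function<T, K> correlationKey, long groupTimeoutMillis,
            Consumer<List<T>> output) {
        this.correlationKey = correlationKey;
        this.groupTimeoutMillis = groupTimeoutMillis;
        this.output = output;
    }

    // Adds a message to its group; the first message of a group starts the timer.
    synchronized void accept(T message) {
        K key = correlationKey.apply(message);
        List<T> group = groups.get(key);
        if (group == null) {
            group = new ArrayList<>();
            groups.put(key, group);
            scheduler.schedule(() -> release(key), groupTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        group.add(message);
    }

    // Removes the group and passes whatever has arrived to the next stage.
    private void release(K key) {
        List<T> group;
        synchronized (this) {
            group = groups.remove(key);
        }
        if (group != null) {
            output.accept(group);
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch released = new CountDownLatch(1);
        TimeoutAggregator<Integer, String> aggregator = new TimeoutAggregator<>(
                message -> 123, // every demo message belongs to person 123
                200,
                group -> {
                    System.out.println(group);
                    released.countDown();
                });
        aggregator.accept("PersonNameChanged");
        aggregator.accept("PersonMoved");
        released.await(5, TimeUnit.SECONDS);
        aggregator.shutdown();
    }
}
```

The trade-off is the one raised in the question: every message in a group is delayed by up to the timeout, so this kind of stage only belongs on the queues that need grouping.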
EDIT
Here's a quick Boot app to demonstrate...
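import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp;

@SpringBootApplication
public class So42969130Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So42969130Application.class, args)
                .close();
    }

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Handler handler;

    @Override
    public void run(String... args) throws Exception {
        // Publish two changes for the same person, then wait for the aggregated result.
        this.template.convertAndSend("so9130", new PersonNameChanged(123));
        this.template.convertAndSend("so9130", new PersonMoved(123));
        this.handler.latch.await(10, TimeUnit.SECONDS);
    }

    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130")
                        .messageConverter(converter()))
                .aggregate(a -> a
                        // Group messages by the id property of the payload.
                        .correlationExpression("payload.id")
                        .releaseExpression("false") // open-ended release, timeout only
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(2000))
                .handle(handler())
                .get();
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Handler handler() {
        return new Handler();
    }

    @Bean
    public Queue queue() {
        // Non-durable, non-exclusive, auto-delete queue for the demo.
        return new Queue("so9130", false, false, true);
    }

    public static class Handler {

        private final CountDownLatch latch = new CountDownLatch(1);

        // Receives all messages of one group as a single collection.
        @ServiceActivator
        public void handle(Collection<?> aggregatedData) {
            System.out.println(aggregatedData);
            this.latch.countDown();
        }

    }

    public static class PersonNameChanged {

        private int id;

        PersonNameChanged() {
        }

        PersonNameChanged(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonNameChanged [id=" + this.id + "]";
        }

    }

    public static class PersonMoved {

        private int id;

        PersonMoved() {
        }

        PersonMoved(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonMoved [id=" + this.id + "]";
        }

    }

}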
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>so42969130</artifactId>
    <version>2.0.0-BUILD-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>so42969130</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
Result:
2017-03-23 09:56:57.501 INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler :
Expiring MessageGroup with correlationKey[123]
[PersonNameChanged [id=123], PersonMoved [id=123]]