問題描述
我正在測試 Spring-AMQP
與 Spring-Integration
支持,我有以下配置和測試:
I am testing Spring-AMQP
with Spring-Integration
support, I've following configuration and test:
<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
<int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>
<int-amqp:inbound-channel-adapter
channel="consumingChannel"
queue-names="durableQ"
connection-factory="connectionFactory"
concurrent-consumers="1"
acknowledge-mode="AUTO"
/>
public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
AbstractApplicationContext context = new ClassPathXmlApplicationContext(
"classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");
PollableChannel consumingChannel = context.getBean("consumingChannel",
PollableChannel.class);
int count = 0;
while (true) {
Message<?> msg = consumingChannel.receive(1000);
System.out.println((count++) + " -> " + msg);
try { //sleep to check number of messages in queue
Thread.sleep(50000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在此配置中,很明顯,一旦消息到達 sumptionChannel
,它們就會被確認并因此從隊列中刪除.我通過在 receive
之后放置一個高 sleep
并檢查 queue-size
來驗證這一點.沒有進一步的控制.
In this configuration it was evident that as soon as message arrives at consumingChannel
they are Acked and hence removed from queue. I validated this by placing a high sleep
after receive
and check queue-size
. There are no further control on it.
現在,如果我設置 acknowledge-mode=MANUAL
,似乎沒有辦法通過 spring 集成手動執行 ack
.
Now if I set acknowledge-mode=MANUAL
, there are no ways seems to do manual ack
via spring integration.
我需要處理消息,并在處理后執行 manual-ack
,以便 ack
消息保持在 durableQ
處.
My need is to process message and after processing do a manual-ack
so till ack
message remains persisted at durableQ
.
有沒有辦法用 spring-amqp-integration
處理 MANUAL
ack?我想避免將 ChannelAwareMessageListener
傳遞給 inbound-channel-adapter
,因為我想控制消費者的 receive
.
Is there any way to handle MANUAL
ack with spring-amqp-integration
? I want to avoid passing ChannelAwareMessageListener
to inbound-channel-adapter
since I want to have control of consumer's receive
.
更新:
當使用自己的 listener-container
和 inbound-channel-adapter
時,這似乎是不可能的:
It even doesn't seems to be possible when using own listener-container
with inbound-channel-adapter
:
// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" />
<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queueNames" value="durableQ" />
<property name="acknowledgeMode" value="MANUAL" />
// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
<property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>
上述配置拋出錯誤,因為 messageListener
屬性是不允許的,請參閱標記的內聯注釋.所以使用 listner-container
的目的被打敗了(通過 ChannelAwareMessageListener
暴露 channel
).
Above configuration throws error as messageListener
property is not allowed, see inline comment on tag. So purpose of using listner-container
got defeated (for exposing channel
via ChannelAwareMessageListener
).
對我來說 spring-integration
不能用于 manual-acknowledgement
(我知道,這很難說!),任何人都可以幫我驗證這個或者是我缺少任何特定的方法/配置嗎?
To me spring-integration
cannot be used for manual-acknowledgement
(I know, this is a hard saying!), Can anyone help me in validating this or Is there any specific approach/configuration required for this which I am missing?
推薦答案
問題是因為您使用的是使用 QueueChannel
的異步切換.通常最好控制容器中的并發性(concurrent-consumers="2"
),并且不要在流程中進行任何異步切換(使用 DirectChannel
s).這樣,AUTO ack 就可以正常工作了.而不是從 PollableChannel
接收 new MessageHandler()
到 SubscribableChannel
.
The problem is because you are using async handoff using a QueueChannel
. It is generally better to control the concurrency in the container (concurrent-consumers="2"
) and don't do any async handoffs in your flow (use DirectChannel
s). That way, AUTO ack will work just fine. Instead of receiving from the PollableChannel
subscribe a new MessageHandler()
to a SubscribableChannel
.
更新:
您通常不需要在 SI 應用程序中處理消息,但您使用 DirectChannel 進行的測試相當于...
You normally don't need to deal with Messages in an SI application, but the equivalent of your test with a DirectChannel would be...
SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);
channel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("Got " + message);
}
});
這篇關于Spring AMQP 集成 - 消費者手冊 致謝的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!