問題描述
我正在嘗試使用 spring amqp 使用rabbitmq,下面是我的配置.
I am trying to use rabbitmq using spring amqp, below is my configuration.
<rabbit:connection-factory id="rabbitConnectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" />
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<rabbit:queue name="${rabbitmq.import.queue}" />
<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" />
<beans:bean id="importExchangeMessageListener"
class="com.stockopedia.batch.foundation.ImportMessageListener" />
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="5">
<rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" />
</rabbit:listener-container>
這是一個簡單的消息監聽類,
This is simple Message Listener class,
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class ImportMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("consumer output: " + message);
}
}
這是producer(即spring batch的itemWriter),
This is producer (which is itemWriter of spring batch),
public class ImportItemWriter<T> implements ItemWriter<T> {
private AmqpTemplate template;
public AmqpTemplate getTemplate() {
return template;
}
public void setTemplate(AmqpTemplate template) {
this.template = template;
}
public void write(List<? extends T> items) throws Exception {
for (T item : items) {
Object reply = template.convertSendAndReceive(item.toString());
System.out.println("producer output: " + reply);
}
}
}
當我運行我的 spring 批處理作業時, ImportItemWriter.write 被調用.但是 ImportMessageListener.onMessage 不起作用.它不打印消息.我得到控制臺上所有項目的低于輸出
When I run my spring batch job, ImportItemWriter.write gets called. But ImportMessageListener.onMessage does not work. It doesnt print the message. I get below output for all items on console
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
推薦答案
您的消費者沒有發送結果...
Your consumer is not sending a result...
@Override
public void onMessage(Message message) {
System.out.println("consumer output: " + message);
}
改成簡單的POJO;容器的 MessageListenerAdapter
將為您處理轉換,并發送結果.
Change it to a simple POJO; the container's MessageListenerAdapter
will take care of the conversion for you, and send the result.
@Override
public String handleMessage(String message) {
System.out.println("consumer output: " + message);
return "result";
}
您還沒有設置任何交換或路由到您的隊列.如果您想使用默認交換/路由,請使用...
You also haven't set up any exchange or routing to your queue. If you want to use default exchange/routing, use...
convertSendAndReceive("", queueName, item.toString());
或者,將模板上的 routingKey
設置為隊列名稱,然后可以使用更簡單的方法.
Or, set the routingKey
on the template to the queue name and then you can use the simpler method.
...sendAndReceive()
方法適用于請求/回復場景,因此需要阻塞.要異步執行此操作,您必須使用 ...send()
方法之一,并連接您自己的 SimpleListenerContainer
以接收回復;你將不得不做你自己的相關性.使用
The ...sendAndReceive()
methods are meant for request/reply scenarios so blocking is required. To do it asynchronously, you have to use one of the ...send()
methods, and wire up your own SimpleListenerContainer
to receive the replies; you will have to do your own correlation. Use
public void convertAndSend(Object message, MessagePostProcessor postProcessor)
在您的消息后處理器中,設置 replyTo
和 correlationId
標頭...
and in your message post processor, set the replyTo
and correlationId
headers...
message.getMessageProperties().setReplyTo("foo");
message.getMessageProperties().setCorrelationId("bar");
或者,自己構建 Message
對象(例如 通過使用 MessageBuilder
) 并使用 send
方法...
Or, build the Message
object yourself (e.g by using the MessageBuilder
) and use the send
method...
template.send(MessageBuilder.withBody("foo".getBytes())
.setReplyTo("bar")
.setCorrelationId("baz".getBytes())
.build());
每個請求都需要一個唯一的 correlationId
,以便您可以關聯響應.
Each request needs a unique correlationId
so you can correlate the response.
這篇關于spring amqp rabbitmq MessageListener 不工作的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!