Problem description
I'm new to Spring Integration and am confused about how to send error messages to a designated error queue. I want the error message to be a header on the original message and end up in a separate queue. I read that this can be done with a header enricher, which I tried to implement, but nothing is showing up in the error queue.
Also, do I need a separate exception handling class for the error messages to make it to the error queue, or can I just throw exceptions in my transforming methods?
Here is my XML config:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/amqp
           http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">

    <rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" />
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="first" auto-delete="false" durable="true" />
    <rabbit:queue name="second" auto-delete="false" durable="true" />
    <rabbit:queue name="errorQueue" auto-delete="false" durable="true" />

    <int:poller default="true" fixed-rate="100"/>

    <rabbit:fanout-exchange name="second-exchange" auto-delete="true" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="second" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <rabbit:fanout-exchange name="error-exchange" auto-delete="true" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="errorQueue" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <int-amqp:outbound-channel-adapter channel="messageOutputChannel" exchange-name="second-exchange" amqp-template="amqpTemplate" />
    <int-amqp:inbound-channel-adapter channel="messageInputChannel" error-channel="errorInputChannel" queue-names="first" connection-factory="connectionFactory" concurrent-consumers="20" />
    <int-amqp:outbound-channel-adapter channel="errorOutputChannel" exchange-name="error-exchange" amqp-template="amqpTemplate" />

    <int:channel id="messageInputChannel" />
    <int:channel id="messageOutputChannel"/>
    <int:channel id="errorInputChannel"/>

    <int:service-activator input-channel="errorInputChannel" output-channel="errorOutputChannel" method="handleError">
        <bean class="firstAttempt.MessageErrorHandler"/>
    </int:service-activator>

    <int:chain input-channel="messageInputChannel" output-channel="messageOutputChannel">
        <int:header-enricher>
            <int:error-channel ref="errorInputChannel" />
        </int:header-enricher>
        <int:transformer method="convert">
            <bean class="firstAttempt.JsonObjectConverter" />
        </int:transformer>
        <int:service-activator method="transform">
            <bean class="firstAttempt.Transformer" />
        </int:service-activator>
        <int:object-to-string-transformer />
    </int:chain>
</beans>
Error class:
public class ErrorHandler {
    public String errorHandle(MessageHandlingException exception) {
        return exception.getMessage();
    }
}
QualityScorer class (called by the transformer):
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Hashtable;

import org.json.JSONObject;

public class QualityScorer {
    private Hashtable<String, String> table;
    // Backslashes must be escaped inside a Java string literal.
    private final static String csvFile = "C:\\Users\\john\\Test.csv";

    public QualityScorer() throws Exception {
        table = new Hashtable<String, String>();
        initializeTable();
    }

    // Loads the lookup table from the CSV file, keeping only rows that match the filter.
    private void initializeTable() throws Exception {
        BufferedReader br = null;
        String line = "";
        String cvsSplitBy = ",";
        try {
            br = new BufferedReader(new FileReader(csvFile));
            while ((line = br.readLine()) != null) {
                String[] data = line.split(cvsSplitBy);
                if (data.length > 6 && data[1].equals("1") && data[4].equals("0") && data[5].equals("1"))
                    table.putIfAbsent(data[3], data[1]);
            }
        } catch (FileNotFoundException e) {
            throw new Exception("No file found");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public float getScore(JSONObject object) throws Exception {
        float score;
        if (object == null) {
            throw new IllegalArgumentException("object");
        }
        if (!object.has("source")) {
            throw new Exception("Object does not have a source");
        }
        if (!object.has("employer")) {
            throw new Exception("Object does not have an employer");
        }
        // Key names are case-sensitive, so read the same "source" key that was checked above.
        String source = object.getString("source");
        String employer = object.getString("employer");
        if (table.containsKey(employer) && !source.equals("packageOne")) {
            score = 1;
        } else {
            score = -1;
        }
        return score;
    }
}
Right now, the message being loaded has no source, so the program should be throwing the MessagingException to the MessageErrorHandler.
Transformer code:
public class Transformer {
    private QualityScorer qualityScorer;

    public Transformer() throws Exception {
        qualityScorer = new QualityScorer();
    }

    public JSONObject transform(JSONObject object) throws Exception {
        float score = qualityScorer.getScore(object);
        object.put("score", score);
        return object;
    }
}
All together, the program should receive a pre-loaded message from a queue, transform it, and send it on to a second queue, which it does successfully if the source is provided in the pre-loaded message. I'm trying to handle errors so that they are sent to an error queue as a message header. This issue has been frustrating me for a while, so help is greatly appreciated!
The error currently being shown in the stack trace is:
java.lang.NoSuchMethodError: org.springframework.messaging.MessageHandlingException: method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:110)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:56)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:246)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
at java.lang.Thread.run(Thread.java:748)
But nothing is going to the error queue.
Recommended answer
When the exception is thrown, it is wrapped, together with the requestMessage, in a MessagingException. Your own business exception is available as the cause, and you can access the requestMessage through the MessagingException.failedMessage property.
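As a rough illustration of that wrapping, the sketch below uses two small stand-in classes, SketchMessage and SketchMessagingException. Both are hypothetical and exist only so the example runs without Spring on the classpath; they mirror the shape of Spring's Message and MessagingException but are not the real types. In an actual flow, the equivalent accessors would be getFailedMessage() and getCause() on org.springframework.messaging.MessagingException.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class WrappedExceptionSketch {

    /** Hypothetical stand-in for Spring's Message: a payload plus headers. */
    static final class SketchMessage {
        final Object payload;
        final Map<String, Object> headers;

        SketchMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    /** Hypothetical stand-in for MessagingException: keeps the failed message and the cause. */
    static final class SketchMessagingException extends RuntimeException {
        final SketchMessage failedMessage;

        SketchMessagingException(SketchMessage failedMessage, Throwable cause) {
            super("Error while handling message", cause);
            this.failedMessage = failedMessage;
        }
    }

    /** Business method that simply throws, similar in spirit to getScore in the question. */
    static String requireSource(String json) throws Exception {
        if (!json.contains("\"source\"")) {
            throw new Exception("Object does not have a source");
        }
        return json;
    }

    /** Mimics the framework: an exception from the business method is wrapped with the request. */
    static SketchMessage invoke(SketchMessage request) {
        try {
            Object result = requireSource((String) request.payload);
            return new SketchMessage(result, request.headers);
        } catch (Exception e) {
            throw new SketchMessagingException(request, e);
        }
    }

    public static void main(String[] args) {
        SketchMessage request = new SketchMessage("{\"employer\":\"acme\"}", new HashMap<>());
        try {
            invoke(request);
        } catch (SketchMessagingException wrapped) {
            // The business exception is the cause; the original request is the failed message.
            System.out.println("cause: " + wrapped.getCause().getMessage());
            System.out.println("failed payload: " + wrapped.failedMessage.payload);
        }
    }
}
```

In this sketch the business method only throws a plain exception; the wrapping step is what attaches the original request to it, which is why the error flow can still reach the failed message.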
So it looks like you have everything you need for your use case. The only problem is that, before sending to the error-exchange, you really should have a <transformer> in the error flow to convert that MessagingException into a proper message to send to AMQP.