問題描述
我們公司有一個(gè)基于 Python 的網(wǎng)站和一些基于 Python 的工作節(jié)點(diǎn),它們通過 Django/Celery 和 RabbitMQ 進(jìn)行通信.我有一個(gè)基于 Java 的應(yīng)用程序,它需要向基于 Celery 的工作人員提交任務(wù).我可以很好地從 Java 向 RabbitMQ 發(fā)送工作,但是基于 Celery 的工作人員永遠(yuǎn)不會(huì)接受工作.通過查看兩種類型的作業(yè)提交的數(shù)據(jù)包捕獲,存在差異,但我無法理解如何解釋它們,因?yàn)槠渲泻芏嗍嵌M(jìn)制文件,我找不到有關(guān)解碼的文檔.這里有沒有人有 Java/RabbitMQ 和 Celery 一起工作的參考或經(jīng)驗(yàn)?
Our company has a Python based web site and some Python based worker nodes which communicate via Django/Celery and RabbitMQ. I have a Java based application which needs to submit tasks to the Celery based workers. I can send jobs to RabbitMQ from Java just fine, but the Celery based workers are never picking up the jobs. From looking at the packet captures of both types of job submissions, there are differences, but I cannot fathom how to account for them because a lot of it is binary that I cannot find documentation about decoding. Does anyone here have any reference or experience with having Java/RabbitMQ and Celery working together?
推薦答案
我找到了解決方案.RabbitMQ 的 Java 庫是指交換/隊(duì)列/路由鍵.在 Celery 中,隊(duì)列名稱實(shí)際上是映射到 Java 庫中引用的交換.默認(rèn)情況下,Celery 的隊(duì)列只是celery".如果您的 Django 設(shè)置使用以下語法定義了一個(gè)名為myqueue"的隊(duì)列:
I found the solution. The Java library for RabbitMQ refers to exchanges/queues/routekeys. In Celery, the queue name is actually mapping to the exchange referred to in the Java library. By default, the queue for Celery is simply "celery". If your Django settings define a queue called "myqueue" using the following syntax:
CELERY_ROUTES = {
'mypackage.myclass.runworker' : {'queue':'myqueue'},
}
那么基于 Java 的代碼需要做如下的事情:
Then the Java based code needs to do something like the following:
ConnectionFactory factory = new ConnectionFactory();
Connection connection = null ;
try {
connection = factory.newConnection(mqHost, mqPort);
} catch (IOException ioe) {
log.error("Unable to create new MQ connection from factory.", ioe) ;
}
Channel channel = null ;
try {
channel = connection.createChannel();
} catch (IOException ioe) {
log.error("Unable to create new channel for MQ connection.", ioe) ;
}
try {
channel.queueDeclare("celery", false, false, false, true, null);
} catch (IOException ioe) {
log.error("Unable to declare queue for MQ channel.", ioe) ;
}
try {
channel.exchangeDeclare("myqueue", "direct") ;
} catch (IOException ioe) {
log.error("Unable to declare exchange for MQ channel.", ioe) ;
}
try {
channel.queueBind("celery", "myqueue", "myqueue") ;
} catch (IOException ioe) {
log.error("Unable to bind queue for channel.", ioe) ;
}
// Generate the message body as a string here.
try {
channel.basicPublish(mqExchange, mqRouteKey,
new AMQP.BasicProperties("application/json", "ASCII", null, null, null, null, null, null, null, null, null, "guest", null, null),
messageBody.getBytes("ASCII"));
} catch (IOException ioe) {
log.error("IOException encountered while trying to publish task via MQ.", ioe) ;
}
事實(shí)證明,這只是術(shù)語上的差異.
It turns out that it is just a difference in terminology.
這篇關(guān)于從 Java 與 Django/Celery 互操作的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,也希望大家多多支持html5模板網(wǎng)!