在工业软件相关场景里,后端经常需要把耗时、异步、可并行的任务(如模型推理、文件转码、批处理)从 HTTP 同步链路中剥离出去。最常见的做法是:Java(Spring Boot)作为生产者 → RabbitMQ 作为消息中间件 → Celery(Python)作为消费者/Worker。本文基于真实项目,梳理一套可直接落地集成方案,覆盖关键配置与坑点,帮助你在实际生产环境稳定运行。
为什么我们不用 Pika,而选 Celery?
- Pika:RabbitMQ 的 底层 Python 客户端。你能直接收发 AMQP 帧,最大自由度,但所有机制都要自己搭(重连、消费并发、超时、重试、结果存储、定时任务、监控……)。
- Celery:Python 里的 分布式任务队列框架,在底层消息之上,提供了 任务抽象、并发模型、重试/超时、定时、路由、结果后端、事件与可观测性 等高级能力。
我们选择 Celery 的原因很现实:要快速、稳定、可维护地把“消息消费”升级为“任务执行平台”。如果用 Pika,需后期还要为可观测性、可靠性和扩展性持续买单,造成开发资源的极大浪费。
| 维度 | Celery | Pika |
|---|---|---|
| 抽象层级 | 任务(Task)语义 | 消息(Message)语义 |
| 并发模型 | prefork / gevent / eventlet 等 | 自己管理线程/协程/进程 |
| 重试与超时 | 内建(装饰器/参数即开) | 自己写 try/except + 退避重试 |
| 定时/周期任务 | Celery Beat 原生支持 | 自己起调度器/cron |
| 结果后端 | Redis/DB/AMQP 等即插即用 | 自己落库/回写 |
| 监控与事件 | Flower/Events 开箱即用 | 自建监控与埋点 |
| 生态 | 广泛、成熟 | 轻量、自由 |
| 学习/维护成本 | 低(围绕任务) | 高(围绕 AMQP 细节) |
Celery 介绍(以及 direct vs topic 一句话对比)
Celery 是 Python 生态里成熟的分布式任务队列框架,通常与 RabbitMQ 作为 Broker 搭配:
- 核心概念
- Broker:消息中间件(本文用 RabbitMQ)。
- Task:被分发执行的函数(命名很重要,如
tasks.handle_task)。 - Worker:执行任务的进程(
celery worker)。 - Serializer:序列化协议(本文统一用 JSON,便于跨语言对接)。
- 消息格式(与 Celery 兼容)
- Header 至少包含:
id(任务 ID)、task(任务名)。 - Body 为 三元组:
[args(list), kwargs(map), embed(map)],我们只用kwargs传业务参数,其余保持默认/空。
- Header 至少包含:
- Exchange 类型(直观理解)
- direct:精确匹配路由键(简单、默认,与 Celery 默认最贴近)。
- topic:通配匹配(
*/#),更灵活,适合多租户/分层路由。
Java 端代码解析(Spring Boot + AMQP)
1. 连接与基础 Bean(MQConfig.java)
关键点:
- 从环境变量读取 MQ 连接(避免硬编码);
- Jackson2JsonMessageConverter 统一 JSON 协议(与 Celery 对齐);
- 声明 topic 交换机、持久化队列、绑定;
- 开启 Publisher Confirm / Return 方便定位路由问题。
Java
package com.induxense.user.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置(生产可用版本)
*
* 关键点:
* 1. 使用 Jackson2JsonMessageConverter,确保与 Celery 的 JSON 协议对齐;
* 2. Exchange/Queue/Binding 与 Celery 完全一致(topic: task_exchange / task_queue / task_key);
* 3. 打开 Publisher Confirm & Return,能够定位消息路由/投递失败;
* 4. 连接参数来自环境变量(优先使用环境变量,避免硬编码凭据);
*/
@Configuration
public class MQConfig {
// === 与 Celery 端一致 ===
public static final String QUEUE_NAME = "task_queue";
public static final String EXCHANGE_NAME = "task_exchange";
public static final String ROUTING_KEY = "task_key";
/**
* 连接工厂
* - 从环境变量读取 MQ_HOST / MQ_PORT / MQ_USER / MQ_PASS
* - 开启 Publisher Confirm & Return,便于排错
*/
@Bean
public ConnectionFactory connectionFactory() {
String host = getenvOrDefault("MQ_HOST", "默认IP");
int port = Integer.parseInt(getenvOrDefault("MQ_PORT", "5672"));
String username = getenvOrDefault("MQ_USER", "账户");
String password = getenvOrDefault("MQ_PASS", "密码");
CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
factory.setUsername(username);
factory.setPassword(password);
// Publisher 确认:CORRELATED 模式(推荐)
factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
// 开启 Return 回退(不可路由时触发)
factory.setPublisherReturns(true);
return factory;
}
/**
* JSON 消息转换器
* - 与 Celery 的 task_serializer/accept_content=["json"] 对齐
*/
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* RabbitTemplate
* - 绑定 JSON 转换器
* - 设置 ConfirmCallback / ReturnsCallback 打印详细日志
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter messageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter);
// 当 mandatory 为 true 且消息无法路由到队列时,会触发 ReturnsCallback
template.setMandatory(true);
// Publisher Confirm 回调(消息是否到达 broker)
template.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
System.err.println("❌ [PublisherConfirm] 消息未被 broker 接收: " + cause
+ ", correlationData=" + correlationData);
}
});
// Return 回调(消息到达 broker 但路由失败)
template.setReturnsCallback(returned -> {
System.err.println("❌ [PublisherReturn] 消息被退回:" +
" replyCode=" + returned.getReplyCode() +
" replyText=" + returned.getReplyText() +
" exchange=" + returned.getExchange() +
" routingKey=" + returned.getRoutingKey() +
" body=" + new String(returned.getMessage().getBody()));
});
return template;
}
/**
* 声明 Topic Exchange(与 Celery 端一致)
*/
@Bean
public TopicExchange taskExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 声明持久化队列(与 Celery 端一致)
*/
@Bean
public Queue taskQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
* 绑定:queue ←(routingKey)← exchange
*/
@Bean
public Binding binding(Queue taskQueue, TopicExchange taskExchange) {
return BindingBuilder.bind(taskQueue).to(taskExchange).with(ROUTING_KEY);
}
// --- 工具方法 ---
private static String getenvOrDefault(String key, String defVal) {
String v = System.getenv(key);
return (v == null || v.isBlank()) ? defVal : v;
// 注意:K8s/容器环境请用 Secret/ConfigMap 注入上述环境变量
}
}
2. 发送任务接口(UserController.java)
要点:
- 统一任务名:
tasks.handle_task(与 Python 端 @app.task 保持一致); - 构造 Celery 三元组 body;
- 必须设置
id与task两个 header; - 设置
ContentType=application/json、持久化投递。
Java
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.*;
import java.util.*;
/**
* 任务发送控制器
*
* 核心:
* - 使用 RabbitTemplate.convertAndSend 直接发送 Java 对象,自动由 Jackson2JsonMessageConverter 序列化为 JSON;
* - 构造 Celery 规范三元组:[args(list), kwargs(map), embed(map)];
* - 设置 headers:id(任务ID)、task(Celery 任务名);
* - 设置 DeliveryMode 为 PERSISTENT,保障消息持久化;
*/
@RequestMapping("/mqtest/")
@RestController
public class UserController {
private final RabbitTemplate rabbitTemplate;
public UserController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* 发送 Celery 任务(JSON 协议),与 Celery 端 @app.task(name="tasks.handle_task") 对齐
* 示例:GET /mqtest/createtask?task=demo¶m=10
*/
@GetMapping("/createtask")
public Map<String, Object> sendTask(
@RequestParam String task,
@RequestParam(required = false) String param
) {
final String celeryTaskName = "tasks.handle_task"; // 必须与 Celery 任务名一致
final String taskId = UUID.randomUUID().toString();
// kwargs:传给 Celery 的业务参数
Map<String, Object> kwargs = new LinkedHashMap<>();
kwargs.put("task", task);
kwargs.put("param", param);
kwargs.put("timestamp", System.currentTimeMillis());
// Celery body 三元组:[args(list), kwargs(map), embed(map)]
List<Object> bodyList = new ArrayList<>(3);
bodyList.add(Collections.emptyList()); // args 空列表
bodyList.add(kwargs);
Map<String, Object> embed = new LinkedHashMap<>();
embed.put("callbacks", null);
embed.put("errbacks", null);
embed.put("chain", null);
embed.put("chord", null);
bodyList.add(embed);
// 发送到与 Celery 一致的 exchange / routingKey(见 MQConfig 与 celery_app.py)
rabbitTemplate.convertAndSend(
"task_exchange",
"task_key",
bodyList,
message -> {
// === 关键:JSON 协议(与 Celery 的 task_serializer/accept_content 对齐)===
message.getMessageProperties().setContentType("application/json");
message.getMessageProperties().setContentEncoding("utf-8");
// 消息持久化(broker 重启仍不丢)
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// === Celery 要求的头信息 ===
message.getMessageProperties().setHeader("id", taskId);
message.getMessageProperties().setHeader("task", celeryTaskName);
// 非必须,但利于排查
message.getMessageProperties().setHeader("lang", "py");
message.getMessageProperties().setHeader("argsrepr", "[]");
message.getMessageProperties().setHeader("kwargsrepr", kwargs.toString());
message.getMessageProperties().setHeader("origin", "java-producer");
return message;
}
);
Map<String, Object> resp = new LinkedHashMap<>();
resp.put("status", "SENT");
resp.put("celery_task_id", taskId);
resp.put("data", kwargs);
return resp;
}
}
常见易错点
- 忘记设置
id/task头; Content-Type不是application/json;- Celery 与 Java 的 交换机/队列/路由键命名不一致;
- 交换机类型不一致(Java 用 topic,Celery 用 direct);
- 没开
mandatory导致消息丢而不告警。
Python 端代码解析(celery_app.py)
以下为与 Java 完全对齐(topic) 的 Celery 配置与任务函数示例:
Python
# -*- coding: utf-8 -*-
"""
Celery Worker(生产可用版本,Windows 友好)
---------------------------------------
关键点:
1) 与 Java 完全一致的交换机/队列/路由键(topic: task_exchange / task_queue / task_key);
2) 使用 JSON 协议(task_serializer="json", accept_content=["json"]);
3) Windows 下固定使用 worker_pool="solo"(避免 prefork/spawn 引发的异常);
4) 可靠性加固:task_acks_late、prefetch=1、worker 丢失时重投;
5) 支持把处理结果回调给 Java(可选,通过环境变量 JAVA_CALLBACK_URL 控制);
6) MQ 连接参数来自环境变量(避免硬编码凭据);
备注:
- 启动命令(Windows):
celery -A celery_app worker -l info
(无需再指定 -P solo,因为已在 app.conf 中固定)
- 首次从 Java 切 JSON 协议上线时,务必清空队列旧消息(RabbitMQ 控制台 → task_queue → Purge),
以免 Celery 仍然消费到过去的 pickle 格式消息而报错。
"""
import os
import json
import requests
from celery import Celery
from kombu import Exchange, Queue
import time
# -------- 读取环境变量(避免硬编码) --------
MQ_HOST = os.getenv("MQ_HOST", "MQ服务器IP")
MQ_PORT = int(os.getenv("MQ_PORT", "5672"))
MQ_USER = os.getenv("MQ_USER", "账户")
MQ_PASS = os.getenv("MQ_PASS", "密码")
# 可选:任务完成后回调到 Java 的 HTTP 地址(例如 http://app-svc:8080/user/task/result)
JAVA_CALLBACK_URL = os.getenv("JAVA_CALLBACK_URL", "").strip()
# -------- 创建 Celery 应用 --------
app = Celery(
"celery_app",
broker=f"amqp://{MQ_USER}:{MQ_PASS}@{MQ_HOST}:{MQ_PORT}//",
backend="rpc://", # 如无需结果存储,可移除
)
# -------- 与 Java 一致的交换机/队列/路由键 --------
TASK_EXCHANGE = Exchange("task_exchange", type="topic", durable=True)
TASK_QUEUE = Queue(
"task_queue",
exchange=TASK_EXCHANGE,
routing_key="task_key",
durable=True,
)
# -------- Celery 配置(协议 + 可靠性 + Windows 兼容) --------
app.conf.update(
# 协议:JSON
task_serializer="json",
accept_content=["json"],
result_serializer="json",
# 绑定队列
task_queues=(TASK_QUEUE,),
task_default_exchange="task_exchange",
task_default_exchange_type="topic",
task_default_routing_key="task_key",
# 可靠性:处理完成再 ack;一次只取 1 个,防止长任务饿死
task_acks_late=True,
worker_prefetch_multiplier=1,
task_reject_on_worker_lost=True,
task_acks_on_failure_or_timeout=True, # 失败也 ack,避免卡死
# Windows 友好:固定 solo 池(避免 prefork/spawn 问题)
worker_pool="solo",
)
# -------- 业务任务 --------
@app.task(name="tasks.handle_task")
def handle_task(**kwargs):
"""
接收来自 Java 的 kwargs(JSON)
Java 侧发送的消息体是 Celery 三元组:[[], {kwargs...}, {callbacks..., chain..., chord...}]
我们这里直接用 **kwargs 接收第二个元素(即 {kwargs...})
"""
try:
print("Celery 收到任务 kwargs:", kwargs)
# === 在此处处理你的业务逻辑 ===
# 例如:根据 task/param 决定做什么
task_name = kwargs.get("task")
param = kwargs.get("param")
# ... 执行耗时任务、调用模型、文件处理、数据库等 ...
result_payload = {
"status": "OK",
"task": task_name,
"param": param,
"timestamp": kwargs.get("timestamp"),
}
print("开始执行业务代码")
time.sleep(30) #模拟计算时间30秒
print("结束执行业务代码")
return {"status": "OK", "echo": result_payload}
except Exception as e:
# 兜底异常,保障 worker 不崩溃
print("任务执行异常:", repr(e))
# 返回错误结构化信息(也可选择 raise 交由重试策略处理)
return {"status": "ERROR", "error": repr(e), "kwargs": kwargs}
启动命令
Python
# pip install celery requests # 依赖按需
celery -A celery_app worker --loglevel=info端到端联调步骤(最小可用)
- 启动RabbitMQ
- 起 Celery Worker
celery -A celery_app worker --loglevel=info- 看到监听了
task_queue。
- 起 Spring Boot
- 发请求
GET /mqtest/createtask?task=demo¶m=10- 期望 Worker 日志出现:
Received task: tasks.handle_task[<UUID>],并打印return结果。
总结
- Pika 是优秀的 AMQP 原生客户端,我们不用 Pika,不是因为它不行,而是因为它“太底层”——需要你把 Celery 已经做好的那一大堆工程化能力再做一遍。
- Celery 在 Pika(或 kombu)的基础上提供了 可直接用于生产的任务平台能力;
- 对于“Java 生产者 + Python 消费者”的异步化场景,选择 Celery 能在稳定性、可观测性与维护成本上更快达到可上线水平;
View Comments