跨语言任务队列代码实战:Spring Boot + RabbitMQ + Celery 全链路打通

在工业软件相关场景里,后端经常需要把耗时、异步、可并行的任务(如模型推理、文件转码、批处理)从 HTTP 同步链路中剥离出去。最常见的做法是:Java(Spring Boot)作为生产者 → RabbitMQ 作为消息中间件 → Celery(Python)作为消费者/Worker。本文基于真实项目,梳理一套可直接落地集成方案,覆盖关键配置与坑点,帮助你在实际生产环境稳定运行。


为什么我们不用 Pika,而选 Celery?

  • Pika:RabbitMQ 的 底层 Python 客户端。你能直接收发 AMQP 帧,最大自由度,但所有机制都要自己搭(重连、消费并发、超时、重试、结果存储、定时任务、监控……)。
  • Celery:Python 里的 分布式任务队列框架,在底层消息之上,提供了 任务抽象并发模型重试/超时定时路由结果后端事件与可观测性 等高级能力。

我们选择 Celery 的原因很现实:要快速、稳定、可维护地把“消息消费”升级为“任务执行平台”。如果用 Pika,需后期还要为可观测性、可靠性和扩展性持续买单,造成开发资源的极大浪费。

维度CeleryPika
抽象层级任务(Task)语义消息(Message)语义
并发模型prefork / gevent / eventlet 等自己管理线程/协程/进程
重试与超时内建(装饰器/参数即开)自己写 try/except + 退避重试
定时/周期任务Celery Beat 原生支持自己起调度器/cron
结果后端Redis/DB/AMQP 等即插即用自己落库/回写
监控与事件Flower/Events 开箱即用自建监控与埋点
生态广泛、成熟轻量、自由
学习/维护成本低(围绕任务)高(围绕 AMQP 细节)

Celery 介绍(以及 direct vs topic 一句话对比)

Celery 是 Python 生态里成熟的分布式任务队列框架,通常与 RabbitMQ 作为 Broker 搭配:

  • 核心概念
    • Broker:消息中间件(本文用 RabbitMQ)。
    • Task:被分发执行的函数(命名很重要,如 tasks.handle_task)。
    • Worker:执行任务的进程(celery worker)。
    • Serializer:序列化协议(本文统一用 JSON,便于跨语言对接)。
  • 消息格式(与 Celery 兼容)
    • Header 至少包含:id(任务 ID)、task(任务名)。
    • Body 为 三元组[args(list), kwargs(map), embed(map)],我们只用 kwargs 传业务参数,其余保持默认/空。
  • Exchange 类型(直观理解)
    • direct:精确匹配路由键(简单、默认,与 Celery 默认最贴近)。
    • topic:通配匹配(*/#),更灵活,适合多租户/分层路由。

Java 端代码解析(Spring Boot + AMQP)

1. 连接与基础 Bean(MQConfig.java

关键点:

  • 从环境变量读取 MQ 连接(避免硬编码);
  • Jackson2JsonMessageConverter 统一 JSON 协议(与 Celery 对齐);
  • 声明 topic 交换机持久化队列绑定
  • 开启 Publisher Confirm / Return 方便定位路由问题。
Java
package com.induxense.user.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置(生产可用版本)
 *
 * 关键点:
 * 1. 使用 Jackson2JsonMessageConverter,确保与 Celery 的 JSON 协议对齐;
 * 2. Exchange/Queue/Binding 与 Celery 完全一致(topic: task_exchange / task_queue / task_key);
 * 3. 打开 Publisher Confirm & Return,能够定位消息路由/投递失败;
 * 4. 连接参数来自环境变量(优先使用环境变量,避免硬编码凭据);
 */
@Configuration
public class MQConfig {

    // === 与 Celery 端一致 ===
    public static final String QUEUE_NAME = "task_queue";
    public static final String EXCHANGE_NAME = "task_exchange";
    public static final String ROUTING_KEY = "task_key";

    /**
     * 连接工厂
     * - 从环境变量读取 MQ_HOST / MQ_PORT / MQ_USER / MQ_PASS
     * - 开启 Publisher Confirm & Return,便于排错
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        String host = getenvOrDefault("MQ_HOST", "默认IP");
        int port = Integer.parseInt(getenvOrDefault("MQ_PORT", "5672"));
        String username = getenvOrDefault("MQ_USER", "账户");
        String password = getenvOrDefault("MQ_PASS", "密码");

        CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
        factory.setUsername(username);
        factory.setPassword(password);

        // Publisher 确认:CORRELATED 模式(推荐)
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // 开启 Return 回退(不可路由时触发)
        factory.setPublisherReturns(true);

        return factory;
    }

    /**
     * JSON 消息转换器
     * - 与 Celery 的 task_serializer/accept_content=["json"] 对齐
     */
    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * RabbitTemplate
     * - 绑定 JSON 转换器
     * - 设置 ConfirmCallback / ReturnsCallback 打印详细日志
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         Jackson2JsonMessageConverter messageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(messageConverter);

        // 当 mandatory 为 true 且消息无法路由到队列时,会触发 ReturnsCallback
        template.setMandatory(true);

        // Publisher Confirm 回调(消息是否到达 broker)
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                System.err.println("❌ [PublisherConfirm] 消息未被 broker 接收: " + cause
                        + ", correlationData=" + correlationData);
            }
        });

        // Return 回调(消息到达 broker 但路由失败)
        template.setReturnsCallback(returned -> {
            System.err.println("❌ [PublisherReturn] 消息被退回:" +
                    " replyCode=" + returned.getReplyCode() +
                    " replyText=" + returned.getReplyText() +
                    " exchange=" + returned.getExchange() +
                    " routingKey=" + returned.getRoutingKey() +
                    " body=" + new String(returned.getMessage().getBody()));
        });

        return template;
    }

    /**
     * 声明 Topic Exchange(与 Celery 端一致)
     */
    @Bean
    public TopicExchange taskExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    /**
     * 声明持久化队列(与 Celery 端一致)
     */
    @Bean
    public Queue taskQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 绑定:queue ←(routingKey)← exchange
     */
    @Bean
    public Binding binding(Queue taskQueue, TopicExchange taskExchange) {
        return BindingBuilder.bind(taskQueue).to(taskExchange).with(ROUTING_KEY);
    }

    // --- 工具方法 ---
    private static String getenvOrDefault(String key, String defVal) {
        String v = System.getenv(key);
        return (v == null || v.isBlank()) ? defVal : v;
        // 注意:K8s/容器环境请用 Secret/ConfigMap 注入上述环境变量
    }
}

2. 发送任务接口(UserController.java

要点:

  • 统一任务名:tasks.handle_task(与 Python 端 @app.task 保持一致);
  • 构造 Celery 三元组 body;
  • 必须设置 idtask 两个 header;
  • 设置 ContentType=application/json、持久化投递。
Java
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.*;

import java.util.*;

/**
 * 任务发送控制器
 *
 * 核心:
 * - 使用 RabbitTemplate.convertAndSend 直接发送 Java 对象,自动由 Jackson2JsonMessageConverter 序列化为 JSON;
 * - 构造 Celery 规范三元组:[args(list), kwargs(map), embed(map)];
 * - 设置 headers:id(任务ID)、task(Celery 任务名);
 * - 设置 DeliveryMode 为 PERSISTENT,保障消息持久化;
 */
@RequestMapping("/mqtest/")
@RestController
public class UserController {

    private final RabbitTemplate rabbitTemplate;

    public UserController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * 发送 Celery 任务(JSON 协议),与 Celery 端 @app.task(name="tasks.handle_task") 对齐
     * 示例:GET /mqtest/createtask?task=demo&param=10
     */
    @GetMapping("/createtask")
    public Map<String, Object> sendTask(
            @RequestParam String task,
            @RequestParam(required = false) String param
    ) {
        final String celeryTaskName = "tasks.handle_task";     // 必须与 Celery 任务名一致
        final String taskId = UUID.randomUUID().toString();

        // kwargs:传给 Celery 的业务参数
        Map<String, Object> kwargs = new LinkedHashMap<>();
        kwargs.put("task", task);
        kwargs.put("param", param);
        kwargs.put("timestamp", System.currentTimeMillis());

        // Celery body 三元组:[args(list), kwargs(map), embed(map)]
        List<Object> bodyList = new ArrayList<>(3);
        bodyList.add(Collections.emptyList()); // args 空列表
        bodyList.add(kwargs);
        Map<String, Object> embed = new LinkedHashMap<>();
        embed.put("callbacks", null);
        embed.put("errbacks", null);
        embed.put("chain", null);
        embed.put("chord", null);
        bodyList.add(embed);

        // 发送到与 Celery 一致的 exchange / routingKey(见 MQConfig 与 celery_app.py)
        rabbitTemplate.convertAndSend(
                "task_exchange",
                "task_key",
                bodyList,
                message -> {
                    // === 关键:JSON 协议(与 Celery 的 task_serializer/accept_content 对齐)===
                    message.getMessageProperties().setContentType("application/json");
                    message.getMessageProperties().setContentEncoding("utf-8");

                    // 消息持久化(broker 重启仍不丢)
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

                    // === Celery 要求的头信息 ===
                    message.getMessageProperties().setHeader("id", taskId);
                    message.getMessageProperties().setHeader("task", celeryTaskName);

                    // 非必须,但利于排查
                    message.getMessageProperties().setHeader("lang", "py");
                    message.getMessageProperties().setHeader("argsrepr", "[]");
                    message.getMessageProperties().setHeader("kwargsrepr", kwargs.toString());
                    message.getMessageProperties().setHeader("origin", "java-producer");
                    return message;
                }
        );

        Map<String, Object> resp = new LinkedHashMap<>();
        resp.put("status", "SENT");
        resp.put("celery_task_id", taskId);
        resp.put("data", kwargs);
        return resp;
    }
}

常见易错点

  • 忘记设置 id / task 头;
  • Content-Type 不是 application/json
  • Celery 与 Java 的 交换机/队列/路由键命名不一致
  • 交换机类型不一致(Java 用 topic,Celery 用 direct);
  • 没开 mandatory 导致消息丢而不告警。

Python 端代码解析(celery_app.py

以下为与 Java 完全对齐(topic) 的 Celery 配置与任务函数示例:

Python
# -*- coding: utf-8 -*-
"""
Celery Worker(生产可用版本,Windows 友好)
---------------------------------------

关键点:
1) 与 Java 完全一致的交换机/队列/路由键(topic: task_exchange / task_queue / task_key);
2) 使用 JSON 协议(task_serializer="json", accept_content=["json"]);
3) Windows 下固定使用 worker_pool="solo"(避免 prefork/spawn 引发的异常);
4) 可靠性加固:task_acks_late、prefetch=1、worker 丢失时重投;
5) 支持把处理结果回调给 Java(可选,通过环境变量 JAVA_CALLBACK_URL 控制);
6) MQ 连接参数来自环境变量(避免硬编码凭据);

备注:
- 启动命令(Windows):
  celery -A celery_app worker -l info
  (无需再指定 -P solo,因为已在 app.conf 中固定)

- 首次从 Java 切 JSON 协议上线时,务必清空队列旧消息(RabbitMQ 控制台 → task_queue → Purge),
  以免 Celery 仍然消费到过去的 pickle 格式消息而报错。
"""

import os
import json
import requests
from celery import Celery
from kombu import Exchange, Queue
import time
# -------- 读取环境变量(避免硬编码) --------
MQ_HOST = os.getenv("MQ_HOST", "MQ服务器IP")
MQ_PORT = int(os.getenv("MQ_PORT", "5672"))
MQ_USER = os.getenv("MQ_USER", "账户")
MQ_PASS = os.getenv("MQ_PASS", "密码")

# 可选:任务完成后回调到 Java 的 HTTP 地址(例如 http://app-svc:8080/user/task/result)
JAVA_CALLBACK_URL = os.getenv("JAVA_CALLBACK_URL", "").strip()

# -------- 创建 Celery 应用 --------
app = Celery(
    "celery_app",
    broker=f"amqp://{MQ_USER}:{MQ_PASS}@{MQ_HOST}:{MQ_PORT}//",
    backend="rpc://",  # 如无需结果存储,可移除
)

# -------- 与 Java 一致的交换机/队列/路由键 --------
TASK_EXCHANGE = Exchange("task_exchange", type="topic", durable=True)
TASK_QUEUE = Queue(
    "task_queue",
    exchange=TASK_EXCHANGE,
    routing_key="task_key",
    durable=True,
)

# -------- Celery 配置(协议 + 可靠性 + Windows 兼容) --------
app.conf.update(
    # 协议:JSON
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",

    # 绑定队列
    task_queues=(TASK_QUEUE,),
    task_default_exchange="task_exchange",
    task_default_exchange_type="topic",
    task_default_routing_key="task_key",

    # 可靠性:处理完成再 ack;一次只取 1 个,防止长任务饿死
    task_acks_late=True,
    worker_prefetch_multiplier=1,
    task_reject_on_worker_lost=True,
    task_acks_on_failure_or_timeout=True, # 失败也 ack,避免卡死
    # Windows 友好:固定 solo 池(避免 prefork/spawn 问题)
    worker_pool="solo",
)

# -------- 业务任务 --------
@app.task(name="tasks.handle_task")
def handle_task(**kwargs):
    """
    接收来自 Java 的 kwargs(JSON)
    Java 侧发送的消息体是 Celery 三元组:[[], {kwargs...}, {callbacks..., chain..., chord...}]
    我们这里直接用 **kwargs 接收第二个元素(即 {kwargs...})
    """
    try:
        print("Celery 收到任务 kwargs:", kwargs)


        # === 在此处处理你的业务逻辑 ===
        # 例如:根据 task/param 决定做什么
        task_name = kwargs.get("task")
        param = kwargs.get("param")
        # ... 执行耗时任务、调用模型、文件处理、数据库等 ...
        result_payload = {
            "status": "OK",
            "task": task_name,
            "param": param,
            "timestamp": kwargs.get("timestamp"),
        }
        print("开始执行业务代码")
        time.sleep(30) #模拟计算时间30秒
        print("结束执行业务代码")
        

        return {"status": "OK", "echo": result_payload}

    except Exception as e:
        # 兜底异常,保障 worker 不崩溃
        print("任务执行异常:", repr(e))
        # 返回错误结构化信息(也可选择 raise 交由重试策略处理)
        return {"status": "ERROR", "error": repr(e), "kwargs": kwargs}


启动命令

Python
# pip install celery  requests  # 依赖按需
celery -A celery_app worker --loglevel=info

端到端联调步骤(最小可用)

  1. 启动RabbitMQ
  2. 起 Celery Worker
    • celery -A celery_app worker --loglevel=info
    • 看到监听了 task_queue
  3. 起 Spring Boot
  4. 发请求
    • GET /mqtest/createtask?task=demo&param=10
    • 期望 Worker 日志出现: Received task: tasks.handle_task[<UUID>],并打印 return 结果。

总结

  • Pika 是优秀的 AMQP 原生客户端我们不用 Pika,不是因为它不行,而是因为它“太底层”——需要你把 Celery 已经做好的那一大堆工程化能力再做一遍
  • Celery 在 Pika(或 kombu)的基础上提供了 可直接用于生产的任务平台能力;
  • 对于“Java 生产者 + Python 消费者”的异步化场景,选择 Celery 能在稳定性、可观测性与维护成本上更快达到可上线水平;
使用 Docker 快速搭建 Redis 数据库 在 Linux 中挂载新虚拟化硬盘的完整指南 使用TC(Traffic Control)对内外网分流限速
View Comments
There are currently no comments.