
Worker Development

RadStudio uses Celery workers to process asynchronous tasks, with each worker handling a different type of work.

Worker Types

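| Worker | Queues consumed | Purpose | Resource requirements |
|--------|-----------------|---------|-----------------------|
| gpu | gpu | Deep learning training and inference | NVIDIA GPU + CUDA |
| cpu | cpu | CPU fallback, small models | Multi-core CPU |
| files | files | Archive extraction, DICOM conversion | Standard CPU |
| agent | | Heartbeat reporting, metrics collection | Very low |
| celery | gpu,files | Default development worker | General-purpose |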

Developing a New Worker

1. Define the Task

Define Celery tasks in `apps/backend/app/tasks/`:

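# apps/backend/app/tasks/my_task.py
from celery.utils.log import get_task_logger

from app.core.celery_app import celery_app

logger = get_task_logger(__name__)


@celery_app.task(bind=True, queue='gpu')
def my_gpu_task(self, project_id: int, params: dict):
    """Example GPU training task."""
    try:
        # Report progress; clients see state PROGRESS with this meta
        self.update_state(state='PROGRESS', meta={'progress': 0})

        # Task logic (do_training is a placeholder for your training code)
        result = do_training(project_id, params)

        return {'status': 'completed', 'result': result}
    except Exception:
        # Re-raising lets Celery record FAILURE with the exception and traceback;
        # a manual update_state(state='FAILURE') here would just be overwritten.
        logger.exception("my_gpu_task failed for project %s", project_id)
        raise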

2. Create the Worker Image

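# workers/my-worker/Dockerfile
# COPY source paths are relative to the build context, which is the repository
# root when built through deploy/docker-compose.yml (context: ..)
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Consume only my_queue, with 2 worker processes; the trailing backslash continues the instruction
CMD ["celery", "-A", "app.core.celery_app", "worker", \
     "-Q", "my_queue", "--concurrency=2", "--loglevel=info"]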

3. Configure Compose

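Add the following to `deploy/docker-compose.yml`. The queue passed to `-Q` must be the one your tasks are routed to. The example task above uses `queue='gpu'`, so it would not reach a worker that only consumes `my_queue`.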

services:
  my-worker:
    build:
      context: ..
      dockerfile: workers/my-worker/Dockerfile  # relative to the build context
    # Overrides the CMD defined in the Dockerfile
    command: celery -A app.core.celery_app worker -Q my_queue --concurrency=2
    volumes:
      - ./data:/data
    depends_on:
      - redis
      - postgres
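Once the container is up, you can check that some worker is actually consuming `my_queue`. Celery's remote-control API exposes this via `celery_app.control.inspect().active_queues()`, which returns the consumed queues per worker hostname, or `None` if no worker replies. The CLI equivalent is `celery -A app.core.celery_app inspect active_queues`.

The helper below simply filters that reply. Its name, `workers_consuming`, is hypothetical, and the sample reply is made-up data shaped like Celery's output.

```python
from typing import Optional


def workers_consuming(active_queues: Optional[dict], queue: str) -> list:
    """Return the sorted hostnames of workers that consume `queue`.

    `active_queues` is the reply of inspect().active_queues():
    {hostname: [{'name': queue_name, ...}, ...]}, or None when no worker replied.
    """
    if not active_queues:
        return []
    return sorted(
        host
        for host, queues in active_queues.items()
        if any(q.get('name') == queue for q in queues)
    )


# Made-up reply for illustration; in practice:
#   reply = celery_app.control.inspect().active_queues()
reply = {
    'celery@my-worker': [{'name': 'my_queue', 'routing_key': 'my_queue'}],
    'celery@gpu-worker': [{'name': 'gpu', 'routing_key': 'gpu'}],
}
print(workers_consuming(reply, 'my_queue'))
```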

Task State Management

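# Inside a bound task (bind=True): report progress
self.update_state(
    state='PROGRESS',
    meta={
        'current': 50,
        'total': 100,
        'status': 'Training...',
    },
)

# Client side: query progress through the app so its result backend is used
from app.core.celery_app import celery_app

result = celery_app.AsyncResult(task_id)
if result.state == 'PROGRESS':
    progress = result.info  # {'current': 50, 'total': 100, 'status': 'Training...'}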
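On the client side, `result.state` tells you which shape `result.info` has:

- the meta dict while the task reports `PROGRESS`;
- the return value after `SUCCESS`;
- the exception instance after `FAILURE`;
- `None` while `PENDING`. Celery also reports unknown task ids as `PENDING`.

A hypothetical helper that turns this into a display string might look like the following. It assumes the `current`/`total`/`status` keys used above.

```python
def describe_progress(state: str, info) -> str:
    """Hypothetical helper: render a Celery task's state/info as a short status line."""
    if state == 'PROGRESS' and isinstance(info, dict):
        current = info.get('current', 0)
        total = info.get('total') or 0
        # Guard against a missing or zero total to avoid ZeroDivisionError
        percent = 100 * current / total if total else 0.0
        return f"{percent:.0f}% {info.get('status', '')}".rstrip()
    if state == 'SUCCESS':
        return 'done'
    if state == 'FAILURE':
        # For FAILURE, info is the exception raised by the task
        return f'failed: {info!r}'
    # PENDING, STARTED, RETRY, ...: show the state name
    return state.lower()


print(describe_progress('PROGRESS', {'current': 50, 'total': 100, 'status': 'Training...'}))
```

A client would call it as `describe_progress(result.state, result.info)`.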

Error Handling

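from celery.utils.log import get_task_logger

from app.core.celery_app import celery_app

logger = get_task_logger(__name__)

# process, TemporaryError and PermanentError are placeholders for your own code


@celery_app.task(
    bind=True,
    max_retries=3,           # give up after 3 retries
    default_retry_delay=60,  # seconds to wait before each retry
    acks_late=True,          # acknowledge the message after the task runs, not on receipt
)
def reliable_task(self, data):
    try:
        process(data)
    except TemporaryError as exc:
        # retry() raises celery.exceptions.Retry; the explicit raise makes that visible.
        # Once max_retries is exceeded, exc itself is re-raised and the task fails.
        raise self.retry(exc=exc)
    except PermanentError:
        # Log and fail immediately, without retrying
        logger.error("Task failed permanently: %s", data)
        raise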
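`default_retry_delay=60` waits the same 60 seconds before every retry. If you would rather have the delay grow, `self.retry()` accepts a `countdown` in seconds, and `self.request.retries` holds the number of retries so far.

Here is a minimal sketch of an exponential, capped delay. The `retry_countdown` name and its base and cap values are assumptions, not RadStudio settings.

```python
def retry_countdown(retries: int, base: float = 60, cap: float = 600) -> float:
    """Exponential backoff: base * 2**retries seconds, never more than cap."""
    return min(cap, base * 2 ** retries)


# Inside reliable_task's TemporaryError branch this would be used as:
#   raise self.retry(exc=exc, countdown=retry_countdown(self.request.retries))
for attempt in range(5):
    print(attempt, retry_countdown(attempt))
```

Celery also provides task options for this (`autoretry_for`, `retry_backoff`, `retry_backoff_max`, `retry_jitter`). Check the task documentation for your Celery version before relying on them.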