Worker Development
RadStudio uses Celery workers to process asynchronous tasks. Each worker type handles a different kind of work.
Worker Types
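| Worker | Queues consumed | Purpose | Resource requirements |
|---|---|---|---|
| gpu | gpu | Deep learning training/inference | NVIDIA GPU + CUDA |
| cpu | cpu | CPU fallback, small models | Multi-core CPU |
| files | files | Archive extraction, DICOM conversion | Standard CPU |
| agent | none | Heartbeat reporting, metrics collection | Minimal |
| celery | gpu,files | Default development worker | General-purpose |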
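The queue assignments in the table can be turned into worker start commands. The following is a minimal sketch, not RadStudio code: `WORKER_QUEUES` and `build_worker_command` are hypothetical names, and the `--concurrency=2` and `--loglevel=info` defaults are simply borrowed from the Dockerfile example later in this page.

```python
# Queue assignments copied from the table above; the agent worker consumes no queue.
WORKER_QUEUES = {
    "gpu": ["gpu"],
    "cpu": ["cpu"],
    "files": ["files"],
    "agent": [],
    "celery": ["gpu", "files"],
}


def build_worker_command(worker, concurrency=2):
    """Return the argv list that starts a Celery worker for the given worker type."""
    queues = WORKER_QUEUES[worker]  # raises KeyError for unknown worker types
    if not queues:
        raise ValueError(f"worker {worker!r} does not consume a queue")
    return [
        "celery", "-A", "app.core.celery_app", "worker",
        "-Q", ",".join(queues),
        f"--concurrency={concurrency}",
        "--loglevel=info",
    ]


print(" ".join(build_worker_command("celery")))
```

Keeping the mapping in one place makes it harder for a worker to be started on a queue that the table does not list for it.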
Developing a New Worker
1. Define the task
Define the Celery task in apps/backend/app/tasks/:
# apps/backend/app/tasks/my_task.py
from app.core.celery_app import celery_app


@celery_app.task(bind=True, queue='gpu')
def my_gpu_task(self, project_id: int, params: dict):
    """Example GPU training task."""
    try:
        # Report the initial task state
        self.update_state(state='PROGRESS', meta={'progress': 0})
        # Run the task logic (do_training is your own training function, not defined here)
        result = do_training(project_id, params)
        return {'status': 'completed', 'result': result}
    except Exception as exc:
        # Store the error message, then re-raise so Celery marks the task as failed
        self.update_state(state='FAILURE', meta={'error': str(exc)})
        raise
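The task above reports progress only once, before training starts. One way to report it during training is to hand the training function a callback. The sketch below is an assumption about how `do_training` might look, not its actual implementation: the `report` parameter and the `epochs` key in `params` are hypothetical.

```python
from typing import Callable


def do_training(project_id: int, params: dict, report: Callable[..., None]) -> dict:
    """Hypothetical training loop that reports progress after every epoch."""
    epochs = int(params.get("epochs", 1))
    for epoch in range(1, epochs + 1):
        # ... one epoch of real training would run here ...
        report(state="PROGRESS", meta={"progress": round(100 * epoch / epochs)})
    return {"project_id": project_id, "epochs": epochs}


# Stand-in for self.update_state so the sketch runs without a broker.
events = []
result = do_training(7, {"epochs": 4}, report=lambda **kw: events.append(kw))
print(events[-1])
```

Inside the Celery task, the call would become `do_training(project_id, params, report=self.update_state)`, since `update_state` accepts `state` and `meta` as keyword arguments.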
2. Build the worker image
# workers/my-worker/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["celery", "-A", "app.core.celery_app", "worker", \
     "-Q", "my_queue", "--concurrency=2", "--loglevel=info"]
3. Configure Compose
Add the following to deploy/docker-compose.yml:
services:
  my-worker:
    build:
      context: ..
      dockerfile: workers/my-worker/Dockerfile
    command: celery -A app.core.celery_app worker -Q my_queue --concurrency=2
    volumes:
      - ./data:/data
    depends_on:
      - redis
      - postgres
Task State Management
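# Update progress (inside a task defined with bind=True)
self.update_state(
    state='PROGRESS',
    meta={
        'current': 50,
        'total': 100,
        'status': 'Training...'
    }
)

# Query progress from the client
from celery.result import AsyncResult

result = AsyncResult(task_id)
progress = result.info  # {'current': 50, 'total': 100, 'status': 'Training...'}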
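A client usually wants a single percentage rather than the raw meta dict. The helper below is a minimal sketch under the assumption that tasks report `current` and `total` as in the example above. `progress_percent` is a hypothetical name, not part of Celery or RadStudio.

```python
def progress_percent(info) -> float:
    """Convert a PROGRESS meta dict into a percentage between 0 and 100."""
    # result.info is only a meta dict while the task reports custom state;
    # it can also be None (pending) or an exception instance (failed).
    if not isinstance(info, dict):
        return 0.0
    total = info.get("total") or 0
    if total <= 0:
        return 0.0
    current = info.get("current", 0)
    return max(0.0, min(100.0, 100.0 * current / total))


print(progress_percent({"current": 50, "total": 100, "status": "Training..."}))
```

Because `result.info` holds the task's return value once it succeeds, callers should check that `result.state` is `'PROGRESS'` before passing `result.info` to this helper.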
Error Handling
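import logging

logger = logging.getLogger(__name__)


# process, TemporaryError, and PermanentError are application-defined
@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    acks_late=True,
)
def reliable_task(self, data):
    try:
        process(data)
    except TemporaryError as exc:
        # Transient failure: retry up to max_retries times, 60 seconds apart
        raise self.retry(exc=exc)
    except PermanentError:
        # Log the error and do not retry
        logger.error(f"Task failed permanently: {data}")
        raise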
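The example above waits a fixed 60 seconds between attempts. If a growing delay is preferred, the wait can be computed from the retry count and passed to `self.retry` through its `countdown` argument. This is a sketch of one possible policy, not something the project prescribes: `backoff_delay` and its `base` and `cap` defaults are hypothetical.

```python
def backoff_delay(retries: int, base: int = 60, cap: int = 600) -> int:
    """Seconds to wait before the next attempt: base * 2**retries, capped at cap."""
    return min(cap, base * (2 ** retries))


# Inside a bound task this could replace the fixed default_retry_delay:
#     raise self.retry(exc=exc, countdown=backoff_delay(self.request.retries))
print([backoff_delay(n) for n in range(3)])
```

With `max_retries=3`, `self.request.retries` takes the values 0, 1, and 2 at the points where a retry is scheduled, so the cap is never reached with these defaults.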