import os from celery import Celery from celery.schedules import crontab # 设置默认Django设置模块 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'role_based_system.settings') app = Celery('role_based_system') # 使用字符串表示,避免pickle序列化可能带来的安全问题 app.config_from_object('django.conf:settings', namespace='CELERY') # 自动从所有注册的Django应用中加载任务 app.autodiscover_tasks() # 配置定期任务 app.conf.beat_schedule = { # 每小时检查一次未发布的视频 'check-scheduled-videos-every-hour': { 'task': 'user_management.tasks.check_scheduled_videos', 'schedule': crontab(minute=0, hour='*/1'), # 每小时运行一次 }, } @app.task(bind=True, ignore_result=True) def debug_task(self): print(f'Request: {self.request!r}')