import threading import time import logging from django.db import close_old_connections from .offer_status_service import OfferStatusService logger = logging.getLogger('brands') class StatusPollingService: """提供定时轮询服务,定期获取并更新达人状态""" def __init__(self): self._polling_threads = {} # 保存活动ID到线程的映射 self._stop_events = {} # 保存活动ID到停止事件的映射 def start_polling(self, campaign_id, creator_product_pairs, interval=30): """ 开始轮询指定活动的达人状态 :param campaign_id: 活动ID :param creator_product_pairs: 达人ID和产品ID的对应关系列表,格式为 [(creator_id, product_id), ...] :param interval: 轮询间隔时间(秒) """ # 如果该活动已有轮询线程,先停止它 if campaign_id in self._polling_threads: self.stop_polling(campaign_id) # 创建停止事件 stop_event = threading.Event() self._stop_events[campaign_id] = stop_event # 创建并启动轮询线程 thread = threading.Thread( target=self._polling_worker, args=(campaign_id, creator_product_pairs, interval, stop_event), daemon=True ) self._polling_threads[campaign_id] = thread thread.start() logger.info(f"已启动活动 {campaign_id} 的状态轮询,间隔 {interval} 秒") def stop_polling(self, campaign_id): """ 停止指定活动的轮询 :param campaign_id: 活动ID """ if campaign_id in self._stop_events: # 设置停止事件 self._stop_events[campaign_id].set() # 等待线程结束 if campaign_id in self._polling_threads: thread = self._polling_threads[campaign_id] logger.info(f"正在停止活动 {campaign_id} 的状态轮询...") # 设置超时,避免无限等待 thread.join(timeout=5) if thread.is_alive(): logger.warning(f"活动 {campaign_id} 的轮询线程在5秒内未能停止") else: logger.info(f"活动 {campaign_id} 的轮询线程已成功停止") # 清理资源 del self._polling_threads[campaign_id] del self._stop_events[campaign_id] logger.info(f"已停止活动 {campaign_id} 的状态轮询") return True else: logger.warning(f"未找到活动 {campaign_id} 的轮询任务") return False def stop_all(self): """停止所有轮询""" campaign_ids = list(self._polling_threads.keys()) logger.info(f"正在停止所有轮询任务,活动数量: {len(campaign_ids)}") success_count = 0 for campaign_id in campaign_ids: if self.stop_polling(campaign_id): success_count += 1 logger.info(f"已停止 {success_count}/{len(campaign_ids)} 个轮询任务") return success_count def get_active_pollings(self): """获取当前正在运行的所有轮询任务信息""" polling_info = [] for campaign_id, thread in self._polling_threads.items(): polling_info.append({ 'campaign_id': campaign_id, 'thread_name': thread.name, 'is_alive': thread.is_alive(), 'daemon': thread.daemon }) return polling_info def _polling_worker(self, campaign_id, creator_product_pairs, interval, stop_event): """ 轮询工作线程 :param campaign_id: 活动ID :param creator_product_pairs: 达人ID和产品ID的对应关系列表 :param interval: 轮询间隔(秒) :param stop_event: 停止事件 """ while not stop_event.is_set(): try: # 关闭旧的数据库连接 close_old_connections() # 遍历每个达人-产品对,获取并发送状态更新 for creator_id, product_id in creator_product_pairs: try: # 获取状态 status = OfferStatusService.fetch_status(creator_id, product_id) if status: # 发送状态更新,传递产品ID OfferStatusService.send_status_update(campaign_id, creator_id, status, product_id) except Exception as e: logger.error(f"处理达人 {creator_id} 状态时出错: {str(e)}") # 等待指定时间间隔 stop_event.wait(interval) except Exception as e: logger.error(f"轮询线程发生错误: {str(e)}") # 短暂休眠后继续 time.sleep(5) # 创建单例实例 polling_service = StatusPollingService()