daren/apps/brands/services/status_polling_service.py

129 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
import time
import logging
from django.db import close_old_connections
from .offer_status_service import OfferStatusService
logger = logging.getLogger('brands')
class StatusPollingService:
"""提供定时轮询服务,定期获取并更新达人状态"""
def __init__(self):
self._polling_threads = {} # 保存活动ID到线程的映射
self._stop_events = {} # 保存活动ID到停止事件的映射
def start_polling(self, campaign_id, creator_product_pairs, interval=30):
"""
开始轮询指定活动的达人状态
:param campaign_id: 活动ID
:param creator_product_pairs: 达人ID和产品ID的对应关系列表格式为 [(creator_id, product_id), ...]
:param interval: 轮询间隔时间(秒)
"""
# 如果该活动已有轮询线程,先停止它
if campaign_id in self._polling_threads:
self.stop_polling(campaign_id)
# 创建停止事件
stop_event = threading.Event()
self._stop_events[campaign_id] = stop_event
# 创建并启动轮询线程
thread = threading.Thread(
target=self._polling_worker,
args=(campaign_id, creator_product_pairs, interval, stop_event),
daemon=True
)
self._polling_threads[campaign_id] = thread
thread.start()
logger.info(f"已启动活动 {campaign_id} 的状态轮询,间隔 {interval}")
def stop_polling(self, campaign_id):
"""
停止指定活动的轮询
:param campaign_id: 活动ID
"""
if campaign_id in self._stop_events:
# 设置停止事件
self._stop_events[campaign_id].set()
# 等待线程结束
if campaign_id in self._polling_threads:
thread = self._polling_threads[campaign_id]
logger.info(f"正在停止活动 {campaign_id} 的状态轮询...")
# 设置超时,避免无限等待
thread.join(timeout=5)
if thread.is_alive():
logger.warning(f"活动 {campaign_id} 的轮询线程在5秒内未能停止")
else:
logger.info(f"活动 {campaign_id} 的轮询线程已成功停止")
# 清理资源
del self._polling_threads[campaign_id]
del self._stop_events[campaign_id]
logger.info(f"已停止活动 {campaign_id} 的状态轮询")
return True
else:
logger.warning(f"未找到活动 {campaign_id} 的轮询任务")
return False
def stop_all(self):
"""停止所有轮询"""
campaign_ids = list(self._polling_threads.keys())
logger.info(f"正在停止所有轮询任务,活动数量: {len(campaign_ids)}")
success_count = 0
for campaign_id in campaign_ids:
if self.stop_polling(campaign_id):
success_count += 1
logger.info(f"已停止 {success_count}/{len(campaign_ids)} 个轮询任务")
return success_count
def get_active_pollings(self):
"""获取当前正在运行的所有轮询任务信息"""
polling_info = []
for campaign_id, thread in self._polling_threads.items():
polling_info.append({
'campaign_id': campaign_id,
'thread_name': thread.name,
'is_alive': thread.is_alive(),
'daemon': thread.daemon
})
return polling_info
def _polling_worker(self, campaign_id, creator_product_pairs, interval, stop_event):
"""
轮询工作线程
:param campaign_id: 活动ID
:param creator_product_pairs: 达人ID和产品ID的对应关系列表
:param interval: 轮询间隔(秒)
:param stop_event: 停止事件
"""
while not stop_event.is_set():
try:
# 关闭旧的数据库连接
close_old_connections()
# 遍历每个达人-产品对,获取并发送状态更新
for creator_id, product_id in creator_product_pairs:
try:
# 获取状态
status = OfferStatusService.fetch_status(creator_id, product_id)
if status:
# 发送状态更新传递产品ID
OfferStatusService.send_status_update(campaign_id, creator_id, status, product_id)
except Exception as e:
logger.error(f"处理达人 {creator_id} 状态时出错: {str(e)}")
# 等待指定时间间隔
stop_event.wait(interval)
except Exception as e:
logger.error(f"轮询线程发生错误: {str(e)}")
# 短暂休眠后继续
time.sleep(5)
# 创建单例实例
polling_service = StatusPollingService()