101 lines
3.8 KiB
Python
101 lines
3.8 KiB
Python
![]() |
import threading
|
|||
|
import time
|
|||
|
import logging
|
|||
|
from django.db import close_old_connections
|
|||
|
from .offer_status_service import OfferStatusService
|
|||
|
|
|||
|
logger = logging.getLogger('brands')
|
|||
|
|
|||
|
class StatusPollingService:
|
|||
|
"""提供定时轮询服务,定期获取并更新达人状态"""
|
|||
|
|
|||
|
def __init__(self):
|
|||
|
self._polling_threads = {} # 保存活动ID到线程的映射
|
|||
|
self._stop_events = {} # 保存活动ID到停止事件的映射
|
|||
|
|
|||
|
def start_polling(self, campaign_id, creator_product_pairs, interval=30):
|
|||
|
"""
|
|||
|
开始轮询指定活动的达人状态
|
|||
|
:param campaign_id: 活动ID
|
|||
|
:param creator_product_pairs: 达人ID和产品ID的对应关系列表,格式为 [(creator_id, product_id), ...]
|
|||
|
:param interval: 轮询间隔时间(秒)
|
|||
|
"""
|
|||
|
# 如果该活动已有轮询线程,先停止它
|
|||
|
if campaign_id in self._polling_threads:
|
|||
|
self.stop_polling(campaign_id)
|
|||
|
|
|||
|
# 创建停止事件
|
|||
|
stop_event = threading.Event()
|
|||
|
self._stop_events[campaign_id] = stop_event
|
|||
|
|
|||
|
# 创建并启动轮询线程
|
|||
|
thread = threading.Thread(
|
|||
|
target=self._polling_worker,
|
|||
|
args=(campaign_id, creator_product_pairs, interval, stop_event),
|
|||
|
daemon=True
|
|||
|
)
|
|||
|
self._polling_threads[campaign_id] = thread
|
|||
|
thread.start()
|
|||
|
|
|||
|
logger.info(f"已启动活动 {campaign_id} 的状态轮询,间隔 {interval} 秒")
|
|||
|
|
|||
|
def stop_polling(self, campaign_id):
|
|||
|
"""
|
|||
|
停止指定活动的轮询
|
|||
|
:param campaign_id: 活动ID
|
|||
|
"""
|
|||
|
if campaign_id in self._stop_events:
|
|||
|
# 设置停止事件
|
|||
|
self._stop_events[campaign_id].set()
|
|||
|
|
|||
|
# 等待线程结束
|
|||
|
if campaign_id in self._polling_threads:
|
|||
|
self._polling_threads[campaign_id].join(timeout=5)
|
|||
|
|
|||
|
# 清理资源
|
|||
|
del self._polling_threads[campaign_id]
|
|||
|
|
|||
|
del self._stop_events[campaign_id]
|
|||
|
logger.info(f"已停止活动 {campaign_id} 的状态轮询")
|
|||
|
|
|||
|
def stop_all(self):
|
|||
|
"""停止所有轮询"""
|
|||
|
campaign_ids = list(self._polling_threads.keys())
|
|||
|
for campaign_id in campaign_ids:
|
|||
|
self.stop_polling(campaign_id)
|
|||
|
|
|||
|
def _polling_worker(self, campaign_id, creator_product_pairs, interval, stop_event):
|
|||
|
"""
|
|||
|
轮询工作线程
|
|||
|
:param campaign_id: 活动ID
|
|||
|
:param creator_product_pairs: 达人ID和产品ID的对应关系列表
|
|||
|
:param interval: 轮询间隔(秒)
|
|||
|
:param stop_event: 停止事件
|
|||
|
"""
|
|||
|
while not stop_event.is_set():
|
|||
|
try:
|
|||
|
# 关闭旧的数据库连接
|
|||
|
close_old_connections()
|
|||
|
|
|||
|
# 遍历每个达人-产品对,获取并发送状态更新
|
|||
|
for creator_id, product_id in creator_product_pairs:
|
|||
|
try:
|
|||
|
# 获取状态
|
|||
|
status = OfferStatusService.fetch_status(creator_id, product_id)
|
|||
|
|
|||
|
if status:
|
|||
|
# 发送状态更新
|
|||
|
OfferStatusService.send_status_update(campaign_id, creator_id, status)
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"处理达人 {creator_id} 状态时出错: {str(e)}")
|
|||
|
|
|||
|
# 等待指定时间间隔
|
|||
|
stop_event.wait(interval)
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"轮询线程发生错误: {str(e)}")
|
|||
|
# 短暂休眠后继续
|
|||
|
time.sleep(5)
|
|||
|
|
|||
|
# 创建单例实例
|
|||
|
polling_service = StatusPollingService()
|