automated_task_monitor/monitor/views.py

2444 lines
98 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.http import JsonResponse
from .tasks import monitor_process, get_process_gpu_usage
import threading
import psutil
from .models import HighCPUProcess, HighGPUProcess, HighMemoryProcess, AllResourceProcess, TiktokUserVideos
import logging
import os
from datetime import datetime
import time
import nvidia_smi
from django.utils import timezone
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import render
import pytz
from pathlib import Path # 使用 pathlib 处理跨平台路径
import json
from django.conf import settings
import requests
import threading
from collections import deque
from celery import shared_task
import uuid
from functools import wraps
from datetime import timedelta
import redis
from concurrent.futures import ThreadPoolExecutor, as_completed
from yt_dlp import YoutubeDL
directory_monitoring = {}
# 全局变量来控制检测线程
monitor_thread = None
is_monitoring = False
# 在文件开头定义日志目录
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(BASE_DIR, 'logs', 'process_monitor')
# 创建保存视频的基本路径
TIKTOK_VIDEOS_PATH = os.path.join(BASE_DIR, 'media', 'tiktok_videos')
# 确保基本目录存在
os.makedirs(TIKTOK_VIDEOS_PATH, exist_ok=True)
# 确保基础目录结构存在,添加 'all' 目录
for resource_type in ['cpu', 'memory', 'gpu', 'all']:
os.makedirs(os.path.join(LOG_DIR, resource_type), exist_ok=True)
# 获取应用专属的logger
logger = logging.getLogger('monitor')
# 全局变量来跟踪监控的目录
monitored_directories = set()
# 在文件顶部添加 API 基础 URL
API_BASE_URL = "http://81.69.223.133:45268"
# 创建Redis连接
redis_client = redis.Redis.from_url(settings.CELERY_BROKER_URL)
# 替代原来的TASK_STATUS字典方法
def set_task_status(task_id, status_data):
"""在Redis中存储任务状态"""
redis_client.set(f"task_status:{task_id}", json.dumps(status_data))
def get_task_status(task_id):
"""从Redis获取任务状态"""
data = redis_client.get(f"task_status:{task_id}")
if data:
return json.loads(data)
return None
def update_task_status(task_id, updates):
"""更新任务状态的部分字段"""
status = get_task_status(task_id)
if status:
status.update(updates)
set_task_status(task_id, status)
return True
return False
# 添加新的监控线程函数
def monitor_directory(directory):
"""持续监控目录的后台任务"""
monitor_interval = settings.MONITOR_INTERVAL # 监控间隔5秒
log_interval = settings.LOG_INTERVAL # 日志写入间隔60秒
processed_pids = set() # 用于跟踪已处理的PID
last_log_time = {} # 每个进程的最后日志时间
while directory_monitoring.get(directory, False):
try:
next_update = timezone.now() + timezone.timedelta(seconds=monitor_interval)
processes = find_python_processes_in_dir(directory)
for proc in processes:
pid = proc['pid']
try:
# 设置日志文件
log_file = setup_process_logger(pid, 'all')
if log_file:
process = psutil.Process(pid)
data = get_process_data(process)
# 更新数据库
AllResourceProcess.objects.update_or_create(
pid=pid,
defaults={
'process_name': process.name(),
'cpu_usage': float(data['cpu']['usage'].replace('%', '')),
'cpu_user_time': float(data['cpu']['user_time'].replace('s', '')),
'cpu_system_time': float(data['cpu']['system_time'].replace('s', '')),
'memory_usage': float(data['memory']['physical'].split('MB')[0].strip()),
'memory_percent': float(data['memory']['physical'].split('(')[1].split('%')[0].strip()),
'virtual_memory': float(data['memory']['virtual'].split('MB')[0].strip()),
'gpu_usage': float(data['gpu']['usage'].replace('%', '')),
'gpu_memory': float(data['gpu']['memory'].replace('MB', '')),
'net_io_sent': float(data['io']['write'].split('MB')[0].strip()),
'net_io_recv': float(data['io']['read'].split('MB')[0].strip()),
'is_active': True,
'status': 1,
'log_file': log_file
}
)
now = timezone.now()
# 如果是新检测到的进程,立即写入日志
if pid not in processed_pids:
print(f"首次检测到进程,立即写入日志: PID={pid}") # 调试日志
log_process_metrics(pid, data, log_file)
processed_pids.add(pid)
last_log_time[pid] = now
# 对于已知进程,每隔一分钟写入一次
elif (now - last_log_time.get(pid, now)).total_seconds() >= log_interval:
print(f"定期写入日志: PID={pid}") # 调试日志
log_process_metrics(pid, data, log_file)
last_log_time[pid] = now
except Exception as e:
logger.error(f"监控进程 {pid} 失败: {str(e)}")
# 计算需要等待的时间
now = timezone.now()
if next_update > now:
time.sleep((next_update - now).total_seconds())
except Exception as e:
logger.error(f"目录监控出错: {str(e)}")
time.sleep(5)
def get_python_processes(directory):
"""获取指定目录下的Python进程"""
directory = str(Path(directory).resolve()).lower()
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
if 'python' in proc.info['name'].lower():
process = psutil.Process(proc.info['pid'])
try:
proc_cwd = str(Path(process.cwd()).resolve()).lower()
if directory in proc_cwd or proc_cwd.startswith(directory):
processes.append(process)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
return processes
def get_process_info(process):
"""获取进程详细信息"""
try:
with process.oneshot():
info = {
'pid': process.pid,
'name': process.name(),
'cpu_usage': process.cpu_percent(),
'memory_usage': process.memory_info().rss / (1024 * 1024), # MB
'command_line': ' '.join(process.cmdline()),
'create_time': process.create_time(),
'status': process.status(),
'threads': process.num_threads(),
'working_directory': process.cwd(),
'gpu_info': {
'usage': 0,
'memory': 0
}
}
# 获取IO信息
try:
io = process.io_counters()
info['io'] = {
'read_bytes': io.read_bytes / (1024 * 1024), # MB
'write_bytes': io.write_bytes / (1024 * 1024), # MB
'read_count': io.read_count,
'write_count': io.write_count
}
except (psutil.NoSuchProcess, psutil.AccessDenied):
info['io'] = {
'read_bytes': 0,
'write_bytes': 0,
'read_count': 0,
'write_count': 0
}
return info
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
logger.error(f"获取进程 {process.pid} 信息失败: {str(e)}")
return None
@csrf_exempt
@require_http_methods(["POST"])
def scan_directory(request):
"""扫描目录下的 Python 进程"""
try:
data = json.loads(request.body)
directory = data.get('directory')
if not directory:
return JsonResponse({
'status': 'error',
'message': '请提供目录路径'
})
# 添加到监控目录集合
directory = str(Path(directory).resolve())
monitored_directories.add(directory)
logger.info(f"开始监控目录: {directory}")
return JsonResponse({
'status': 'success',
'message': f'开始监控目录: {directory}',
'directory': directory
})
except Exception as e:
logger.error(f"启动目录监控失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': str(e)
})
@require_http_methods(["GET"])
def get_directory_status(request):
"""获取目录下的进程状态"""
try:
directory = request.GET.get('directory')
if not directory:
return JsonResponse({
'status': 'error',
'message': '未提供目录路径'
})
# 获取目录下的Python进程
processes = get_python_processes(directory)
# 获取每个进程的详细信息
process_info = []
for proc in processes:
info = get_process_info(proc)
if info:
process_info.append(info)
return JsonResponse({
'status': 'success',
'processes': process_info,
'timestamp': timezone.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
logger.error(f"获取目录状态失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': str(e)
})
@require_http_methods(["POST"])
def stop_directory_monitor(request):
"""停止目录监控"""
try:
data = json.loads(request.body)
directory = data.get('directory')
if not directory:
return JsonResponse({
'status': 'error',
'message': '未提供目录路径'
})
# 从监控集合中移除
directory = str(Path(directory).resolve())
if directory in monitored_directories:
monitored_directories.remove(directory)
logger.info(f"停止监控目录: {directory}")
return JsonResponse({
'status': 'success',
'message': '监控已停止'
})
except Exception as e:
logger.error(f"停止监控失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': str(e)
})
def setup_process_logger(pid, monitor_type):
"""为进程设置独立的日志记录器"""
try:
# 验证监控类型
if monitor_type not in ['cpu', 'memory', 'gpu', 'all']:
print(f"警告:无效的监控类型 {monitor_type}")
return None
# 获取当前时间(使用本地时区)
local_tz = pytz.timezone('Asia/Shanghai')
current_time = timezone.localtime(timezone.now(), local_tz)
current_date = current_time.strftime("%Y%m%d")
# 如果是从目录扫描启动的监控,使用 'all' 类型
if monitor_type == 'scan':
monitor_type = 'all'
# 构建日志文件路径
log_dir = os.path.join(LOG_DIR, monitor_type)
log_file = os.path.join(log_dir, f'{pid}_{current_date}_{monitor_type}.log')
# 确保目录存在
os.makedirs(os.path.dirname(log_file), exist_ok=True)
print(f"设置日志文件: {log_file}") # 调试信息
return log_file
except Exception as e:
print(f"设置日志文件失败: {str(e)}")
raise
def log_process_metrics(pid, data, log_file):
"""记录进程指标到日志文件"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# 获取当前时间(使用本地时区)
local_tz = pytz.timezone('Asia/Shanghai') # 使用中国时区
current_time = timezone.localtime(timezone.now(), local_tz)
timestamp = current_time.strftime('%Y-%m-%d %H:%M:%S')
# 格式化日志内容
log_content = f"=== {timestamp} ===\n"
# CPU信息
log_content += "CPU信息:\n"
log_content += f"├─ 使用率: {data['cpu']['usage']}\n"
log_content += f"├─ 用户态时间: {data['cpu']['user_time']}\n"
log_content += f"├─ 内核态时间: {data['cpu']['system_time']}\n"
log_content += f"├─ CPU核心数: {data['cpu']['cores']}\n"
log_content += f"├─ CPU频率: {data['cpu']['frequency']}\n"
log_content += f"└─ 上下文切换: {data['cpu']['context_switches']}\n"
# 内存信息
log_content += "内存信息:\n"
log_content += f"├─ 物理内存: {data['memory']['physical']}\n"
log_content += f"├─ 虚拟内存: {data['memory']['virtual']}\n"
log_content += f"├─ 内存映射: {data['memory']['mappings']}\n"
log_content += f"├─ 系统内存使用: {data['memory']['system_usage']}\n"
log_content += f"└─ 交换空间使用: {data['memory']['swap_usage']}\n"
# GPU信息
log_content += "GPU信息:\n"
log_content += f"├─ 使用率: {data['gpu']['usage']}\n"
log_content += f"└─ 显存使用: {data['gpu']['memory']}\n"
# IO信息
log_content += "IO信息:\n"
log_content += f"├─ 读取: {data['io']['read']}\n"
log_content += f"├─ 写入: {data['io']['write']}\n"
log_content += f"└─ 其他IO: {data['io']['other']}\n"
# 进程状态
log_content += f"进程状态: {data['status']}\n"
log_content += "-" * 50 + "\n"
# 写入日志文件
with open(log_file, 'a', encoding='utf-8') as f:
f.write(log_content)
print(f"已写入日志到: {log_file}") # 调试信息
except Exception as e:
print(f"写入日志失败: {str(e)}")
raise
def get_process_by_name(process_name):
"""根据进程名称获取进程PID"""
pids = []
for proc in psutil.process_iter(['pid', 'name']):
try:
if process_name.lower() in proc.info['name'].lower():
pids.append(proc.info['pid'])
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return pids
def get_process_gpu_usage(pid):
"""获取进程的GPU使用情况"""
try:
# 如果没有GPU或无法获取GPU信息返回默认值
return 0.0, 0.0
except Exception as e:
logger.error(f"获取GPU信息失败: {str(e)}")
return 0.0, 0.0
def get_high_resource_processes():
"""获取高资源占用的进程"""
high_resource_pids = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
try:
if proc.info['cpu_percent'] > 50 or proc.info['memory_percent'] > 50:
high_resource_pids.append({
'pid': proc.info['pid'],
'name': proc.info['name'],
'cpu_percent': proc.info['cpu_percent'],
'memory_percent': proc.info['memory_percent']
})
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
return high_resource_pids
def auto_detect_high_resource_processes(request):
"""自动检测高资源进程的API端点"""
try:
# 获取高资源进程
high_resource_procs = get_high_resource_processes()
# 启动监控
for proc in high_resource_procs:
try:
process = psutil.Process(proc['pid'])
# 设置日志文件
log_file = setup_process_logger(proc['pid'], 'auto')
# 记录进程数据
data = get_process_data(process)
log_process_metrics(proc['pid'], data, log_file)
except psutil.NoSuchProcess:
continue
except Exception as e:
logger.error(f"监控进程 {proc['pid']} 时出错: {str(e)}")
return JsonResponse({
'status': 'success',
'message': '已开始自动检测高资源进程',
'processes': high_resource_procs
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"自动检测失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': f'自动检测失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
def setup_logger(pid):
"""为每个进程设置独立的日志记录器"""
log_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs', 'process_monitor', f'process_{pid}_{datetime.now().strftime("%Y%m%d")}.log')
process_logger = logging.getLogger(f'monitor.process.process_{pid}')
process_logger.setLevel(logging.INFO)
handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
process_logger.addHandler(handler)
return process_logger, log_file
def continuous_monitor():
"""持续监控高资源进程的线程函数"""
global is_monitoring
logger = logging.getLogger('monitor')
interval = 60 # 设置为60秒间隔
while is_monitoring:
try:
next_update = timezone.now() + timezone.timedelta(seconds=interval)
# 获取所有活跃的监控记录
monitors = {
'cpu': HighCPUProcess.objects.filter(is_active=True),
'memory': HighMemoryProcess.objects.filter(is_active=True),
'gpu': HighGPUProcess.objects.filter(is_active=True)
}
for monitor_type, processes in monitors.items():
for proc in processes:
try:
process = psutil.Process(proc.pid)
data = get_process_data(process)
log_file = setup_process_logger(proc.pid, monitor_type)
log_process_metrics(proc.pid, data, log_file)
except psutil.NoSuchProcess:
proc.is_active = False
proc.save()
except Exception as e:
logger.error(f"监控进程 {proc.pid} 时出错: {str(e)}")
# 计算需要等待的时间
now = timezone.now()
if next_update > now:
time.sleep((next_update - now).total_seconds())
except Exception as e:
logger.error(f"持续监控出错: {str(e)}")
time.sleep(5)
def get_process_data(process):
"""获取进程的详细数据"""
with process.oneshot():
cpu_times = process.cpu_times()
cpu_freq = psutil.cpu_freq() or psutil._common.scpufreq(current=0, min=0, max=0)
cpu_ctx = process.num_ctx_switches()
mem = process.memory_info()
try:
mem_maps = len(process.memory_maps())
except (psutil.AccessDenied, psutil.TimeoutExpired):
mem_maps = 0
sys_mem = psutil.virtual_memory()
swap = psutil.swap_memory()
try:
io = process.io_counters()
except (psutil.AccessDenied, psutil.TimeoutExpired):
io = psutil._common.pio(read_count=0, write_count=0, read_bytes=0, write_bytes=0,
read_chars=0, write_chars=0, other_count=0, other_bytes=0)
gpu_usage, gpu_memory = get_process_gpu_usage(process.pid)
return {
'timestamp': timezone.now().strftime('%Y-%m-%d %H:%M:%S'),
'cpu': {
'usage': f"{process.cpu_percent()}%",
'user_time': f"{cpu_times.user:.1f}s",
'system_time': f"{cpu_times.system:.1f}s",
'cores': str(psutil.cpu_count()),
'frequency': f"{getattr(cpu_freq, 'current', 0):.1f}MHz",
'context_switches': f"{cpu_ctx.voluntary}/{cpu_ctx.involuntary}"
},
'memory': {
'physical': f"{mem.rss/1024/1024:.1f}MB ({process.memory_percent():.1f}%)",
'virtual': f"{mem.vms/1024/1024:.1f}MB",
'mappings': f"{mem_maps}",
'system_usage': f"{sys_mem.percent:.1f}%",
'swap_usage': f"{swap.percent:.1f}%"
},
'gpu': {
'usage': f"{gpu_usage:.1f}%",
'memory': f"{gpu_memory:.1f}MB"
},
'io': {
'read': f"{getattr(io, 'read_bytes', 0)/1024/1024:.1f}MB ({getattr(io, 'read_count', 0)}次)",
'write': f"{getattr(io, 'write_bytes', 0)/1024/1024:.1f}MB ({getattr(io, 'write_count', 0)}次)",
'other': f"{getattr(io, 'other_count', 0)}"
},
'status': process.status()
}
@csrf_exempt
@require_http_methods(["GET", "POST"])
def start_monitor(request):
"""开始监控进程"""
global monitor_thread, is_monitoring
try:
pid = request.GET.get('pid')
monitor_type = request.GET.get('type')
if not pid:
return JsonResponse({
'status': 'error',
'message': '请输入进程ID'
}, json_dumps_params={'ensure_ascii': False})
pid = int(pid)
try:
process = psutil.Process(pid)
process_name = process.name()
# 启动监控线程
if not is_monitoring:
is_monitoring = True
monitor_thread = threading.Thread(target=continuous_monitor)
monitor_thread.daemon = True
monitor_thread.start()
return JsonResponse({
'status': 'success',
'message': f'开始监控进程 {pid} ({process_name})'
}, json_dumps_params={'ensure_ascii': False})
except psutil.NoSuchProcess:
return JsonResponse({
'status': 'error',
'message': f'进程 {pid} 不存在'
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"启动监控失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': f'启动监控失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET", "POST"])
def stop_monitor(request):
"""停止监控进程"""
global is_monitoring
try:
pid = request.GET.get('pid')
if not pid:
return JsonResponse({
'status': 'error',
'message': '请输入进程ID'
}, json_dumps_params={'ensure_ascii': False})
pid = int(pid)
# 停止监控线程
is_monitoring = False
return JsonResponse({
'status': 'success',
'message': f'停止监控进程 {pid}'
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"停止监控失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': f'停止监控失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
@require_http_methods(["GET"])
def get_process_status(request, pid):
"""获取进程监控状态的API"""
try:
monitor_type = request.GET.get('type')
logger.info(f"获取进程状态: pid={pid}, type={monitor_type}")
# 先设置日志文件路径
log_file = setup_process_logger(pid, monitor_type)
if not log_file:
raise ValueError(f"无法创建日志文件,监控类型: {monitor_type}")
process = psutil.Process(pid)
data = get_process_data(process)
# 同步数据到数据库
try:
# 更新 all 资源记录
AllResourceProcess.objects.update_or_create(
pid=pid,
defaults={
'process_name': process.name(),
'cpu_usage': float(data['cpu']['usage'].replace('%', '')),
'cpu_user_time': float(data['cpu']['user_time'].replace('s', '')),
'cpu_system_time': float(data['cpu']['system_time'].replace('s', '')),
'memory_usage': float(data['memory']['physical'].split('MB')[0].strip()),
'memory_percent': float(data['memory']['physical'].split('(')[1].split('%')[0].strip()),
'virtual_memory': float(data['memory']['virtual'].split('MB')[0].strip()),
'gpu_usage': float(data['gpu']['usage'].replace('%', '')),
'gpu_memory': float(data['gpu']['memory'].replace('MB', '')),
'net_io_sent': float(data['io']['write'].split('MB')[0].strip()),
'net_io_recv': float(data['io']['read'].split('MB')[0].strip()),
'is_active': True,
'status': 1,
'log_file': log_file
}
)
# 根据监控类型选择对应的模型
if monitor_type == 'cpu':
# 从字符串中提取CPU使用率数值
cpu_usage = float(data['cpu']['usage'].replace('%', ''))
logger.info(f"CPU使用率: {cpu_usage}%") # 调试日志
process_obj, created = HighCPUProcess.objects.update_or_create(
pid=pid,
defaults={
'process_name': process.name(),
'cpu_usage': cpu_usage,
'is_active': True,
'status': 1,
'log_file': log_file
}
)
logger.info(f"{'创建' if created else '更新'}CPU进程记录: {process_obj}")
elif monitor_type == 'memory':
# 从字符串中提取内存使用量和百分比
memory_info = data['memory']['physical']
memory_usage = float(memory_info.split('MB')[0].strip())
memory_percent = float(memory_info.split('(')[1].split('%')[0].strip())
HighMemoryProcess.objects.update_or_create(
pid=pid,
defaults={
'process_name': process.name(),
'memory_usage': memory_usage,
'memory_percent': memory_percent,
'is_active': True,
'status': 1,
'log_file': log_file
}
)
elif monitor_type == 'gpu':
# 从字符串中提取GPU使用率和显存
gpu_usage = float(data['gpu']['usage'].replace('%', ''))
gpu_memory = float(data['gpu']['memory'].replace('MB', ''))
HighGPUProcess.objects.update_or_create(
pid=pid,
defaults={
'process_name': process.name(),
'gpu_usage': gpu_usage,
'gpu_memory': gpu_memory,
'is_active': True,
'status': 1,
'log_file': log_file
}
)
return JsonResponse({
'status': 'success',
'data': data
})
except psutil.NoSuchProcess:
# 进程已终止
monitor.is_active = False
monitor.status = 0
monitor.save()
return JsonResponse({
'status': 'error',
'message': f'进程 {pid} 已终止'
})
except Exception as e:
return JsonResponse({
'status': 'error',
'message': str(e)
})
def update_process_status(pid, monitor_type, is_active=False, status=0):
"""更新进程状态"""
try:
if monitor_type == 'cpu':
HighCPUProcess.objects.filter(pid=pid).update(
is_active=is_active,
status=status
)
elif monitor_type == 'memory':
HighMemoryProcess.objects.filter(pid=pid).update(
is_active=is_active,
status=status
)
elif monitor_type == 'gpu':
HighGPUProcess.objects.filter(pid=pid).update(
is_active=is_active,
status=status
)
except Exception as e:
logger.error(f"更新进程状态失败: {str(e)}")
def index(request):
"""渲染主页"""
return render(request, 'index.html')
@csrf_exempt
@require_http_methods(["POST"])
def stop_auto_detect(request):
"""停止自动检测的API端点"""
try:
return JsonResponse({
'status': 'success',
'message': '已停止自动检测'
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"停止自动检测失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': f'停止自动检测失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET"])
def get_latest_log(request, pid):
"""获取最新的监控日志"""
try:
monitor_type = request.GET.get('type') # 获取监控类型cpu/gpu/memory
if not monitor_type:
return JsonResponse({
'status': 'error',
'message': '请指定监控类型'
})
# 根据类型获取对应的监控记录
monitor = None
if monitor_type == 'cpu':
monitor = HighCPUProcess.objects.filter(pid=pid, is_active=True).first()
elif monitor_type == 'gpu':
monitor = HighGPUProcess.objects.filter(pid=pid, is_active=True).first()
elif monitor_type == 'memory':
monitor = HighMemoryProcess.objects.filter(pid=pid, is_active=True).first()
if not monitor:
return JsonResponse({
'status': 'error',
'message': f'未找到进程 {pid}{monitor_type}监控记录'
})
# 获取最新数据
try:
process = psutil.Process(pid)
with process.oneshot():
data = {
'pid': pid,
'name': process.name(),
'status': 1, # 1表示进程运行中
}
# 根据监控类型添加相应的数据
if monitor_type == 'cpu':
data['cpu_percent'] = process.cpu_percent()
elif monitor_type == 'memory':
memory_info = process.memory_info()
data['memory_gb'] = memory_info.rss / (1024 * 1024 * 1024)
data['memory_percent'] = process.memory_percent()
elif monitor_type == 'gpu':
gpu_usage, gpu_memory = get_process_gpu_usage(pid)
data['gpu_usage'] = gpu_usage
data['gpu_memory'] = gpu_memory
return JsonResponse({
'status': 'success',
'data': data
})
except psutil.NoSuchProcess:
# 进程已终止
monitor.is_active = False
monitor.status = 0
monitor.save()
return JsonResponse({
'status': 'error',
'message': f'进程 {pid} 已终止'
})
except Exception as e:
return JsonResponse({
'status': 'error',
'message': str(e)
})
@csrf_exempt
@require_http_methods(["GET"])
def get_process_list(request):
"""获取当前监控的进程列表"""
try:
processes = []
# 获取所有活跃的监控进程
cpu_processes = HighCPUProcess.objects.filter(is_active=True)
gpu_processes = HighGPUProcess.objects.filter(is_active=True)
memory_processes = HighMemoryProcess.objects.filter(is_active=True)
# 创建进程信息字典
process_dict = {}
# 处理 CPU 进程
for proc in cpu_processes:
if proc.pid not in process_dict:
process_dict[proc.pid] = {
'pid': proc.pid,
'name': proc.process_name,
'cpu_percent': proc.cpu_usage,
'memory_gb': 0,
'memory_percent': 0,
'gpu_usage': 0,
'gpu_memory': 0,
'resource_types': ['cpu']
}
else:
process_dict[proc.pid]['cpu_percent'] = proc.cpu_usage
process_dict[proc.pid]['resource_types'].append('cpu')
# 处理 GPU 进程
for proc in gpu_processes:
if proc.pid not in process_dict:
process_dict[proc.pid] = {
'pid': proc.pid,
'name': proc.process_name,
'cpu_percent': 0,
'memory_gb': 0,
'memory_percent': 0,
'gpu_usage': proc.gpu_usage,
'gpu_memory': proc.gpu_memory,
'resource_types': ['gpu']
}
else:
process_dict[proc.pid]['gpu_usage'] = proc.gpu_usage
process_dict[proc.pid]['gpu_memory'] = proc.gpu_memory
process_dict[proc.pid]['resource_types'].append('gpu')
# 处理内存进程
for proc in memory_processes:
if proc.pid not in process_dict:
process_dict[proc.pid] = {
'pid': proc.pid,
'name': proc.process_name,
'cpu_percent': 0,
'memory_gb': proc.memory_usage,
'memory_percent': proc.memory_percent,
'gpu_usage': 0,
'gpu_memory': 0,
'resource_types': ['memory']
}
else:
process_dict[proc.pid]['memory_gb'] = proc.memory_usage
process_dict[proc.pid]['memory_percent'] = proc.memory_percent
process_dict[proc.pid]['resource_types'].append('memory')
# 转换字典为列表
processes = list(process_dict.values())
# 按总资源占用率排序
processes.sort(key=lambda x: (
x['cpu_percent'] / 100 +
x['memory_percent'] / 100 +
x['gpu_usage'] / 100
), reverse=True)
return JsonResponse({
'status': 'success',
'processes': processes
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
return JsonResponse({
'status': 'error',
'message': str(e)
}, json_dumps_params={'ensure_ascii': False})
def high_resource_list(request):
"""高资源进程列表视图"""
context = {
'cpu_processes': HighCPUProcess.objects.all().order_by('-updated_at'),
'memory_processes': HighMemoryProcess.objects.all().order_by('-updated_at'),
'gpu_processes': HighGPUProcess.objects.all().order_by('-updated_at'),
}
return render(request, 'high_resource_list.html', context)
def find_python_processes_in_dir(directory):
"""查找指定目录下运行的 Python 进程"""
python_processes = []
directory = str(Path(directory).resolve()) # 转换为绝对路径,并处理跨平台差异
for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cwd']):
try:
# 检查是否是 Python 进程
if proc.info['name'].lower().startswith('python'):
# 检查进程的工作目录
proc_cwd = str(Path(proc.info['cwd']).resolve())
if proc_cwd.startswith(directory):
# 检查命令行参数
cmdline = proc.info['cmdline']
if cmdline:
python_processes.append({
'pid': proc.info['pid'],
'name': proc.info['name'],
'cmdline': ' '.join(cmdline),
'cwd': proc_cwd
})
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return python_processes
def get_gpu_info(pid):
"""获取进程的GPU使用信息"""
try:
import pynvml
pynvml.nvmlInit()
gpu_info = {
'usage': 0,
'memory': 0,
'device_count': pynvml.nvmlDeviceGetCount()
}
for i in range(gpu_info['device_count']):
handle = pynvml.nvmlDeviceGetHandleByIndex(i)
procs = pynvml.nvmlDeviceGetComputeRunningProcesses(handle)
for proc in procs:
if proc.pid == pid:
gpu_info['memory'] = proc.usedGpuMemory / 1024 / 1024 # 转换为MB
gpu_info['usage'] = pynvml.nvmlDeviceGetUtilizationRates(handle).gpu
break
return gpu_info
except Exception as e:
logger.error(f"获取GPU信息失败: {str(e)}")
return {'usage': 0, 'memory': 0, 'device_count': 0}
def get_io_counters(proc):
"""获取进程的IO计数器信息"""
try:
io = proc.io_counters()
return {
'read_bytes': io.read_bytes / 1024 / 1024, # 转换为MB
'write_bytes': io.write_bytes / 1024 / 1024,
'read_count': io.read_count,
'write_count': io.write_count
}
except:
return {'read_bytes': 0, 'write_bytes': 0, 'read_count': 0, 'write_count': 0}
def get_network_connections(proc):
"""获取进程的网络连接信息"""
try:
connections = []
for conn in proc.connections():
connections.append({
'local_address': f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else "",
'remote_address': f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else "",
'status': conn.status
})
return connections
except:
return []
def get_open_files(proc):
"""获取进程打开的文件列表"""
try:
return [f.path for f in proc.open_files()]
except:
return []
@csrf_exempt
@require_http_methods(["POST"])
def fetch_tiktok_videos(request):
"""获取TikTok用户播放量前10的视频"""
try:
# 添加全局变量引用
global all_downloaded_videos
# 如果变量未初始化,则初始化为空列表
if 'all_downloaded_videos' not in globals():
all_downloaded_videos = []
data = json.loads(request.body)
unique_id = data.get('unique_id')
if not unique_id:
return JsonResponse({
'status': 'error',
'message': '请提供TikTok用户ID(unique_id)'
}, json_dumps_params={'ensure_ascii': False})
# 调用API获取用户资料和secUid
logger.info(f"正在获取用户 {unique_id} 的资料...")
user_profile = fetch_user_profile(unique_id)
if not user_profile or 'data' not in user_profile:
return JsonResponse({
'status': 'error',
'message': f'无法获取用户 {unique_id} 的资料'
}, json_dumps_params={'ensure_ascii': False})
# 从API响应中提取secUid和其他用户信息
try:
user_info = user_profile['data']['userInfo']['user']
sec_uid = user_info['secUid']
# 提取其他用户信息
nickname = user_info.get('nickname', f'用户_{unique_id}')
signature = user_info.get('signature', '')
avatar_url = user_info.get('avatarLarger', '')
user_stats = user_profile['data']['userInfo']['stats']
follower_count = user_stats.get('followerCount', 0)
heart_count = user_stats.get('heartCount', 0)
video_count = user_stats.get('videoCount', 0)
logger.info(f"成功获取用户secUid: {sec_uid}, 该用户有 {video_count} 个视频")
except (KeyError, TypeError) as e:
logger.error(f"解析用户资料出错: {e}")
return JsonResponse({
'status': 'error',
'message': f'解析用户资料出错: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
# 确保用户目录存在
user_dir = os.path.join(TIKTOK_VIDEOS_PATH, unique_id)
os.makedirs(user_dir, exist_ok=True)
# 使用辅助函数获取播放量前10的视频
logger.info(f"开始获取用户 {nickname} 的全部视频并查找播放量前10...")
top_videos = get_top_n_videos_by_play_count(sec_uid, n=10)
# 下载这些视频
downloaded_videos = []
for i, (play_count, video_id, desc) in enumerate(top_videos, 1):
try:
save_path = os.path.join(user_dir, f"{video_id}.mp4")
logger.info(f"下载第 {i}/{len(top_videos)} 个视频 (ID: {video_id}),播放量: {play_count}")
if download_video(video_id, unique_id, save_path):
video_info = {
'id': video_id,
'desc': desc,
'play_count': play_count,
'user_unique_id': unique_id
}
downloaded_videos.append(video_info)
logger.info(f"视频 {video_id} 下载成功")
else:
logger.error(f"视频 {video_id} 下载失败")
# 避免过快请求
time.sleep(2)
except Exception as e:
logger.error(f"下载视频时出错: {e}")
continue
all_downloaded_videos.extend(downloaded_videos)
# 保存用户信息和视频信息到数据库
video_info_json = json.dumps([{
'id': v['id'],
'desc': v['desc'],
'play_count': v['play_count']
} for v in downloaded_videos], ensure_ascii=False)
user_record = TiktokUserVideos.objects.update_or_create(
sec_user_id=sec_uid,
defaults={
'nickname': nickname,
'signature': signature,
'follower_count': follower_count,
'total_favorited': heart_count,
'avatar_url': avatar_url,
'videos_folder': user_dir,
'video_paths': video_info_json,
'video_count': video_count
}
)
return JsonResponse({
'status': 'success',
'message': '处理完成',
'user_info': {
'nickname': nickname,
'unique_id': unique_id,
'sec_uid': sec_uid,
'avatar': avatar_url,
'follower_count': follower_count,
'total_favorited': heart_count,
'signature': signature,
'video_count': video_count
},
'total_videos_count': video_count,
'processed_videos_count': len(top_videos),
'downloaded_videos': len(downloaded_videos),
'videos': [{'id': v['id'], 'desc': v['desc'][:50], 'play_count': v['play_count']} for v in downloaded_videos],
'video_directory': user_dir # 添加视频目录路径方便查找
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"处理TikTok视频失败: {e}")
import traceback
logger.error(f"详细错误: {traceback.format_exc()}")
return JsonResponse({
'status': 'error',
'message': f'处理TikTok视频失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
@require_http_methods(["GET"])
def get_tiktok_user_videos(request):
"""获取已下载的TikTok用户视频列表"""
try:
sec_user_id = request.GET.get('sec_user_id')
if not sec_user_id:
# 如果没有指定用户ID返回所有用户列表
users = TiktokUserVideos.objects.all().values('sec_user_id', 'nickname', 'follower_count', 'videos_folder', 'create_time')
return JsonResponse({
'status': 'success',
'users': list(users)
}, json_dumps_params={'ensure_ascii': False})
# 查询指定用户信息
try:
user = TiktokUserVideos.objects.get(sec_user_id=sec_user_id)
# 解析视频信息JSON
video_info = json.loads(user.video_paths) if user.video_paths else []
# 获取文件夹中的文件列表
videos_folder = user.videos_folder
video_files = []
if os.path.exists(videos_folder):
video_files = [f for f in os.listdir(videos_folder) if os.path.isfile(os.path.join(videos_folder, f))]
except TiktokUserVideos.DoesNotExist:
return JsonResponse({
'status': 'error',
'message': f'用户 {sec_user_id} 不存在'
}, json_dumps_params={'ensure_ascii': False})
return JsonResponse({
'status': 'success',
'user_info': {
'sec_user_id': user.sec_user_id,
'nickname': user.nickname,
'signature': user.signature,
'follower_count': user.follower_count,
'total_favorited': user.total_favorited,
'avatar_url': user.avatar_url,
'create_time': user.create_time.strftime('%Y-%m-%d %H:%M:%S'),
'update_time': user.update_time.strftime('%Y-%m-%d %H:%M:%S')
},
'videos_folder': videos_folder,
'video_files': video_files,
'video_info': video_info
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"获取TikTok视频列表失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': f'获取TikTok视频列表失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
# 辅助函数
def fetch_user_videos(sec_uid, cursor=0, count=10):
"""获取用户视频列表"""
url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_post?secUid={sec_uid}&cursor={cursor}&count={count}"
try:
response = requests.get(url, timeout=30)
if response.status_code == 200:
data = response.json()
logger.info(f"成功获取用户视频,共 {len(data['data'].get('itemList', []))} 个视频")
return data
else:
logger.error(f"获取用户视频失败: {response.status_code}")
return None
except Exception as e:
logger.error(f"获取用户视频异常: {e}")
return None
def fetch_user_profile(unique_id):
"""获取用户基本信息"""
url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_profile?uniqueId={unique_id}"
try:
logger.info(f"正在请求用户资料: {url}")
response = requests.get(url, timeout=30)
if response.status_code == 200:
data = response.json()
logger.info(f"成功获取用户资料: {unique_id}")
# 打印完整响应以便调试
logger.info(f"API原始响应: {data}")
# 验证数据完整性
if 'data' not in data or not data['data']:
logger.error(f"API响应缺少data字段: {data}")
return None
if 'userInfo' not in data['data'] or not data['data']['userInfo']:
logger.error(f"API响应缺少userInfo字段: {data['data']}")
return None
if 'user' not in data['data']['userInfo'] or not data['data']['userInfo']['user']:
logger.error(f"API响应缺少user字段: {data['data']['userInfo']}")
return None
# 打印用户信息
logger.info(f"用户信息: {data['data']['userInfo']['user']}")
return data
else:
logger.error(f"获取用户信息失败: HTTP {response.status_code}, 响应: {response.text[:500]}")
return None
except Exception as e:
logger.error(f"获取用户信息异常: {e}")
return None
def download_video(video_id, unique_id, save_path):
"""使用API的直接下载接口下载TikTok视频"""
# 确保视频ID是纯数字
if not str(video_id).isdigit():
logger.error(f"无效的视频ID: {video_id},必须是纯数字")
return False
# 构建标准TikTok视频URL
tiktok_url = f"https://www.tiktok.com/@{unique_id}/video/{video_id}"
logger.info(f"构建的TikTok URL: {tiktok_url}")
# 构建完整的API请求URL
api_url = f"http://81.69.223.133:45268/api/download"
full_url = f"{api_url}?url={tiktok_url}&prefix=true&with_watermark=false"
logger.info(f"完整的API请求URL: {full_url}")
try:
# 直接使用完整URL发送请求
response = requests.get(full_url, stream=True, timeout=60)
# 检查响应状态
if response.status_code != 200:
logger.error(f"下载视频失败: {response.status_code} - {response.text[:200] if response.text else '无响应内容'}")
return False
# 获取内容类型
content_type = response.headers.get('Content-Type', '')
logger.info(f"响应内容类型: {content_type}")
# 保存文件
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
file_size = os.path.getsize(save_path)
logger.info(f"视频已下载到: {save_path},文件大小: {file_size}字节")
return True
except Exception as e:
logger.error(f"下载视频异常: {e}")
import traceback
logger.error(f"详细错误: {traceback.format_exc()}")
return False
def fetch_user_followings(sec_uid, max_cursor=0, min_cursor=0, page_size=30):
"""
获取用户的关注列表
Args:
sec_uid: 用户的安全ID
max_cursor: 保持为0
min_cursor: 分页游标,从上一次响应获取
page_size: 每页大小默认30条记录
Returns:
用户关注列表数据
"""
url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_follow?secUid={sec_uid}&count={page_size}&maxCursor={max_cursor}&minCursor={min_cursor}"
logger.info(f"请求关注列表URL: {url}")
try:
response = requests.get(url, timeout=30)
if response.status_code == 200:
data = response.json()
logger.info(f"成功获取用户关注列表,共 {len(data['data'].get('userList', []))} 个用户, minCursor={data['data'].get('minCursor')}")
return data
else:
logger.error(f"获取用户关注列表失败: {response.status_code}")
return None
except Exception as e:
logger.error(f"获取用户关注列表异常: {e}")
return None
def filter_users_by_followers(user_list, min_followers=5000, max_followers=50000):
"""筛选粉丝数在指定范围内的用户"""
filtered_users = []
for user_data in user_list:
try:
follower_count = user_data.get('stats', {}).get('followerCount', 0)
if min_followers <= follower_count <= max_followers:
filtered_users.append(user_data)
except Exception as e:
logger.error(f"筛选用户时出错: {e}")
return filtered_users
def get_top_n_videos_by_play_count(sec_uid, n=10):
"""
获取用户播放量前N的视频
Args:
sec_uid: 用户的安全ID
n: 需要获取的前n个视频默认为10
Returns:
按播放量降序排列的前n个视频列表格式为字典
"""
import heapq
top_videos_heap = [] # 小顶堆,元组格式: (播放量, 视频ID, 描述)
# 分页获取所有视频
cursor = 0
has_more = True
page = 1
total_videos = 0
try:
while has_more:
logger.info(f"获取第{page}页视频,游标: {cursor}")
videos_data = fetch_user_videos(sec_uid, cursor=cursor)
if not videos_data or 'data' not in videos_data:
logger.error("获取视频失败,中止获取")
break
videos = videos_data['data'].get('itemList', [])
if not videos:
logger.info("当前页没有视频,结束获取")
break
# 处理当前页的每个视频
for video in videos:
try:
video_id = video.get('id', '')
if not video_id:
continue
play_count = int(video.get('stats', {}).get('playCount', 0))
desc = video.get('desc', '')
# 维护前n个最高播放量的视频
if len(top_videos_heap) < n:
# 堆还未满,直接添加
heapq.heappush(top_videos_heap, (play_count, video_id, desc))
elif play_count > top_videos_heap[0][0]:
# 当前视频播放量比堆中最小的高,替换
heapq.heappushpop(top_videos_heap, (play_count, video_id, desc))
except Exception as e:
logger.error(f"处理视频信息出错: {e}")
total_videos += len(videos)
logger.info(f"已处理 {total_videos} 个视频,当前保存了 {len(top_videos_heap)} 个候选视频")
# 检查是否有更多页
has_more = videos_data['data'].get('hasMore', False)
if has_more:
cursor = videos_data['data'].get('cursor', 0)
page += 1
time.sleep(1)
# 将堆转换为字典列表而不是元组列表,并按播放量降序排序
top_videos_list = []
# 从堆中转换为临时列表并排序
temp_list = [(play_count, video_id, desc) for play_count, video_id, desc in top_videos_heap]
temp_list.sort(reverse=True) # 从高到低排序
# 将元组转换为字典
for play_count, video_id, desc in temp_list:
top_videos_list.append({
'id': video_id,
'desc': desc,
'play_count': play_count
})
logger.info(f"{total_videos} 个视频中找到播放量最高的 {len(top_videos_list)}")
return top_videos_list
except Exception as e:
logger.error(f"获取热门视频时发生错误: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return []
# 辅助函数,用于操作所有任务
def list_all_tasks():
"""获取所有任务列表"""
tasks = []
for key in redis_client.keys("task_status:*"):
task_id = key.decode().split(":", 1)[1]
task_data = get_task_status(task_id)
if task_data:
task_data['task_id'] = task_id
tasks.append(task_data)
return tasks
@shared_task
def async_fetch_videos_task(task_id, start_unique_id, max_depth=3, target_users=None, skip_user_profile=False, start_sec_uid=None):
"""异步执行视频获取任务"""
try:
# 更新任务状态为处理中
update_task_status(task_id, {
'status': 'processing',
'progress': 0
})
# 创建视频获取器实例
fetcher = TiktokVideoFetcher(
start_unique_id=start_unique_id,
max_depth=max_depth,
target_users=target_users,
skip_user_profile=skip_user_profile, # 传递测试模式
start_sec_uid=start_sec_uid # 传递临时secUid
)
# 添加进度回调
fetcher.progress_callback = lambda progress, message: update_task_progress(task_id, progress, message)
# 执行处理
if fetcher.initialize() and fetcher.process():
# 处理成功,保存结果
result = fetcher.get_result()
update_task_status(task_id, {
'status': 'completed',
'result': result,
'progress': 100
})
return True
else:
# 处理失败
update_task_status(task_id, {
'status': 'failed',
'error': fetcher.error_message
})
return False
except Exception as e:
# 处理异常
logger.error(f"异步任务执行失败: {str(e)}")
import traceback
logger.error(f"详细错误信息: {traceback.format_exc()}")
update_task_status(task_id, {
'status': 'failed',
'error': str(e)
})
return False
def update_task_progress(task_id, progress, message=None):
"""更新任务进度"""
updates = {'progress': progress}
if message:
updates['message'] = message
update_task_status(task_id, updates)
class TiktokVideoFetcher:
"""抖音视频获取器,支持递归获取用户关注的视频"""
def __init__(self, start_unique_id=None, start_sec_uid=None,
mode="by_users", target_users=None, max_depth=1,
skip_user_profile=False, max_videos_per_user=10):
"""
初始化抖音视频获取器
:param start_unique_id: 起始用户的uniqueId
:param start_sec_uid: 起始用户的secUid
:param mode: 模式,'by_users'(按用户数)或'by_depth'(按深度)
:param target_users: 目标用户数量仅在by_users模式下有效
:param max_depth: 最大层级深度仅在by_depth模式下有效
:param skip_user_profile: 是否跳过获取用户资料
:param max_videos_per_user: 每个用户最多获取的视频数量
"""
# 统一参数命名:确保使用 start_sec_uid
self.start_unique_id = start_unique_id
self.start_sec_uid = start_sec_uid # 注意这里保持为start_sec_uid
self.mode = mode
self.target_users = target_users
self.max_depth = max_depth
self.skip_user_profile = skip_user_profile
self.max_videos_per_user = max_videos_per_user or 10
# 状态变量
self.status = "created" # 状态: created, initializing, ready, processing, completed, failed
self.user_count = 0 # 已处理用户数量
self.video_count = 0 # 已下载视频数量
self.progress = 0 # 进度百分比
self.progress_message = "" # 进度消息
self.error_message = "" # 错误消息
# 队列和集合
self.user_queue = deque() # 待处理用户队列,元素为 (sec_uid, unique_id, depth)
self.processed_users = set() # 已处理的用户集合
# 创建目录
os.makedirs(TIKTOK_VIDEOS_PATH, exist_ok=True)
logger.info(f"初始化TiktokVideoFetcher: mode={mode}, max_depth={max_depth}, target_users={target_users}")
def initialize(self):
"""初始化处理,获取起始用户信息"""
try:
self.status = "initializing"
logger.info(f"开始初始化,起始用户: {self.start_unique_id or '使用secUid'}")
# 如果直接提供secUid则直接使用
if self.start_sec_uid:
logger.info(f"直接使用提供的secUid: {self.start_sec_uid}")
# 如果未提供unique_id使用secUid的一部分作为替代
display_id = self.start_unique_id or f"user_{self.start_sec_uid[:8]}"
# 简化队列元素只保留sec_uid、unique_id和深度
self.user_queue.append((self.start_sec_uid, display_id, 0))
self.status = "ready"
return True
# 如果只提供了uniqueId需要获取secUid
elif self.start_unique_id:
logger.info(f"通过uniqueId获取用户secUid: {self.start_unique_id}")
user_profile = fetch_user_profile(self.start_unique_id)
# 检查错误
if isinstance(user_profile, dict) and 'error' in user_profile:
error_message = user_profile['error']
self.error_message = f'获取用户资料出错: {error_message}'
self.status = "failed"
return False
if not user_profile or 'data' not in user_profile or not user_profile['data']:
self.error_message = f'无法获取用户资料或用户不存在: {self.start_unique_id}'
self.status = "failed"
return False
try:
user_data = user_profile['data']
sec_uid = user_data.get('secUid', '')
if not sec_uid:
self.error_message = f'无法获取用户secUid: {self.start_unique_id}'
self.status = "failed"
return False
# 添加到队列中深度为0
self.user_queue.append((sec_uid, self.start_unique_id, 0))
# 创建或更新用户数据库记录
try:
TiktokUserVideos.objects.update_or_create(
sec_user_id=sec_uid,
defaults={
'unique_id': self.start_unique_id,
'nickname': user_data.get('nickname', ''),
'follower_count': user_data.get('followerCount', 0),
'following_count': user_data.get('followingCount', 0),
'video_count': user_data.get('videoCount', 0),
'videos_folder': os.path.join('videos', self.start_unique_id)
}
)
except Exception as e:
logger.warning(f"保存用户数据到数据库失败,但不影响继续处理: {e}")
self.status = "ready"
return True
except KeyError:
self.error_message = f'用户资料结构不完整无法提取secUid'
self.status = "failed"
return False
# 两者都没提供
else:
self.error_message = "未提供secUid或uniqueId无法初始化"
self.status = "failed"
return False
except Exception as e:
logger.error(f"初始化失败: {e}")
import traceback
logger.error(f"详细错误信息: {traceback.format_exc()}")
self.error_message = f"初始化失败: {str(e)}"
self.status = "failed"
return False
def process(self):
"""执行主处理逻辑"""
if self.status != "ready":
if not self.initialize():
return False
self.status = "processing"
self._report_progress(5, "初始化完成,开始处理")
try:
# 预先估算总任务量
total_users = self.target_users if self.mode == "by_users" and self.target_users else 50
# 主处理循环
while self.user_queue:
# 按人数抓取模式下,达到目标人数则停止
if self.mode == "by_users" and self.target_users is not None and self.user_count >= self.target_users:
logger.info(f"已达到目标用户数 {self.target_users},停止处理")
break
# 队列中只有sec_uid、unique_id和depth
sec_uid, unique_id, depth = self.user_queue.popleft()
# 如果超过最大深度且是按层级模式,则跳过此用户
if self.mode == "by_depth" and depth > self.max_depth:
continue
# 如果用户已处理,跳过
if sec_uid in self.processed_users:
continue
# 处理单个用户
self._process_single_user(sec_uid, unique_id, depth)
# 报告进度
progress = min(95, int(self.user_count / total_users * 90) + 5)
self._report_progress(progress, f"已处理 {self.user_count} 个用户,已下载 {self.video_count} 个视频")
self.status = "completed"
self._report_progress(100, f"处理完成,共处理 {self.user_count} 个用户,下载 {self.video_count} 个视频")
return True
except Exception as e:
logger.error(f"处理过程中出错: {e}")
import traceback
logger.error(f"详细错误信息: {traceback.format_exc()}")
self.error_message = f"处理过程中出错: {str(e)}"
self.status = "failed"
self._report_progress(0, f"处理失败: {str(e)}")
return False
def _process_single_user(self, sec_uid, unique_id, depth):
"""处理单个用户的视频和关注列表 - 简化版"""
# 标记为已处理
self.processed_users.add(sec_uid)
self.user_count += 1
logger.info(f"开始处理用户 {unique_id},深度: {depth}")
try:
# 创建基本用户记录(如果不存在)
try:
TiktokUserVideos.objects.get_or_create(
sec_user_id=sec_uid,
defaults={
'unique_id': unique_id,
'videos_folder': os.path.join('videos', unique_id)
}
)
except Exception as e:
logger.error(f"创建基本用户记录失败: {e}")
# 获取并下载视频
downloaded_videos = self._fetch_user_videos(sec_uid, unique_id, depth)
# 处理用户的关注列表
if self.mode == "by_depth" and depth < self.max_depth:
next_depth = depth + 1
self._process_user_followings(sec_uid, next_depth)
elif self.mode == "by_users" and self.user_count < self.target_users:
next_depth = depth + 1
self._process_user_followings(sec_uid, next_depth)
# 报告进度
self._report_progress(
min(95, int(self.user_count / (self.target_users or 50) * 90) + 5),
f"已处理 {self.user_count} 个用户,已下载 {self.video_count} 个视频"
)
except Exception as e:
logger.error(f"处理用户 {unique_id} 时出错: {e}")
import traceback
logger.error(traceback.format_exc())
def _fetch_user_videos(self, sec_uid, unique_id, depth):
"""获取并下载用户视频,优先处理字典格式数据"""
# 导入traceback
import traceback
downloaded_videos = []
# 使用unique_id创建视频文件夹
videos_folder = os.path.join(TIKTOK_VIDEOS_PATH, unique_id)
os.makedirs(videos_folder, exist_ok=True)
logger.info(f"开始处理用户 {unique_id} 的视频")
try:
# 获取视频列表 - 现在统一返回字典列表
logger.info(f"调用 get_top_n_videos_by_play_count({sec_uid}, n={self.max_videos_per_user})")
top_videos = get_top_n_videos_by_play_count(sec_uid, n=self.max_videos_per_user)
# 记录返回值类型
logger.info(f"获取到视频列表: 类型={type(top_videos).__name__}, 数量={len(top_videos)}")
# 如果没有获取到视频,仅更新文件夹路径
if not top_videos:
logger.warning(f"未能获取到用户 {unique_id} 的任何视频")
try:
TiktokUserVideos.objects.filter(sec_user_id=sec_uid).update(
videos_folder=videos_folder
)
except Exception as e:
logger.error(f"更新用户视频文件夹失败: {e}")
logger.error(traceback.format_exc())
return []
# 处理每个视频 - 显示进度
total_videos = len(top_videos)
logger.info(f"开始处理 {total_videos} 个视频")
for i, video_data in enumerate(top_videos):
current_num = i + 1
progress_str = f"({total_videos}/{current_num})" # 进度显示格式:(总数/当前)
logger.info(f"{progress_str} 开始处理第 {current_num}/{total_videos} 个视频")
try:
# 视频信息提取 - 优先按字典处理,确保兼容性
if isinstance(video_data, dict):
video_id = str(video_data.get('id', ''))
play_count = video_data.get('play_count', 0)
desc = video_data.get('desc', '')
logger.info(f"{progress_str} 字典数据: ID={video_id}, 播放={play_count}")
else:
# 如果不是字典,尝试按元组处理(向后兼容)
logger.warning(f"{progress_str} 收到非字典数据: {type(video_data).__name__}")
if isinstance(video_data, tuple) and len(video_data) >= 2:
play_count, video_id, *rest = video_data
desc = rest[0] if rest else ""
video_id = str(video_id)
logger.info(f"{progress_str} 元组数据: ID={video_id}, 播放={play_count}")
else:
logger.error(f"{progress_str} 无法解析数据: {video_data}")
continue
# 视频ID必须存在
if not video_id:
logger.warning(f"{progress_str} 视频ID为空跳过")
continue
# 视频文件名
video_filename = f"{video_id}.mp4"
video_path = os.path.join(videos_folder, video_filename)
# 如果文件已存在且大小大于0则跳过下载
if os.path.exists(video_path) and os.path.getsize(video_path) > 0:
logger.info(f"{progress_str} 视频已存在: {video_id}")
# 创建字典并添加到列表 - 统一使用字典格式
video_dict = {
'id': video_id,
'desc': desc,
'play_count': play_count
}
downloaded_videos.append(video_dict)
continue
# 下载视频 - 显示下载进度
logger.info(f"{progress_str} 开始下载视频: {video_id}")
if download_video(video_id, unique_id, video_path):
logger.info(f"{progress_str} 视频下载成功: {video_id}")
self.video_count += 1
# 创建字典并添加到列表
video_dict = {
'id': video_id,
'desc': desc,
'play_count': play_count
}
downloaded_videos.append(video_dict)
else:
logger.error(f"{progress_str} 下载视频失败: {video_id}")
except Exception as e:
logger.error(f"{progress_str} 处理单个视频时出错: {e}")
logger.error(traceback.format_exc())
continue
# 避免过快请求
logger.info(f"{progress_str} 处理完成等待2秒...")
time.sleep(2)
# 更新数据库 - 只更新模型中实际存在的字段
try:
TiktokUserVideos.objects.filter(sec_user_id=sec_uid).update(
videos_folder=videos_folder
)
# 记录视频信息,但不保存到数据库
video_info_json = json.dumps(downloaded_videos, ensure_ascii=False)
logger.info(f"已处理 {len(downloaded_videos)}/{total_videos} 个视频")
logger.info(f"数据库更新成功: 更新了 videos_folder={videos_folder}")
except Exception as e:
logger.error(f"更新数据库失败: {e}")
logger.error(traceback.format_exc())
except Exception as e:
logger.error(f"处理用户 {unique_id} 视频时发生错误: {str(e)}")
logger.error(traceback.format_exc())
return downloaded_videos
def _process_user_followings(self, sec_uid, next_depth):
"""处理用户的关注列表添加符合粉丝数条件的前5个用户到队列中"""
try:
logger.info(f"获取用户关注列表: {sec_uid}, 深度: {next_depth}")
max_cursor = 0 # maxCursor保持为0
min_cursor = 0 # 初始minCursor为0
max_time = time.time()
filtered_users_count = 0 # 已筛选出的符合条件的用户数量
max_filtered_users = 5 # 只获取前5个符合条件的用户
processed_sec_uids = set() # 用于去重
page = 1 # 页码计数
while True:
# 如果已经找到足够的用户,停止获取
if filtered_users_count >= max_filtered_users:
logger.info(f"已找到 {max_filtered_users} 个符合条件的用户,停止获取")
break
# 获取关注列表,使用正确的游标参数
logger.info(f"获取关注列表第{page}页: maxCursor={max_cursor}, minCursor={min_cursor}")
followings_data = fetch_user_followings(sec_uid, max_cursor=max_cursor, min_cursor=min_cursor, page_size=30)
if not followings_data or 'data' not in followings_data:
logger.warning(f"无法获取用户关注列表: {sec_uid}")
break
# 获取用户列表
user_list = followings_data['data'].get('userList', [])
if not user_list:
logger.info("没有更多关注用户")
break
logger.info(f"获取到 {len(user_list)} 个关注用户")
# 使用filter_users_by_followers函数筛选符合粉丝数条件的用户
filtered_users = filter_users_by_followers(user_list)
logger.info(f"本页筛选出 {len(filtered_users)} 个粉丝数符合条件的用户")
# 处理筛选后的用户
for following in filtered_users:
# 如果已经找到足够的用户,停止处理
if filtered_users_count >= max_filtered_users:
break
# 获取用户信息
user_data = following.get('user', {})
stats_data = following.get('stats', {})
follower_sec_uid = user_data.get('secUid')
follower_unique_id = user_data.get('uniqueId')
# 检查必要字段
if not follower_sec_uid or not follower_unique_id:
logger.warning("用户数据不完整,缺少必要字段")
continue
# 检查是否已处理过该用户
if follower_sec_uid in processed_sec_uids or follower_sec_uid in self.processed_users:
continue
# 添加到已处理集合
processed_sec_uids.add(follower_sec_uid)
filtered_users_count += 1
# 添加到队列
self.user_queue.append((follower_sec_uid, follower_unique_id, next_depth))
logger.info(f"添加符合条件的用户到队列: {follower_unique_id}, 粉丝数: {stats_data.get('followerCount', 0)}")
# 保存到数据库
try:
TiktokUserVideos.objects.update_or_create(
sec_user_id=follower_sec_uid,
defaults={
'unique_id': follower_unique_id,
'nickname': user_data.get('nickname', ''),
'follower_count': stats_data.get('followerCount', 0),
'following_count': stats_data.get('followingCount', 0),
'video_count': stats_data.get('videoCount', 0)
}
)
except Exception as e:
logger.error(f"保存关注用户到数据库失败: {follower_unique_id}, {e}")
# 检查是否已找到足够的用户
if filtered_users_count >= max_filtered_users:
break
# 更新游标处理
old_min_cursor = min_cursor
min_cursor = followings_data['data'].get('minCursor', 0)
has_more = followings_data['data'].get('hasMore', False)
# 记录游标更新
logger.info(f"游标更新: 旧min_cursor={old_min_cursor} -> 新min_cursor={min_cursor}, has_more={has_more}")
# 检查游标是否有效和是否有更多数据
if not has_more or min_cursor == old_min_cursor or not min_cursor:
logger.info("没有更多数据或游标无效,结束获取")
break
# 防止过度请求
logger.info("等待1秒后获取下一页...")
time.sleep(1)
# 增加页码
page += 1
# 时间限制
if time.time() - max_time > 60: # 最多获取1分钟
logger.warning(f"达到时间限制,停止获取更多关注用户")
break
logger.info(f"用户关注处理完成,共筛选出 {filtered_users_count} 个符合条件的用户")
except Exception as e:
logger.error(f"处理用户关注列表出错: {e}")
import traceback
logger.error(traceback.format_exc())
def _report_progress(self, progress, message):
"""报告处理进度"""
self.progress = progress
self.progress_message = message
logger.info(f"进度更新: {progress}%, {message}")
def get_progress(self):
"""获取当前进度信息"""
return {
'status': self.status,
'progress': self.progress,
'message': self.progress_message,
'error': self.error_message,
'user_count': self.user_count,
'video_count': self.video_count
}
def get_result(self):
"""获取处理结果,包括统计信息"""
return {
'status': self.status,
'processed_users': self.user_count,
'downloaded_videos': self.video_count,
'error': self.error_message,
'completed_at': datetime.now().isoformat()
}
@csrf_exempt
@require_http_methods(["POST"])
def start_recursive_fetch_videos(request):
"""启动异步视频获取任务"""
try:
# 导入traceback
import traceback
data = json.loads(request.body)
start_unique_id = data.get('start_unique_id')
start_sec_uid = data.get('start_sec_uid') # 保持参数一致性
# 如果提供了sec_uid但没提供start_sec_uid使用sec_uid
if not start_sec_uid and data.get('sec_uid'):
start_sec_uid = data.get('sec_uid')
logger.info(f"使用sec_uid作为start_sec_uid: {start_sec_uid}")
mode = data.get('mode', 'by_users')
max_depth = int(data.get('max_depth', 3))
skip_user_profile = data.get('skip_user_profile', False)
max_videos_per_user = int(data.get('max_videos_per_user', 10))
# 检查必要参数
if not start_unique_id and not start_sec_uid:
return JsonResponse({
'code': 400,
'message': '参数错误',
'error': '请提供起始用户ID(start_unique_id)或用户secUid(start_sec_uid)'
}, json_dumps_params={'ensure_ascii': False})
# 确保目标用户数是整数
target_users = None
if 'target_users' in data and data['target_users']:
try:
target_users = int(data['target_users'])
except (ValueError, TypeError):
return JsonResponse({
'code': 400,
'message': '参数错误',
'error': 'target_users必须是整数'
}, json_dumps_params={'ensure_ascii': False})
# 生成任务ID
task_id = str(uuid.uuid4())
# 记录任务信息
logger.info(f"========================= 任务启动 =========================")
logger.info(f"任务ID: {task_id}")
logger.info(f"起始用户: {start_unique_id or '使用secUid'}")
logger.info(f"起始secUid: {start_sec_uid}")
logger.info(f"模式: {mode}")
logger.info(f"最大深度: {max_depth}")
logger.info(f"目标用户数: {target_users if target_users else '不限'}")
logger.info(f"跳过用户资料: {skip_user_profile}")
logger.info(f"每用户最大视频数: {max_videos_per_user}")
logger.info(f"========================================================")
# 创建任务参数字典
task_params = {
'start_unique_id': start_unique_id,
'start_sec_uid': start_sec_uid, # 保持一致的参数命名
'mode': mode,
'target_users': target_users,
'max_depth': max_depth,
'skip_user_profile': skip_user_profile,
'max_videos_per_user': max_videos_per_user
}
# 初始化任务状态
set_task_status(task_id, {
'status': 'pending',
'progress': 0,
'message': '任务已提交,等待处理',
'result': None,
'error': None,
'created_at': datetime.now().isoformat(),
'params': task_params
})
# 启动异步任务 - 统一使用start_sec_uid参数名
async_fetch_videos_task.delay(
task_id,
start_unique_id,
max_depth,
target_users,
skip_user_profile,
start_sec_uid # 参数名保持一致
)
# 返回任务ID
return JsonResponse({
'code': 200,
'message': '任务已提交',
'data': {
'task_id': task_id,
'status': 'pending'
}
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"启动异步任务失败: {str(e)}")
import traceback
logger.error(f"详细错误信息: {traceback.format_exc()}")
return JsonResponse({
'code': 500,
'message': '服务器错误',
'error': f'启动异步任务失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET"])
def check_fetch_task_status(request, task_id):
"""检查任务状态"""
task_info = get_task_status(task_id)
if not task_info:
return JsonResponse({
'status': 'error',
'message': '任务不存在'
}, json_dumps_params={'ensure_ascii': False})
# 构建响应数据
response_data = {
'status': task_info.get('status', 'unknown'),
'progress': task_info.get('progress', 0),
'message': task_info.get('message', ''),
'error': task_info.get('error'),
'created_at': task_info.get('created_at', '')
}
return JsonResponse(response_data, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET"])
def get_fetch_task_result(request, task_id):
"""获取任务完整结果"""
task_info = get_task_status(task_id)
if not task_info:
return JsonResponse({
'status': 'error',
'message': '任务不存在'
}, json_dumps_params={'ensure_ascii': False})
if task_info['status'] != 'completed':
return JsonResponse({
'status': 'error',
'message': f'任务尚未完成,当前状态: {task_info["status"]}'
}, json_dumps_params={'ensure_ascii': False})
# 返回完整结果
return JsonResponse(task_info['result'], json_dumps_params={'ensure_ascii': False})
# 任务管理API
@csrf_exempt
@require_http_methods(["GET"])
def list_fetch_tasks(request):
"""列出所有任务"""
tasks = []
all_tasks = list_all_tasks()
for task_info in all_tasks:
tasks.append({
'task_id': task_info.get('task_id'),
'status': task_info.get('status'),
'progress': task_info.get('progress', 0),
'message': task_info.get('message', ''),
'created_at': task_info.get('created_at'),
'unique_id': task_info.get('params', {}).get('unique_id') if 'params' in task_info else None
})
return JsonResponse({
'tasks': sorted(tasks, key=lambda x: x['created_at'] if 'created_at' in x else '', reverse=True)
}, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["DELETE"])
def clean_completed_tasks(request):
"""清理已完成的任务"""
completed_count = 0
all_tasks = list_all_tasks()
for task_info in all_tasks:
task_id = task_info.get('task_id')
if task_info.get('status') in ['completed', 'failed']:
if 'created_at' in task_info and \
datetime.fromisoformat(task_info['created_at']) < datetime.now() - timedelta(days=1):
redis_client.delete(f"task_status:{task_id}")
completed_count += 1
return JsonResponse({
'status': 'success',
'message': f'已清理 {completed_count} 个完成超过24小时的任务'
}, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["POST"])
def fetch_videos_with_callback(request):
"""启动任务并设置回调URL任务完成后自动发送结果"""
try:
data = json.loads(request.body)
callback_url = data.get('callback_url')
if not callback_url:
return JsonResponse({
'status': 'error',
'message': '必须提供callback_url参数'
}, json_dumps_params={'ensure_ascii': False})
task_id = str(uuid.uuid4())
# 使用Redis存储任务状态
set_task_status(task_id, {
'status': 'pending',
'progress': 0,
'message': '任务已创建,等待处理',
'error': None,
'callback_url': callback_url,
'params': data,
'created_at': datetime.now().isoformat()
})
# 启动任务
fetch_and_callback_task.delay(task_id, data)
return JsonResponse({
'status': 'accepted',
'message': '任务已启动,完成后将回调通知',
'task_id': task_id
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"启动任务失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': f'启动任务失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
@shared_task
def fetch_and_callback_task(task_id, data):
"""执行视频获取并回调通知"""
try:
# 更新任务状态为处理中
update_task_status(task_id, {
'status': 'processing',
'message': '正在处理中'
})
# 在任务开始时关闭可能存在的旧连接
from django.db import connections
for conn in connections.all():
conn.close()
fetcher = TiktokVideoFetcher(
start_unique_id=data.get('unique_id'),
max_depth=int(data.get('max_depth', 3)),
target_users=int(data.get('target_users')) if 'target_users' in data and data['target_users'] else None
)
if fetcher.initialize() and fetcher.process():
result = fetcher.get_result()
update_task_status(task_id, {
'status': 'completed',
'result': result
})
else:
update_task_status(task_id, {
'status': 'failed',
'error': fetcher.error_message
})
result = {
'status': 'error',
'message': fetcher.error_message
}
# 发送回调通知
callback_url = get_task_status(task_id)['callback_url']
try:
response = requests.post(callback_url, json={
'task_id': task_id,
'result': result
}, timeout=30)
update_task_status(task_id, {
'callback_status': response.status_code
})
except Exception as e:
logger.error(f"回调通知失败: {str(e)}")
update_task_status(task_id, {
'callback_status': 'failed',
'callback_error': str(e)
})
# 尝试发送错误回调
try:
callback_url = get_task_status(task_id)['callback_url']
requests.post(callback_url, json={
'task_id': task_id,
'status': 'error',
'message': str(e)
}, timeout=30)
except:
pass
except Exception as e:
logger.error(f"任务执行失败: {str(e)}")
update_task_status(task_id, {
'status': 'failed',
'error': str(e)
})
# 尝试发送错误回调
try:
callback_url = get_task_status(task_id)['callback_url']
requests.post(callback_url, json={
'task_id': task_id,
'status': 'error',
'message': str(e)
}, timeout=30)
except:
pass
@csrf_exempt
@require_http_methods(["POST"])
def reset_task_status(request, task_id):
"""手动重置任务状态(临时解决方案)"""
try:
task_info = get_task_status(task_id)
if not task_info:
return JsonResponse({
'status': 'error',
'message': '任务不存在'
}, json_dumps_params={'ensure_ascii': False})
# 重置任务状态
set_task_status(task_id, {
'status': 'reset',
'message': '任务已手动重置',
'reset_time': datetime.now().isoformat(),
'original_status': task_info.get('status', 'unknown')
})
return JsonResponse({
'status': 'success',
'message': f'任务 {task_id} 已重置'
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
return JsonResponse({
'status': 'error',
'message': f'重置失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET"])
def check_task_status_detail(request, task_id):
"""获取任务详细状态信息(包含调试数据)"""
task_info = get_task_status(task_id)
if not task_info:
return JsonResponse({
'status': 'error',
'message': '任务不存在'
}, json_dumps_params={'ensure_ascii': False})
# 添加系统诊断信息
try:
import celery.app.control
from automated_task_monitor.celery import app # 替换为您的Celery应用实例
# 获取活跃Worker
inspector = app.control.inspect()
active_workers = inspector.active()
active_tasks = inspector.active()
task_info['debug'] = {
'active_workers': bool(active_workers),
'worker_count': len(active_workers) if active_workers else 0,
'active_tasks': active_tasks,
'server_time': datetime.now().isoformat()
}
except Exception as e:
task_info['debug'] = {
'error': str(e),
'server_time': datetime.now().isoformat()
}
return JsonResponse(task_info, json_dumps_params={'ensure_ascii': False})