automated_task_monitor/monitor/views.py

1612 lines
65 KiB
Python
Raw Normal View History

from django.http import JsonResponse
from .tasks import monitor_process, get_process_gpu_usage
import threading
import psutil
from .models import HighCPUProcess, HighGPUProcess, HighMemoryProcess, AllResourceProcess, TiktokUserVideos
import logging
import os
from datetime import datetime
import time
import nvidia_smi
from django.utils import timezone
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import render
import pytz
from pathlib import Path # 使用 pathlib 处理跨平台路径
import json
from django.conf import settings
import requests
import threading
directory_monitoring = {}
# 全局变量来控制检测线程
monitor_thread = None
is_monitoring = False
# 在文件开头定义日志目录
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(BASE_DIR, 'logs', 'process_monitor')
# 创建保存视频的基本路径
TIKTOK_VIDEOS_PATH = os.path.join(BASE_DIR, 'media', 'tiktok_videos')
# 确保基本目录存在
os.makedirs(TIKTOK_VIDEOS_PATH, exist_ok=True)
# 确保基础目录结构存在,添加 'all' 目录
for resource_type in ['cpu', 'memory', 'gpu', 'all']:
os.makedirs(os.path.join(LOG_DIR, resource_type), exist_ok=True)
# 获取应用专属的logger
logger = logging.getLogger('monitor')
# 全局变量来跟踪监控的目录
monitored_directories = set()
# 在文件顶部添加 API 基础 URL
API_BASE_URL = "http://81.69.223.133:45268"
# 添加新的监控线程函数
def monitor_directory(directory):
"""持续监控目录的后台任务"""
monitor_interval = settings.MONITOR_INTERVAL # 监控间隔5秒
log_interval = settings.LOG_INTERVAL # 日志写入间隔60秒
processed_pids = set() # 用于跟踪已处理的PID
last_log_time = {} # 每个进程的最后日志时间
while directory_monitoring.get(directory, False):
try:
next_update = timezone.now() + timezone.timedelta(seconds=monitor_interval)
processes = find_python_processes_in_dir(directory)
for proc in processes:
pid = proc['pid']
try:
# 设置日志文件
log_file = setup_process_logger(pid, 'all')
if log_file:
process = psutil.Process(pid)
data = get_process_data(process)
# 更新数据库
AllResourceProcess.objects.update_or_create(
pid=pid,
defaults={
'process_name': process.name(),
'cpu_usage': float(data['cpu']['usage'].replace('%', '')),
'cpu_user_time': float(data['cpu']['user_time'].replace('s', '')),
'cpu_system_time': float(data['cpu']['system_time'].replace('s', '')),
'memory_usage': float(data['memory']['physical'].split('MB')[0].strip()),
'memory_percent': float(data['memory']['physical'].split('(')[1].split('%')[0].strip()),
'virtual_memory': float(data['memory']['virtual'].split('MB')[0].strip()),
'gpu_usage': float(data['gpu']['usage'].replace('%', '')),
'gpu_memory': float(data['gpu']['memory'].replace('MB', '')),
'net_io_sent': float(data['io']['write'].split('MB')[0].strip()),
'net_io_recv': float(data['io']['read'].split('MB')[0].strip()),
'is_active': True,
'status': 1,
'log_file': log_file
}
)
now = timezone.now()
# 如果是新检测到的进程,立即写入日志
if pid not in processed_pids:
print(f"首次检测到进程,立即写入日志: PID={pid}") # 调试日志
log_process_metrics(pid, data, log_file)
processed_pids.add(pid)
last_log_time[pid] = now
# 对于已知进程,每隔一分钟写入一次
elif (now - last_log_time.get(pid, now)).total_seconds() >= log_interval:
print(f"定期写入日志: PID={pid}") # 调试日志
log_process_metrics(pid, data, log_file)
last_log_time[pid] = now
except Exception as e:
logger.error(f"监控进程 {pid} 失败: {str(e)}")
# 计算需要等待的时间
now = timezone.now()
if next_update > now:
time.sleep((next_update - now).total_seconds())
except Exception as e:
logger.error(f"目录监控出错: {str(e)}")
time.sleep(5)
def get_python_processes(directory):
"""获取指定目录下的Python进程"""
directory = str(Path(directory).resolve()).lower()
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
if 'python' in proc.info['name'].lower():
process = psutil.Process(proc.info['pid'])
try:
proc_cwd = str(Path(process.cwd()).resolve()).lower()
if directory in proc_cwd or proc_cwd.startswith(directory):
processes.append(process)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
return processes
def get_process_info(process):
"""获取进程详细信息"""
try:
with process.oneshot():
info = {
'pid': process.pid,
'name': process.name(),
'cpu_usage': process.cpu_percent(),
'memory_usage': process.memory_info().rss / (1024 * 1024), # MB
'command_line': ' '.join(process.cmdline()),
'create_time': process.create_time(),
'status': process.status(),
'threads': process.num_threads(),
'working_directory': process.cwd(),
'gpu_info': {
'usage': 0,
'memory': 0
}
}
# 获取IO信息
try:
io = process.io_counters()
info['io'] = {
'read_bytes': io.read_bytes / (1024 * 1024), # MB
'write_bytes': io.write_bytes / (1024 * 1024), # MB
'read_count': io.read_count,
'write_count': io.write_count
}
except (psutil.NoSuchProcess, psutil.AccessDenied):
info['io'] = {
'read_bytes': 0,
'write_bytes': 0,
'read_count': 0,
'write_count': 0
}
return info
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
logger.error(f"获取进程 {process.pid} 信息失败: {str(e)}")
return None
@csrf_exempt
@require_http_methods(["POST"])
def scan_directory(request):
"""扫描目录下的 Python 进程"""
try:
data = json.loads(request.body)
directory = data.get('directory')
if not directory:
return JsonResponse({
'status': 'error',
'message': '请提供目录路径'
})
# 添加到监控目录集合
directory = str(Path(directory).resolve())
monitored_directories.add(directory)
logger.info(f"开始监控目录: {directory}")
return JsonResponse({
'status': 'success',
'message': f'开始监控目录: {directory}',
'directory': directory
})
except Exception as e:
logger.error(f"启动目录监控失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': str(e)
})
@require_http_methods(["GET"])
def get_directory_status(request):
"""获取目录下的进程状态"""
try:
directory = request.GET.get('directory')
if not directory:
return JsonResponse({
'status': 'error',
'message': '未提供目录路径'
})
# 获取目录下的Python进程
processes = get_python_processes(directory)
# 获取每个进程的详细信息
process_info = []
for proc in processes:
info = get_process_info(proc)
if info:
process_info.append(info)
return JsonResponse({
'status': 'success',
'processes': process_info,
'timestamp': timezone.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
logger.error(f"获取目录状态失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': str(e)
})
@require_http_methods(["POST"])
def stop_directory_monitor(request):
"""停止目录监控"""
try:
data = json.loads(request.body)
directory = data.get('directory')
if not directory:
return JsonResponse({
'status': 'error',
'message': '未提供目录路径'
})
# 从监控集合中移除
directory = str(Path(directory).resolve())
if directory in monitored_directories:
monitored_directories.remove(directory)
logger.info(f"停止监控目录: {directory}")
return JsonResponse({
'status': 'success',
'message': '监控已停止'
})
except Exception as e:
logger.error(f"停止监控失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': str(e)
})
def setup_process_logger(pid, monitor_type):
"""为进程设置独立的日志记录器"""
try:
# 验证监控类型
if monitor_type not in ['cpu', 'memory', 'gpu', 'all']:
print(f"警告:无效的监控类型 {monitor_type}")
return None
# 获取当前时间(使用本地时区)
local_tz = pytz.timezone('Asia/Shanghai')
current_time = timezone.localtime(timezone.now(), local_tz)
current_date = current_time.strftime("%Y%m%d")
# 如果是从目录扫描启动的监控,使用 'all' 类型
if monitor_type == 'scan':
monitor_type = 'all'
# 构建日志文件路径
log_dir = os.path.join(LOG_DIR, monitor_type)
log_file = os.path.join(log_dir, f'{pid}_{current_date}_{monitor_type}.log')
# 确保目录存在
os.makedirs(os.path.dirname(log_file), exist_ok=True)
print(f"设置日志文件: {log_file}") # 调试信息
return log_file
except Exception as e:
print(f"设置日志文件失败: {str(e)}")
raise
def log_process_metrics(pid, data, log_file):
"""记录进程指标到日志文件"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# 获取当前时间(使用本地时区)
local_tz = pytz.timezone('Asia/Shanghai') # 使用中国时区
current_time = timezone.localtime(timezone.now(), local_tz)
timestamp = current_time.strftime('%Y-%m-%d %H:%M:%S')
# 格式化日志内容
log_content = f"=== {timestamp} ===\n"
# CPU信息
log_content += "CPU信息:\n"
log_content += f"├─ 使用率: {data['cpu']['usage']}\n"
log_content += f"├─ 用户态时间: {data['cpu']['user_time']}\n"
log_content += f"├─ 内核态时间: {data['cpu']['system_time']}\n"
log_content += f"├─ CPU核心数: {data['cpu']['cores']}\n"
log_content += f"├─ CPU频率: {data['cpu']['frequency']}\n"
log_content += f"└─ 上下文切换: {data['cpu']['context_switches']}\n"
# 内存信息
log_content += "内存信息:\n"
log_content += f"├─ 物理内存: {data['memory']['physical']}\n"
log_content += f"├─ 虚拟内存: {data['memory']['virtual']}\n"
log_content += f"├─ 内存映射: {data['memory']['mappings']}\n"
log_content += f"├─ 系统内存使用: {data['memory']['system_usage']}\n"
log_content += f"└─ 交换空间使用: {data['memory']['swap_usage']}\n"
# GPU信息
log_content += "GPU信息:\n"
log_content += f"├─ 使用率: {data['gpu']['usage']}\n"
log_content += f"└─ 显存使用: {data['gpu']['memory']}\n"
# IO信息
log_content += "IO信息:\n"
log_content += f"├─ 读取: {data['io']['read']}\n"
log_content += f"├─ 写入: {data['io']['write']}\n"
log_content += f"└─ 其他IO: {data['io']['other']}\n"
# 进程状态
log_content += f"进程状态: {data['status']}\n"
log_content += "-" * 50 + "\n"
# 写入日志文件
with open(log_file, 'a', encoding='utf-8') as f:
f.write(log_content)
print(f"已写入日志到: {log_file}") # 调试信息
except Exception as e:
print(f"写入日志失败: {str(e)}")
raise
def get_process_by_name(process_name):
"""根据进程名称获取进程PID"""
pids = []
for proc in psutil.process_iter(['pid', 'name']):
try:
if process_name.lower() in proc.info['name'].lower():
pids.append(proc.info['pid'])
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return pids
def get_process_gpu_usage(pid):
"""获取进程的GPU使用情况"""
try:
# 如果没有GPU或无法获取GPU信息返回默认值
return 0.0, 0.0
except Exception as e:
logger.error(f"获取GPU信息失败: {str(e)}")
return 0.0, 0.0
def get_high_resource_processes():
"""获取高资源占用的进程"""
high_resource_pids = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
try:
if proc.info['cpu_percent'] > 50 or proc.info['memory_percent'] > 50:
high_resource_pids.append({
'pid': proc.info['pid'],
'name': proc.info['name'],
'cpu_percent': proc.info['cpu_percent'],
'memory_percent': proc.info['memory_percent']
})
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
return high_resource_pids
def auto_detect_high_resource_processes(request):
"""自动检测高资源进程的API端点"""
try:
# 获取高资源进程
high_resource_procs = get_high_resource_processes()
# 启动监控
for proc in high_resource_procs:
try:
process = psutil.Process(proc['pid'])
# 设置日志文件
log_file = setup_process_logger(proc['pid'], 'auto')
# 记录进程数据
data = get_process_data(process)
log_process_metrics(proc['pid'], data, log_file)
except psutil.NoSuchProcess:
continue
except Exception as e:
logger.error(f"监控进程 {proc['pid']} 时出错: {str(e)}")
return JsonResponse({
'status': 'success',
'message': '已开始自动检测高资源进程',
'processes': high_resource_procs
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"自动检测失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': f'自动检测失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
def setup_logger(pid):
"""为每个进程设置独立的日志记录器"""
log_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs', 'process_monitor', f'process_{pid}_{datetime.now().strftime("%Y%m%d")}.log')
process_logger = logging.getLogger(f'monitor.process.process_{pid}')
process_logger.setLevel(logging.INFO)
handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
process_logger.addHandler(handler)
return process_logger, log_file
def continuous_monitor():
"""持续监控高资源进程的线程函数"""
global is_monitoring
logger = logging.getLogger('monitor')
interval = 60 # 设置为60秒间隔
while is_monitoring:
try:
next_update = timezone.now() + timezone.timedelta(seconds=interval)
# 获取所有活跃的监控记录
monitors = {
'cpu': HighCPUProcess.objects.filter(is_active=True),
'memory': HighMemoryProcess.objects.filter(is_active=True),
'gpu': HighGPUProcess.objects.filter(is_active=True)
}
for monitor_type, processes in monitors.items():
for proc in processes:
try:
process = psutil.Process(proc.pid)
data = get_process_data(process)
log_file = setup_process_logger(proc.pid, monitor_type)
log_process_metrics(proc.pid, data, log_file)
except psutil.NoSuchProcess:
proc.is_active = False
proc.save()
except Exception as e:
logger.error(f"监控进程 {proc.pid} 时出错: {str(e)}")
# 计算需要等待的时间
now = timezone.now()
if next_update > now:
time.sleep((next_update - now).total_seconds())
except Exception as e:
logger.error(f"持续监控出错: {str(e)}")
time.sleep(5)
def get_process_data(process):
"""获取进程的详细数据"""
with process.oneshot():
cpu_times = process.cpu_times()
cpu_freq = psutil.cpu_freq() or psutil._common.scpufreq(current=0, min=0, max=0)
cpu_ctx = process.num_ctx_switches()
mem = process.memory_info()
try:
mem_maps = len(process.memory_maps())
except (psutil.AccessDenied, psutil.TimeoutExpired):
mem_maps = 0
sys_mem = psutil.virtual_memory()
swap = psutil.swap_memory()
try:
io = process.io_counters()
except (psutil.AccessDenied, psutil.TimeoutExpired):
io = psutil._common.pio(read_count=0, write_count=0, read_bytes=0, write_bytes=0,
read_chars=0, write_chars=0, other_count=0, other_bytes=0)
gpu_usage, gpu_memory = get_process_gpu_usage(process.pid)
return {
'timestamp': timezone.now().strftime('%Y-%m-%d %H:%M:%S'),
'cpu': {
'usage': f"{process.cpu_percent()}%",
'user_time': f"{cpu_times.user:.1f}s",
'system_time': f"{cpu_times.system:.1f}s",
'cores': str(psutil.cpu_count()),
'frequency': f"{getattr(cpu_freq, 'current', 0):.1f}MHz",
'context_switches': f"{cpu_ctx.voluntary}/{cpu_ctx.involuntary}"
},
'memory': {
'physical': f"{mem.rss/1024/1024:.1f}MB ({process.memory_percent():.1f}%)",
'virtual': f"{mem.vms/1024/1024:.1f}MB",
'mappings': f"{mem_maps}",
'system_usage': f"{sys_mem.percent:.1f}%",
'swap_usage': f"{swap.percent:.1f}%"
},
'gpu': {
'usage': f"{gpu_usage:.1f}%",
'memory': f"{gpu_memory:.1f}MB"
},
'io': {
'read': f"{getattr(io, 'read_bytes', 0)/1024/1024:.1f}MB ({getattr(io, 'read_count', 0)}次)",
'write': f"{getattr(io, 'write_bytes', 0)/1024/1024:.1f}MB ({getattr(io, 'write_count', 0)}次)",
'other': f"{getattr(io, 'other_count', 0)}"
},
'status': process.status()
}
@csrf_exempt
@require_http_methods(["GET", "POST"])
def start_monitor(request):
"""开始监控进程"""
global monitor_thread, is_monitoring
try:
pid = request.GET.get('pid')
monitor_type = request.GET.get('type')
if not pid:
return JsonResponse({
'status': 'error',
'message': '请输入进程ID'
}, json_dumps_params={'ensure_ascii': False})
pid = int(pid)
try:
process = psutil.Process(pid)
process_name = process.name()
# 启动监控线程
if not is_monitoring:
is_monitoring = True
monitor_thread = threading.Thread(target=continuous_monitor)
monitor_thread.daemon = True
monitor_thread.start()
return JsonResponse({
'status': 'success',
'message': f'开始监控进程 {pid} ({process_name})'
}, json_dumps_params={'ensure_ascii': False})
except psutil.NoSuchProcess:
return JsonResponse({
'status': 'error',
'message': f'进程 {pid} 不存在'
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"启动监控失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': f'启动监控失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET", "POST"])
def stop_monitor(request):
"""停止监控进程"""
global is_monitoring
try:
pid = request.GET.get('pid')
if not pid:
return JsonResponse({
'status': 'error',
'message': '请输入进程ID'
}, json_dumps_params={'ensure_ascii': False})
pid = int(pid)
# 停止监控线程
is_monitoring = False
return JsonResponse({
'status': 'success',
'message': f'停止监控进程 {pid}'
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"停止监控失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': f'停止监控失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
@require_http_methods(["GET"])
def get_process_status(request, pid):
"""获取进程监控状态的API"""
try:
monitor_type = request.GET.get('type')
logger.info(f"获取进程状态: pid={pid}, type={monitor_type}")
# 先设置日志文件路径
log_file = setup_process_logger(pid, monitor_type)
if not log_file:
raise ValueError(f"无法创建日志文件,监控类型: {monitor_type}")
process = psutil.Process(pid)
data = get_process_data(process)
# 同步数据到数据库
try:
# 更新 all 资源记录
AllResourceProcess.objects.update_or_create(
pid=pid,
defaults={
'process_name': process.name(),
'cpu_usage': float(data['cpu']['usage'].replace('%', '')),
'cpu_user_time': float(data['cpu']['user_time'].replace('s', '')),
'cpu_system_time': float(data['cpu']['system_time'].replace('s', '')),
'memory_usage': float(data['memory']['physical'].split('MB')[0].strip()),
'memory_percent': float(data['memory']['physical'].split('(')[1].split('%')[0].strip()),
'virtual_memory': float(data['memory']['virtual'].split('MB')[0].strip()),
'gpu_usage': float(data['gpu']['usage'].replace('%', '')),
'gpu_memory': float(data['gpu']['memory'].replace('MB', '')),
'net_io_sent': float(data['io']['write'].split('MB')[0].strip()),
'net_io_recv': float(data['io']['read'].split('MB')[0].strip()),
'is_active': True,
'status': 1,
'log_file': log_file
}
)
# 根据监控类型选择对应的模型
if monitor_type == 'cpu':
# 从字符串中提取CPU使用率数值
cpu_usage = float(data['cpu']['usage'].replace('%', ''))
logger.info(f"CPU使用率: {cpu_usage}%") # 调试日志
process_obj, created = HighCPUProcess.objects.update_or_create(
pid=pid,
defaults={
'process_name': process.name(),
'cpu_usage': cpu_usage,
'is_active': True,
'status': 1,
'log_file': log_file
}
)
logger.info(f"{'创建' if created else '更新'}CPU进程记录: {process_obj}")
elif monitor_type == 'memory':
# 从字符串中提取内存使用量和百分比
memory_info = data['memory']['physical']
memory_usage = float(memory_info.split('MB')[0].strip())
memory_percent = float(memory_info.split('(')[1].split('%')[0].strip())
HighMemoryProcess.objects.update_or_create(
pid=pid,
defaults={
'process_name': process.name(),
'memory_usage': memory_usage,
'memory_percent': memory_percent,
'is_active': True,
'status': 1,
'log_file': log_file
}
)
elif monitor_type == 'gpu':
# 从字符串中提取GPU使用率和显存
gpu_usage = float(data['gpu']['usage'].replace('%', ''))
gpu_memory = float(data['gpu']['memory'].replace('MB', ''))
HighGPUProcess.objects.update_or_create(
pid=pid,
defaults={
'process_name': process.name(),
'gpu_usage': gpu_usage,
'gpu_memory': gpu_memory,
'is_active': True,
'status': 1,
'log_file': log_file
}
)
return JsonResponse({
'status': 'success',
'data': data
})
except psutil.NoSuchProcess:
# 进程已终止
monitor.is_active = False
monitor.status = 0
monitor.save()
return JsonResponse({
'status': 'error',
'message': f'进程 {pid} 已终止'
})
except Exception as e:
return JsonResponse({
'status': 'error',
'message': str(e)
})
def update_process_status(pid, monitor_type, is_active=False, status=0):
"""更新进程状态"""
try:
if monitor_type == 'cpu':
HighCPUProcess.objects.filter(pid=pid).update(
is_active=is_active,
status=status
)
elif monitor_type == 'memory':
HighMemoryProcess.objects.filter(pid=pid).update(
is_active=is_active,
status=status
)
elif monitor_type == 'gpu':
HighGPUProcess.objects.filter(pid=pid).update(
is_active=is_active,
status=status
)
except Exception as e:
logger.error(f"更新进程状态失败: {str(e)}")
def index(request):
"""渲染主页"""
return render(request, 'index.html')
@csrf_exempt
@require_http_methods(["POST"])
def stop_auto_detect(request):
"""停止自动检测的API端点"""
try:
return JsonResponse({
'status': 'success',
'message': '已停止自动检测'
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"停止自动检测失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': f'停止自动检测失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET"])
def get_latest_log(request, pid):
"""获取最新的监控日志"""
try:
monitor_type = request.GET.get('type') # 获取监控类型cpu/gpu/memory
if not monitor_type:
return JsonResponse({
'status': 'error',
'message': '请指定监控类型'
})
# 根据类型获取对应的监控记录
monitor = None
if monitor_type == 'cpu':
monitor = HighCPUProcess.objects.filter(pid=pid, is_active=True).first()
elif monitor_type == 'gpu':
monitor = HighGPUProcess.objects.filter(pid=pid, is_active=True).first()
elif monitor_type == 'memory':
monitor = HighMemoryProcess.objects.filter(pid=pid, is_active=True).first()
if not monitor:
return JsonResponse({
'status': 'error',
'message': f'未找到进程 {pid}{monitor_type}监控记录'
})
# 获取最新数据
try:
process = psutil.Process(pid)
with process.oneshot():
data = {
'pid': pid,
'name': process.name(),
'status': 1, # 1表示进程运行中
}
# 根据监控类型添加相应的数据
if monitor_type == 'cpu':
data['cpu_percent'] = process.cpu_percent()
elif monitor_type == 'memory':
memory_info = process.memory_info()
data['memory_gb'] = memory_info.rss / (1024 * 1024 * 1024)
data['memory_percent'] = process.memory_percent()
elif monitor_type == 'gpu':
gpu_usage, gpu_memory = get_process_gpu_usage(pid)
data['gpu_usage'] = gpu_usage
data['gpu_memory'] = gpu_memory
return JsonResponse({
'status': 'success',
'data': data
})
except psutil.NoSuchProcess:
# 进程已终止
monitor.is_active = False
monitor.status = 0
monitor.save()
return JsonResponse({
'status': 'error',
'message': f'进程 {pid} 已终止'
})
except Exception as e:
return JsonResponse({
'status': 'error',
'message': str(e)
})
@csrf_exempt
@require_http_methods(["GET"])
def get_process_list(request):
"""获取当前监控的进程列表"""
try:
processes = []
# 获取所有活跃的监控进程
cpu_processes = HighCPUProcess.objects.filter(is_active=True)
gpu_processes = HighGPUProcess.objects.filter(is_active=True)
memory_processes = HighMemoryProcess.objects.filter(is_active=True)
# 创建进程信息字典
process_dict = {}
# 处理 CPU 进程
for proc in cpu_processes:
if proc.pid not in process_dict:
process_dict[proc.pid] = {
'pid': proc.pid,
'name': proc.process_name,
'cpu_percent': proc.cpu_usage,
'memory_gb': 0,
'memory_percent': 0,
'gpu_usage': 0,
'gpu_memory': 0,
'resource_types': ['cpu']
}
else:
process_dict[proc.pid]['cpu_percent'] = proc.cpu_usage
process_dict[proc.pid]['resource_types'].append('cpu')
# 处理 GPU 进程
for proc in gpu_processes:
if proc.pid not in process_dict:
process_dict[proc.pid] = {
'pid': proc.pid,
'name': proc.process_name,
'cpu_percent': 0,
'memory_gb': 0,
'memory_percent': 0,
'gpu_usage': proc.gpu_usage,
'gpu_memory': proc.gpu_memory,
'resource_types': ['gpu']
}
else:
process_dict[proc.pid]['gpu_usage'] = proc.gpu_usage
process_dict[proc.pid]['gpu_memory'] = proc.gpu_memory
process_dict[proc.pid]['resource_types'].append('gpu')
# 处理内存进程
for proc in memory_processes:
if proc.pid not in process_dict:
process_dict[proc.pid] = {
'pid': proc.pid,
'name': proc.process_name,
'cpu_percent': 0,
'memory_gb': proc.memory_usage,
'memory_percent': proc.memory_percent,
'gpu_usage': 0,
'gpu_memory': 0,
'resource_types': ['memory']
}
else:
process_dict[proc.pid]['memory_gb'] = proc.memory_usage
process_dict[proc.pid]['memory_percent'] = proc.memory_percent
process_dict[proc.pid]['resource_types'].append('memory')
# 转换字典为列表
processes = list(process_dict.values())
# 按总资源占用率排序
processes.sort(key=lambda x: (
x['cpu_percent'] / 100 +
x['memory_percent'] / 100 +
x['gpu_usage'] / 100
), reverse=True)
return JsonResponse({
'status': 'success',
'processes': processes
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
return JsonResponse({
'status': 'error',
'message': str(e)
}, json_dumps_params={'ensure_ascii': False})
def high_resource_list(request):
"""高资源进程列表视图"""
context = {
'cpu_processes': HighCPUProcess.objects.all().order_by('-updated_at'),
'memory_processes': HighMemoryProcess.objects.all().order_by('-updated_at'),
'gpu_processes': HighGPUProcess.objects.all().order_by('-updated_at'),
}
return render(request, 'high_resource_list.html', context)
def find_python_processes_in_dir(directory):
"""查找指定目录下运行的 Python 进程"""
python_processes = []
directory = str(Path(directory).resolve()) # 转换为绝对路径,并处理跨平台差异
for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cwd']):
try:
# 检查是否是 Python 进程
if proc.info['name'].lower().startswith('python'):
# 检查进程的工作目录
proc_cwd = str(Path(proc.info['cwd']).resolve())
if proc_cwd.startswith(directory):
# 检查命令行参数
cmdline = proc.info['cmdline']
if cmdline:
python_processes.append({
'pid': proc.info['pid'],
'name': proc.info['name'],
'cmdline': ' '.join(cmdline),
'cwd': proc_cwd
})
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return python_processes
def get_gpu_info(pid):
"""获取进程的GPU使用信息"""
try:
import pynvml
pynvml.nvmlInit()
gpu_info = {
'usage': 0,
'memory': 0,
'device_count': pynvml.nvmlDeviceGetCount()
}
for i in range(gpu_info['device_count']):
handle = pynvml.nvmlDeviceGetHandleByIndex(i)
procs = pynvml.nvmlDeviceGetComputeRunningProcesses(handle)
for proc in procs:
if proc.pid == pid:
gpu_info['memory'] = proc.usedGpuMemory / 1024 / 1024 # 转换为MB
gpu_info['usage'] = pynvml.nvmlDeviceGetUtilizationRates(handle).gpu
break
return gpu_info
except Exception as e:
logger.error(f"获取GPU信息失败: {str(e)}")
return {'usage': 0, 'memory': 0, 'device_count': 0}
def get_io_counters(proc):
"""获取进程的IO计数器信息"""
try:
io = proc.io_counters()
return {
'read_bytes': io.read_bytes / 1024 / 1024, # 转换为MB
'write_bytes': io.write_bytes / 1024 / 1024,
'read_count': io.read_count,
'write_count': io.write_count
}
except:
return {'read_bytes': 0, 'write_bytes': 0, 'read_count': 0, 'write_count': 0}
def get_network_connections(proc):
"""获取进程的网络连接信息"""
try:
connections = []
for conn in proc.connections():
connections.append({
'local_address': f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else "",
'remote_address': f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else "",
'status': conn.status
})
return connections
except:
return []
def get_open_files(proc):
"""获取进程打开的文件列表"""
try:
return [f.path for f in proc.open_files()]
except:
return []
@csrf_exempt
@require_http_methods(["POST"])
def fetch_tiktok_videos(request):
"""获取TikTok用户视频并下载播放量前十的视频"""
try:
data = json.loads(request.body)
unique_id = data.get('unique_id')
if not unique_id:
return JsonResponse({
'status': 'error',
'message': '请提供TikTok用户ID(unique_id)'
}, json_dumps_params={'ensure_ascii': False})
# 调用API获取用户资料和secUid
logger.info(f"正在获取用户 {unique_id} 的资料...")
user_profile = fetch_user_profile(unique_id)
if not user_profile or 'data' not in user_profile:
return JsonResponse({
'status': 'error',
'message': f'无法获取用户 {unique_id} 的资料'
}, json_dumps_params={'ensure_ascii': False})
# 从API响应中提取secUid和其他用户信息
try:
user_info = user_profile['data']['userInfo']['user']
sec_uid = user_info['secUid']
# 提取其他用户信息
nickname = user_info.get('nickname', f'用户_{unique_id}')
signature = user_info.get('signature', '')
avatar_url = user_info.get('avatarLarger', '')
user_stats = user_profile['data']['userInfo'].get('stats', {})
follower_count = user_stats.get('followerCount', 0)
heart_count = user_stats.get('heartCount', 0)
logger.info(f"成功获取用户secUid: {sec_uid}")
except (KeyError, TypeError) as e:
logger.error(f"解析用户资料出错: {e}")
return JsonResponse({
'status': 'error',
'message': f'解析用户资料出错: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
# 确保用户目录存在
user_dir = os.path.join(TIKTOK_VIDEOS_PATH, unique_id)
os.makedirs(user_dir, exist_ok=True)
# 获取用户视频
videos_data = fetch_user_videos(sec_uid)
# 提取视频信息并按播放量排序
all_videos = []
if videos_data and isinstance(videos_data, dict) and 'data' in videos_data and 'itemList' in videos_data['data']:
for video in videos_data['data']['itemList']:
try:
# 确保提取正确的视频ID
video_id = video.get('id', '')
if not video_id or not str(video_id).isdigit():
logger.warning(f"跳过无效的视频ID: {video_id}")
continue
# 安全地获取统计数据
stats = video.get('stats', {})
if not isinstance(stats, dict):
stats = {}
play_count = int(stats.get('playCount', 0))
# 收集视频信息
all_videos.append({
'id': video_id,
'desc': video.get('desc', ''),
'create_time': video.get('createTime', 0),
'cover': video.get('cover', ''),
'play_count': play_count,
'comment_count': int(stats.get('commentCount', 0)),
'digg_count': int(stats.get('diggCount', 0)),
'share_count': int(stats.get('shareCount', 0))
})
except Exception as e:
logger.error(f"处理视频数据出错: {str(e)}")
continue
# 按播放量排序并获取前10个
all_videos.sort(key=lambda x: x['play_count'], reverse=True)
top_videos = all_videos[:10]
# 下载视频
downloaded_videos = []
for i, video in enumerate(top_videos):
# 获取视频ID
video_id = video['id'] # 这是数字ID
# 构建保存路径
save_path = os.path.join(user_dir, f"{video_id}.mp4")
logger.info(f"开始下载第 {i+1} 个热门视频(ID: {video_id}): {video['desc'][:20]}...")
# 使用正确的ID调用下载函数
if download_video(video_id, unique_id, save_path):
video['download_path'] = save_path
downloaded_videos.append(video)
logger.info(f"下载成功: {save_path}")
else:
logger.error(f"下载失败: {video_id}")
# 避免频繁请求被限制
time.sleep(1)
# 保存用户信息和视频信息到数据库
video_info_json = json.dumps([{
'id': v['id'],
'desc': v['desc'],
'play_count': v['play_count']
} for v in downloaded_videos], ensure_ascii=False)
user_record = TiktokUserVideos.objects.update_or_create(
sec_user_id=sec_uid,
defaults={
'nickname': nickname,
'signature': signature,
'follower_count': follower_count,
'total_favorited': heart_count,
'avatar_url': avatar_url,
'videos_folder': user_dir,
'video_paths': video_info_json
}
)
return JsonResponse({
'status': 'success',
'message': '处理完成',
'user_info': {
'nickname': nickname,
'unique_id': unique_id,
'sec_uid': sec_uid,
'avatar': avatar_url,
'follower_count': follower_count,
'total_favorited': heart_count,
'signature': signature
},
'videos_count': len(videos_data['data']['itemList']) if videos_data and 'data' in videos_data and 'itemList' in videos_data['data'] else 0,
'downloaded_videos': len(downloaded_videos),
'videos': [{'id': v['id'], 'desc': v['desc'][:50], 'play_count': v['play_count']} for v in downloaded_videos],
'video_directory': user_dir # 添加视频目录路径方便查找
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"处理TikTok视频失败: {e}")
import traceback
logger.error(f"详细错误: {traceback.format_exc()}")
return JsonResponse({
'status': 'error',
'message': f'处理TikTok视频失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
@require_http_methods(["GET"])
def get_tiktok_user_videos(request):
"""获取已下载的TikTok用户视频列表"""
try:
sec_user_id = request.GET.get('sec_user_id')
if not sec_user_id:
# 如果没有指定用户ID返回所有用户列表
users = TiktokUserVideos.objects.all().values('sec_user_id', 'nickname', 'follower_count', 'videos_folder', 'create_time')
return JsonResponse({
'status': 'success',
'users': list(users)
}, json_dumps_params={'ensure_ascii': False})
# 查询指定用户信息
try:
user = TiktokUserVideos.objects.get(sec_user_id=sec_user_id)
# 解析视频信息JSON
video_info = json.loads(user.video_paths) if user.video_paths else []
# 获取文件夹中的文件列表
videos_folder = user.videos_folder
video_files = []
if os.path.exists(videos_folder):
video_files = [f for f in os.listdir(videos_folder) if os.path.isfile(os.path.join(videos_folder, f))]
except TiktokUserVideos.DoesNotExist:
return JsonResponse({
'status': 'error',
'message': f'用户 {sec_user_id} 不存在'
}, json_dumps_params={'ensure_ascii': False})
return JsonResponse({
'status': 'success',
'user_info': {
'sec_user_id': user.sec_user_id,
'nickname': user.nickname,
'signature': user.signature,
'follower_count': user.follower_count,
'total_favorited': user.total_favorited,
'avatar_url': user.avatar_url,
'create_time': user.create_time.strftime('%Y-%m-%d %H:%M:%S'),
'update_time': user.update_time.strftime('%Y-%m-%d %H:%M:%S')
},
'videos_folder': videos_folder,
'video_files': video_files,
'video_info': video_info
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"获取TikTok视频列表失败: {str(e)}")
return JsonResponse({
'status': 'error',
'message': f'获取TikTok视频列表失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
# 辅助函数
def fetch_user_videos(sec_uid, max_cursor=0, count=20):
"""获取用户视频列表"""
url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_post?secUid={sec_uid}"
try:
response = requests.get(url)
if response.status_code == 200:
return response.json()
else:
logger.error(f"获取视频列表失败: {response.status_code}")
return None
except Exception as e:
logger.error(f"获取视频列表异常: {e}")
return None
def fetch_user_profile(unique_id):
"""获取用户基本信息"""
# 添加User-Agent头模拟浏览器访问
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Referer': 'https://www.tiktok.com/'
}
url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_profile?uniqueId={unique_id}"
try:
logger.info(f"正在请求用户资料: {url}")
# 添加重试机制
for attempt in range(3):
try:
response = requests.get(url, headers=headers, timeout=30)
break
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
logger.warning(f"请求超时或连接错误,尝试第{attempt+1}次重试: {e}")
if attempt == 2: # 最后一次重试失败
raise
time.sleep(2) # 等待2秒后重试
if response.status_code == 200:
try:
data = response.json()
logger.info(f"获取用户资料成功: {unique_id}")
# 打印完整响应以便调试
logger.info(f"API原始响应: {data}")
# 验证数据完整性
if 'data' not in data or not data['data']:
logger.error(f"API响应缺少data字段: {data}")
return None
if 'userInfo' not in data['data'] or not data['data']['userInfo']:
logger.error(f"API响应缺少userInfo字段: {data['data']}")
return None
if 'user' not in data['data']['userInfo'] or not data['data']['userInfo']['user']:
logger.error(f"API响应缺少user字段: {data['data']['userInfo']}")
return None
# 打印用户信息
logger.info(f"用户信息: {data['data']['userInfo']['user']}")
return data
except json.JSONDecodeError:
logger.error(f"API响应不是有效的JSON: {response.text[:500]}")
return None
else:
logger.error(f"获取用户信息失败: HTTP {response.status_code}, 响应: {response.text[:500]}")
return None
except Exception as e:
logger.error(f"获取用户信息异常: {e}")
return None
def download_video(video_id, unique_id, save_path):
"""使用API的直接下载接口下载TikTok视频"""
# 确保视频ID是纯数字
if not str(video_id).isdigit():
logger.error(f"无效的视频ID: {video_id},必须是纯数字")
return False
# 构建标准TikTok视频URL
tiktok_url = f"https://www.tiktok.com/@{unique_id}/video/{video_id}"
logger.info(f"构建的TikTok URL: {tiktok_url}")
# 构建完整的API请求URL
api_url = f"http://81.69.223.133:45268/api/download"
full_url = f"{api_url}?url={tiktok_url}&prefix=true&with_watermark=false"
logger.info(f"完整的API请求URL: {full_url}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
}
try:
# 直接使用完整URL发送请求
response = requests.get(full_url, headers=headers, stream=True, timeout=60)
# 检查响应状态
if response.status_code != 200:
logger.error(f"下载视频失败: {response.status_code} - {response.text[:200] if response.text else '无响应内容'}")
return False
# 获取内容类型
content_type = response.headers.get('Content-Type', '')
logger.info(f"响应内容类型: {content_type}")
# 保存文件
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
file_size = os.path.getsize(save_path)
logger.info(f"视频已下载到: {save_path},文件大小: {file_size}字节")
return True
except Exception as e:
logger.error(f"下载视频异常: {e}")
import traceback
logger.error(f"详细错误: {traceback.format_exc()}")
return False
def fetch_user_followings(sec_uid):
"""获取用户关注列表"""
url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_follow?secUid={sec_uid}"
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
}
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 200:
data = response.json()
logger.info(f"成功获取用户关注列表,共 {len(data['data'].get('userList', []))} 个关注")
return data
else:
logger.error(f"获取用户关注列表失败: {response.status_code}")
return None
except Exception as e:
logger.error(f"获取用户关注列表异常: {e}")
return None
def filter_users_by_followers(user_list, min_followers=5000, max_followers=50000):
"""筛选粉丝数在指定范围内的用户"""
filtered_users = []
for user_data in user_list:
try:
follower_count = user_data.get('stats', {}).get('followerCount', 0)
if min_followers <= follower_count <= max_followers:
filtered_users.append(user_data)
except Exception as e:
logger.error(f"筛选用户时出错: {e}")
return filtered_users
@csrf_exempt
@require_http_methods(["POST"])
def recursive_fetch_videos(request):
"""递归获取关注列表中的用户视频"""
try:
data = json.loads(request.body)
start_unique_id = data.get('unique_id')
max_depth = int(data.get('max_depth', 3)) # 默认递归深度为3
if not start_unique_id:
return JsonResponse({
'status': 'error',
'message': '请提供起始TikTok用户ID(unique_id)'
}, json_dumps_params={'ensure_ascii': False})
# 获取起始用户资料和secUid
user_profile = fetch_user_profile(start_unique_id)
if not user_profile or 'data' not in user_profile:
return JsonResponse({
'status': 'error',
'message': f'无法获取用户 {start_unique_id} 的资料'
}, json_dumps_params={'ensure_ascii': False})
# 提取secUid和其他用户信息
try:
user_info = user_profile['data']['userInfo']['user']
start_sec_uid = user_info['secUid']
# 提取起始用户的详细信息
nickname = user_info.get('nickname', '')
signature = user_info.get('signature', '')
avatar_url = user_info.get('avatarLarger', '')
# 提取统计信息
stats = user_profile['data']['userInfo'].get('stats', {})
follower_count = stats.get('followerCount', 0)
following_count = stats.get('followingCount', 0)
heart_count = stats.get('heartCount', 0) or stats.get('diggCount', 0)
video_count = stats.get('videoCount', 0)
# 为起始用户创建目录
start_user_dir = os.path.join(TIKTOK_VIDEOS_PATH, start_unique_id)
os.makedirs(start_user_dir, exist_ok=True)
# 保存起始用户信息到数据库
TiktokUserVideos.objects.update_or_create(
sec_user_id=start_sec_uid,
defaults={
'nickname': nickname,
'signature': signature,
'follower_count': follower_count,
'following_count': following_count,
'total_favorited': heart_count,
'video_count': video_count,
'avatar_url': avatar_url,
'videos_folder': start_user_dir
}
)
logger.info(f"成功获取并保存起始用户信息: {start_unique_id}, secUid: {start_sec_uid}")
except (KeyError, TypeError) as e:
logger.error(f"解析用户资料出错: {e}")
return JsonResponse({
'status': 'error',
'message': f'解析用户资料出错: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})
# 开始递归获取视频
all_downloaded_videos = []
processed_users = set() # 已处理的用户集合,避免重复处理
def process_user(sec_uid, unique_id, depth=0):
"""递归处理用户,获取视频和关注用户"""
if depth >= max_depth or sec_uid in processed_users:
return
processed_users.add(sec_uid)
logger.info(f"处理用户 {unique_id},递归深度: {depth}")
# 确保用户目录存在
user_dir = os.path.join(TIKTOK_VIDEOS_PATH, unique_id)
os.makedirs(user_dir, exist_ok=True)
# 下载该用户的热门视频
videos_data = fetch_user_videos(sec_uid)
all_videos = []
if videos_data and isinstance(videos_data, dict) and 'data' in videos_data and 'itemList' in videos_data['data']:
for video in videos_data['data']['itemList']:
try:
video_id = video.get('id', '')
if not video_id or not str(video_id).isdigit():
continue
stats = video.get('stats', {})
if not isinstance(stats, dict):
stats = {}
play_count = int(stats.get('playCount', 0))
all_videos.append({
'id': video_id,
'desc': video.get('desc', ''),
'play_count': play_count
})
except Exception as e:
logger.error(f"处理视频数据出错: {str(e)}")
continue
# 按播放量排序并获取前10个
all_videos.sort(key=lambda x: x['play_count'], reverse=True)
top_videos = all_videos[:10]
# 下载视频
downloaded_videos = []
for i, video in enumerate(top_videos):
video_id = video['id']
save_path = os.path.join(user_dir, f"{video_id}.mp4")
logger.info(f"下载用户 {unique_id} 的第 {i+1} 个热门视频: {video_id}")
if download_video(video_id, unique_id, save_path):
video['download_path'] = save_path
video['user_unique_id'] = unique_id
downloaded_videos.append(video)
all_downloaded_videos.append(video)
time.sleep(1) # 避免频繁请求
# 保存用户信息到数据库
video_info_json = json.dumps([{
'id': v['id'],
'desc': v['desc'],
'play_count': v['play_count']
} for v in downloaded_videos], ensure_ascii=False)
TiktokUserVideos.objects.update_or_create(
sec_user_id=sec_uid,
defaults={
'nickname': unique_id,
'videos_folder': user_dir,
'video_paths': video_info_json
}
)
# 获取关注列表
followings_data = fetch_user_followings(sec_uid)
if followings_data and 'data' in followings_data and 'userList' in followings_data['data']:
user_list = followings_data['data']['userList']
# 筛选粉丝数在5000-50000之间的用户
filtered_users = filter_users_by_followers(user_list, 5000, 50000)
logger.info(f"用户 {unique_id} 的关注列表中有 {len(filtered_users)} 个粉丝数在5000-50000之间")
# 取前5个用户
for user_data in filtered_users[:5]:
try:
# 直接从关注列表中提取用户信息
user_obj = user_data['user']
following_sec_uid = user_obj['secUid']
following_unique_id = user_obj['uniqueId']
# 获取用户详细信息
nickname = user_obj.get('nickname', '')
signature = user_obj.get('signature', '')
avatar_url = user_obj.get('avatarLarger', '')
# 获取统计信息
stats = user_data.get('stats', {})
follower_count = stats.get('followerCount', 0)
following_count = stats.get('followingCount', 0)
heart_count = stats.get('heartCount', 0)
video_count = stats.get('videoCount', 0)
# 保存用户信息到数据库(即使尚未下载视频)
follow_user_dir = os.path.join(TIKTOK_VIDEOS_PATH, following_unique_id)
TiktokUserVideos.objects.update_or_create(
sec_user_id=following_sec_uid,
defaults={
'nickname': nickname,
'signature': signature,
'follower_count': follower_count,
'following_count': following_count,
'total_favorited': heart_count,
'video_count': video_count,
'avatar_url': avatar_url,
'videos_folder': follow_user_dir
}
)
# 递归处理关注的用户
process_user(following_sec_uid, following_unique_id, depth + 1)
except Exception as e:
logger.error(f"处理关注用户时出错: {e}")
continue
# 开始递归处理
process_user(start_sec_uid, start_unique_id)
return JsonResponse({
'status': 'success',
'message': '递归获取视频完成',
'processed_users_count': len(processed_users),
'downloaded_videos_count': len(all_downloaded_videos),
'downloaded_videos': [{'id': v['id'], 'desc': v['desc'][:50], 'play_count': v['play_count'], 'user': v['user_unique_id']} for v in all_downloaded_videos[:100]] # 只返回前100个视频信息避免响应过大
}, json_dumps_params={'ensure_ascii': False})
except Exception as e:
logger.error(f"递归获取TikTok视频失败: {e}")
import traceback
logger.error(f"详细错误: {traceback.format_exc()}")
return JsonResponse({
'status': 'error',
'message': f'递归获取TikTok视频失败: {str(e)}'
}, json_dumps_params={'ensure_ascii': False})