# automated_task_monitor/monitor/views.py
from django.http import JsonResponse
from .tasks import monitor_process, get_process_gpu_usage
import threading
import psutil
from .models import HighCPUProcess, HighGPUProcess, HighMemoryProcess, AllResourceProcess, TiktokUserVideos
import logging
import os
from datetime import datetime
import time
import nvidia_smi
from django.utils import timezone
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import render
import pytz
from pathlib import Path # 使用 pathlib 处理跨平台路径
import json
from django.conf import settings
import requests
import threading
# Per-directory flag consulted by monitor_directory()'s loop: True keeps it running.
directory_monitoring = {}
# Global handle/flag controlling the continuous_monitor background thread.
monitor_thread = None
is_monitoring = False
# Log directory layout rooted at the project base (one level above this app).
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(BASE_DIR, 'logs', 'process_monitor')
# Base path where downloaded TikTok videos are stored.
TIKTOK_VIDEOS_PATH = os.path.join(BASE_DIR, 'media', 'tiktok_videos')
# Make sure the base video directory exists at import time.
os.makedirs(TIKTOK_VIDEOS_PATH, exist_ok=True)
# Ensure one log subdirectory per monitored resource type (including 'all').
for resource_type in ['cpu', 'memory', 'gpu', 'all']:
    os.makedirs(os.path.join(LOG_DIR, resource_type), exist_ok=True)
# Application logger (configured elsewhere in Django LOGGING settings).
logger = logging.getLogger('monitor')
# Set of resolved directory paths currently registered for monitoring.
monitored_directories = set()
# Base URL of the external TikTok proxy API.
API_BASE_URL = "http://81.69.223.133:45268"
# 添加新的监控线程函数
def monitor_directory(directory):
    """Background loop: sample every Python process under *directory*.

    Runs until ``directory_monitoring[directory]`` becomes falsy. Each pass
    (every ``settings.MONITOR_INTERVAL`` seconds) it upserts a row in
    ``AllResourceProcess`` per process and appends to that process's log
    file — immediately on first sight, then every ``settings.LOG_INTERVAL``
    seconds thereafter.

    NOTE(review): the float(...) parsing below depends on the exact string
    formats produced by get_process_data() (e.g. "12.3%", "45.6MB (1.2%)");
    keep the two functions in sync.
    """
    monitor_interval = settings.MONITOR_INTERVAL  # sampling period, seconds
    log_interval = settings.LOG_INTERVAL          # per-pid log write period, seconds
    processed_pids = set()   # pids already seen (logged at least once)
    last_log_time = {}       # pid -> timestamp of last log write
    while directory_monitoring.get(directory, False):
        try:
            next_update = timezone.now() + timezone.timedelta(seconds=monitor_interval)
            processes = find_python_processes_in_dir(directory)
            for proc in processes:
                pid = proc['pid']
                try:
                    # Resolve the log file for this pid ('all' resource type).
                    log_file = setup_process_logger(pid, 'all')
                    if log_file:
                        process = psutil.Process(pid)
                        data = get_process_data(process)
                        # Upsert the latest sample; numeric fields are parsed
                        # back out of get_process_data()'s formatted strings.
                        AllResourceProcess.objects.update_or_create(
                            pid=pid,
                            defaults={
                                'process_name': process.name(),
                                'cpu_usage': float(data['cpu']['usage'].replace('%', '')),
                                'cpu_user_time': float(data['cpu']['user_time'].replace('s', '')),
                                'cpu_system_time': float(data['cpu']['system_time'].replace('s', '')),
                                'memory_usage': float(data['memory']['physical'].split('MB')[0].strip()),
                                'memory_percent': float(data['memory']['physical'].split('(')[1].split('%')[0].strip()),
                                'virtual_memory': float(data['memory']['virtual'].split('MB')[0].strip()),
                                'gpu_usage': float(data['gpu']['usage'].replace('%', '')),
                                'gpu_memory': float(data['gpu']['memory'].replace('MB', '')),
                                # NOTE(review): these fields are named net_io_*
                                # but are filled from disk-IO counters — confirm
                                # against the model's intent.
                                'net_io_sent': float(data['io']['write'].split('MB')[0].strip()),
                                'net_io_recv': float(data['io']['read'].split('MB')[0].strip()),
                                'is_active': True,
                                'status': 1,
                                'log_file': log_file
                            }
                        )
                        now = timezone.now()
                        # Newly-detected process: write a log entry right away.
                        if pid not in processed_pids:
                            print(f"首次检测到进程,立即写入日志: PID={pid}")  # debug
                            log_process_metrics(pid, data, log_file)
                            processed_pids.add(pid)
                            last_log_time[pid] = now
                        # Known process: write at most once per log_interval.
                        elif (now - last_log_time.get(pid, now)).total_seconds() >= log_interval:
                            print(f"定期写入日志: PID={pid}")  # debug
                            log_process_metrics(pid, data, log_file)
                            last_log_time[pid] = now
                except Exception as e:
                    logger.error(f"监控进程 {pid} 失败: {str(e)}")
            # Sleep the remainder of the sampling period, if any.
            now = timezone.now()
            if next_update > now:
                time.sleep((next_update - now).total_seconds())
        except Exception as e:
            logger.error(f"目录监控出错: {str(e)}")
            time.sleep(5)  # back off briefly after an unexpected failure
def get_python_processes(directory):
    """Return psutil.Process objects for Python processes whose cwd lies
    inside *directory* (case-insensitive comparison of resolved paths).

    Processes that disappear or deny access mid-scan are skipped silently.
    """
    directory = str(Path(directory).resolve()).lower()
    # Pre-compute the prefix used for sub-path tests so the check below cannot
    # match sibling directories that merely share a name prefix.
    prefix = directory.rstrip(os.sep) + os.sep
    processes = []
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        try:
            if 'python' in proc.info['name'].lower():
                process = psutil.Process(proc.info['pid'])
                try:
                    proc_cwd = str(Path(process.cwd()).resolve()).lower()
                    # BUG FIX: the old test `directory in proc_cwd` matched any
                    # path merely *containing* the directory string (e.g.
                    # "/home/xapp" for "/home/x"). Require equality or a true
                    # sub-path separated by os.sep.
                    if proc_cwd == directory or proc_cwd.startswith(prefix):
                        processes.append(process)
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return processes
def get_process_info(process):
    """Build a one-shot snapshot dict for *process*: identity, CPU, memory,
    IO counters and placeholder GPU info.

    Returns:
        dict | None: the snapshot, or None if the process vanished or
        access was denied.
    """
    try:
        with process.oneshot():
            snapshot = {
                'pid': process.pid,
                'name': process.name(),
                'cpu_usage': process.cpu_percent(),
                'memory_usage': process.memory_info().rss / (1024 * 1024),  # MB
                'command_line': ' '.join(process.cmdline()),
                'create_time': process.create_time(),
                'status': process.status(),
                'threads': process.num_threads(),
                'working_directory': process.cwd(),
                # GPU metrics are not gathered here; callers get zeros.
                'gpu_info': {'usage': 0, 'memory': 0},
            }
            # IO counters may be unavailable (permissions, platform); fall
            # back to zeros rather than failing the whole snapshot.
            try:
                counters = process.io_counters()
                snapshot['io'] = {
                    'read_bytes': counters.read_bytes / (1024 * 1024),   # MB
                    'write_bytes': counters.write_bytes / (1024 * 1024),  # MB
                    'read_count': counters.read_count,
                    'write_count': counters.write_count,
                }
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                snapshot['io'] = {
                    'read_bytes': 0,
                    'write_bytes': 0,
                    'read_count': 0,
                    'write_count': 0,
                }
            return snapshot
    except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
        logger.error(f"获取进程 {process.pid} 信息失败: {str(e)}")
        return None
@csrf_exempt
@require_http_methods(["POST"])
def scan_directory(request):
    """Register a directory for monitoring.

    Expects a JSON body ``{"directory": "<path>"}``; resolves the path and
    records it in the module-level ``monitored_directories`` set.
    """
    try:
        payload = json.loads(request.body)
        directory = payload.get('directory')
        if not directory:
            return JsonResponse({
                'status': 'error',
                'message': '请提供目录路径'
            })
        resolved = str(Path(directory).resolve())
        monitored_directories.add(resolved)
        logger.info(f"开始监控目录: {resolved}")
        return JsonResponse({
            'status': 'success',
            'message': f'开始监控目录: {resolved}',
            'directory': resolved
        })
    except Exception as e:
        logger.error(f"启动目录监控失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        })
@require_http_methods(["GET"])
def get_directory_status(request):
    """Return detailed info for every Python process under ``?directory=``.

    Responds with a list of per-process snapshots (see get_process_info)
    plus a server-side timestamp.
    """
    try:
        directory = request.GET.get('directory')
        if not directory:
            return JsonResponse({
                'status': 'error',
                'message': '未提供目录路径'
            })
        # Snapshot each matching process, dropping ones that disappeared
        # (get_process_info returns None for those).
        snapshots = (get_process_info(p) for p in get_python_processes(directory))
        process_info = [info for info in snapshots if info]
        return JsonResponse({
            'status': 'success',
            'processes': process_info,
            'timestamp': timezone.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    except Exception as e:
        logger.error(f"获取目录状态失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        })
@csrf_exempt
@require_http_methods(["POST"])
def stop_directory_monitor(request):
    """Stop monitoring a directory.

    Expects a JSON body ``{"directory": "<path>"}``. Removes the resolved
    path from ``monitored_directories`` and clears its flag in
    ``directory_monitoring`` so a running monitor_directory() loop exits.
    """
    try:
        data = json.loads(request.body)
        directory = data.get('directory')
        if not directory:
            return JsonResponse({
                'status': 'error',
                'message': '未提供目录路径'
            })
        directory = str(Path(directory).resolve())
        if directory in monitored_directories:
            monitored_directories.remove(directory)
        # BUG FIX: also signal the background loop — monitor_directory() polls
        # directory_monitoring.get(directory, False), which was never cleared.
        if directory in directory_monitoring:
            directory_monitoring[directory] = False
        logger.info(f"停止监控目录: {directory}")
        return JsonResponse({
            'status': 'success',
            'message': '监控已停止'
        })
    except Exception as e:
        logger.error(f"停止监控失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        })
def setup_process_logger(pid, monitor_type):
    """Return the log-file path for (pid, monitor_type), creating its
    directory if needed.

    Accepts 'cpu' / 'memory' / 'gpu' / 'all'; the aliases 'scan' and 'auto'
    are mapped to 'all'. Returns None for any other type; raises on I/O
    failures.
    """
    try:
        # BUG FIX: normalize aliases *before* validation. Previously the
        # 'scan' -> 'all' remap sat after the validity check, so 'scan' (and
        # 'auto', used by the auto-detect view) were rejected and callers
        # received None, then crashed trying to write to it.
        if monitor_type in ('scan', 'auto'):
            monitor_type = 'all'
        if monitor_type not in ['cpu', 'memory', 'gpu', 'all']:
            print(f"警告:无效的监控类型 {monitor_type}")
            return None
        # Name the file by local (Asia/Shanghai) date so one file per day.
        local_tz = pytz.timezone('Asia/Shanghai')
        current_time = timezone.localtime(timezone.now(), local_tz)
        current_date = current_time.strftime("%Y%m%d")
        log_dir = os.path.join(LOG_DIR, monitor_type)
        log_file = os.path.join(log_dir, f'{pid}_{current_date}_{monitor_type}.log')
        os.makedirs(os.path.dirname(log_file), exist_ok=True)
        print(f"设置日志文件: {log_file}")  # debug
        return log_file
    except Exception as e:
        print(f"设置日志文件失败: {str(e)}")
        raise
def log_process_metrics(pid, data, log_file):
    """Append one formatted metrics record for *pid* to *log_file*.

    *data* is the dict produced by get_process_data(). The record is a
    timestamped, tree-formatted text block; raises on write failure.
    """
    try:
        os.makedirs(os.path.dirname(log_file), exist_ok=True)
        # Timestamp in local (Asia/Shanghai) time.
        local_tz = pytz.timezone('Asia/Shanghai')
        current_time = timezone.localtime(timezone.now(), local_tz)
        timestamp = current_time.strftime('%Y-%m-%d %H:%M:%S')
        # Assemble the record as a list of lines, then join once.
        lines = [
            f"=== {timestamp} ===",
            "CPU信息:",
            f"├─ 使用率: {data['cpu']['usage']}",
            f"├─ 用户态时间: {data['cpu']['user_time']}",
            f"├─ 内核态时间: {data['cpu']['system_time']}",
            f"├─ CPU核心数: {data['cpu']['cores']}",
            f"├─ CPU频率: {data['cpu']['frequency']}",
            f"└─ 上下文切换: {data['cpu']['context_switches']}",
            "内存信息:",
            f"├─ 物理内存: {data['memory']['physical']}",
            f"├─ 虚拟内存: {data['memory']['virtual']}",
            f"├─ 内存映射: {data['memory']['mappings']}",
            f"├─ 系统内存使用: {data['memory']['system_usage']}",
            f"└─ 交换空间使用: {data['memory']['swap_usage']}",
            "GPU信息:",
            f"├─ 使用率: {data['gpu']['usage']}",
            f"└─ 显存使用: {data['gpu']['memory']}",
            "IO信息:",
            f"├─ 读取: {data['io']['read']}",
            f"├─ 写入: {data['io']['write']}",
            f"└─ 其他IO: {data['io']['other']}",
            f"进程状态: {data['status']}",
            "-" * 50,
        ]
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write("\n".join(lines) + "\n")
        print(f"已写入日志到: {log_file}")  # debug
    except Exception as e:
        print(f"写入日志失败: {str(e)}")
        raise
def get_process_by_name(process_name):
    """Return the PIDs of all processes whose name contains *process_name*
    (case-insensitive substring match)."""
    needle = process_name.lower()
    matches = []
    for proc in psutil.process_iter(['pid', 'name']):
        try:
            if needle in proc.info['name'].lower():
                matches.append(proc.info['pid'])
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # Process vanished or is inaccessible — skip it.
            pass
    return matches
def get_process_gpu_usage(pid):
    """Stub: GPU metrics are not collected here.

    Returns:
        tuple[float, float]: ``(gpu_usage_percent, gpu_memory_mb)`` — always
        ``(0.0, 0.0)``.
    """
    # NOTE(review): this shadows the import of the same name from .tasks at
    # the top of the file — confirm which implementation is intended.
    try:
        usage, memory = 0.0, 0.0
        return usage, memory
    except Exception as e:
        # Unreachable with the constant return above; kept for parity.
        logger.error(f"获取GPU信息失败: {str(e)}")
        return 0.0, 0.0
def get_high_resource_processes():
    """Scan all processes and return dicts for those above 50% CPU or 50%
    memory usage.

    NOTE(review): psutil's cached `cpu_percent` from process_iter reflects
    usage since the last call — the very first scan may under-report; confirm
    this is acceptable for detection.
    """
    hogs = []
    for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
        try:
            info = proc.info
            if info['cpu_percent'] > 50 or info['memory_percent'] > 50:
                hogs.append({
                    'pid': info['pid'],
                    'name': info['name'],
                    'cpu_percent': info['cpu_percent'],
                    'memory_percent': info['memory_percent'],
                })
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue
    return hogs
def auto_detect_high_resource_processes(request):
    """API endpoint: detect high-resource processes and log one metrics
    sample for each under the 'all' log type.

    Returns a JSON list of the detected processes.
    """
    try:
        high_resource_procs = get_high_resource_processes()
        for proc in high_resource_procs:
            try:
                process = psutil.Process(proc['pid'])
                # BUG FIX: 'auto' was not an accepted monitor type, so
                # setup_process_logger returned None and log_process_metrics
                # crashed on every process. Log under 'all' and guard None.
                log_file = setup_process_logger(proc['pid'], 'all')
                if not log_file:
                    continue
                data = get_process_data(process)
                log_process_metrics(proc['pid'], data, log_file)
            except psutil.NoSuchProcess:
                continue
            except Exception as e:
                logger.error(f"监控进程 {proc['pid']} 时出错: {str(e)}")
        return JsonResponse({
            'status': 'success',
            'message': '已开始自动检测高资源进程',
            'processes': high_resource_procs
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"自动检测失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': f'自动检测失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})
def setup_logger(pid):
    """Create (or reuse) a per-process file logger.

    Args:
        pid: process id used in the logger name and log-file name.

    Returns:
        tuple[logging.Logger, str]: the logger and its log-file path.
    """
    log_file = os.path.join(
        os.path.dirname(os.path.dirname(__file__)),
        'logs', 'process_monitor',
        f'process_{pid}_{datetime.now().strftime("%Y%m%d")}.log',
    )
    # Make sure the target directory exists before attaching a FileHandler.
    os.makedirs(os.path.dirname(log_file), exist_ok=True)
    process_logger = logging.getLogger(f'monitor.process.process_{pid}')
    process_logger.setLevel(logging.INFO)
    # BUG FIX: the original attached a fresh FileHandler on every call,
    # duplicating every log line and leaking file descriptors. Only attach
    # one handler per target file.
    abs_target = os.path.abspath(log_file)
    already_attached = any(
        isinstance(h, logging.FileHandler) and getattr(h, 'baseFilename', None) == abs_target
        for h in process_logger.handlers
    )
    if not already_attached:
        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        process_logger.addHandler(handler)
    return process_logger, log_file
def continuous_monitor():
    """Background thread body: every 60s, sample and log every active
    process recorded in the High{CPU,Memory,GPU}Process tables.

    Runs until the module-global ``is_monitoring`` is set False. A process
    that no longer exists is marked inactive in its table.
    """
    global is_monitoring
    logger = logging.getLogger('monitor')
    interval = 60  # sampling period, seconds
    while is_monitoring:
        try:
            next_update = timezone.now() + timezone.timedelta(seconds=interval)
            # Active monitoring records, keyed by the resource type whose
            # log directory they should write to.
            monitors = {
                'cpu': HighCPUProcess.objects.filter(is_active=True),
                'memory': HighMemoryProcess.objects.filter(is_active=True),
                'gpu': HighGPUProcess.objects.filter(is_active=True)
            }
            for monitor_type, processes in monitors.items():
                for proc in processes:
                    try:
                        process = psutil.Process(proc.pid)
                        data = get_process_data(process)
                        log_file = setup_process_logger(proc.pid, monitor_type)
                        log_process_metrics(proc.pid, data, log_file)
                    except psutil.NoSuchProcess:
                        # Process died: deactivate its DB record.
                        proc.is_active = False
                        proc.save()
                    except Exception as e:
                        logger.error(f"监控进程 {proc.pid} 时出错: {str(e)}")
            # Sleep out the remainder of the interval, if any.
            now = timezone.now()
            if next_update > now:
                time.sleep((next_update - now).total_seconds())
        except Exception as e:
            logger.error(f"持续监控出错: {str(e)}")
            time.sleep(5)  # brief back-off after unexpected failure
def get_process_data(process):
    """Collect a formatted metrics snapshot for *process*.

    Returns a dict of display strings ("12.3%", "45.6MB (1.2%)", ...) for
    CPU, memory, GPU and IO, plus a timestamp and the process status.
    Callers (monitor loops) parse numbers back out of these strings, so the
    formats must stay stable.
    """
    with process.oneshot():
        cpu_times = process.cpu_times()
        # psutil.cpu_freq() can return None (e.g. containers/VMs). The
        # consumer below already uses getattr(..., 'current', 0), so no
        # stand-in object is needed — the original built one from the
        # private psutil._common API, which is not a stable interface.
        cpu_freq = psutil.cpu_freq()
        cpu_ctx = process.num_ctx_switches()
        mem = process.memory_info()
        try:
            mem_maps = len(process.memory_maps())
        except (psutil.AccessDenied, psutil.TimeoutExpired):
            mem_maps = 0
        sys_mem = psutil.virtual_memory()
        swap = psutil.swap_memory()
        try:
            io = process.io_counters()
        except (psutil.AccessDenied, psutil.TimeoutExpired):
            # getattr(io, ..., 0) below falls back to zeros; again avoids the
            # private psutil._common.pio constructor.
            io = None
        gpu_usage, gpu_memory = get_process_gpu_usage(process.pid)
        return {
            'timestamp': timezone.now().strftime('%Y-%m-%d %H:%M:%S'),
            'cpu': {
                'usage': f"{process.cpu_percent()}%",
                'user_time': f"{cpu_times.user:.1f}s",
                'system_time': f"{cpu_times.system:.1f}s",
                'cores': str(psutil.cpu_count()),
                'frequency': f"{getattr(cpu_freq, 'current', 0):.1f}MHz",
                'context_switches': f"{cpu_ctx.voluntary}/{cpu_ctx.involuntary}"
            },
            'memory': {
                'physical': f"{mem.rss/1024/1024:.1f}MB ({process.memory_percent():.1f}%)",
                'virtual': f"{mem.vms/1024/1024:.1f}MB",
                'mappings': f"{mem_maps}",
                'system_usage': f"{sys_mem.percent:.1f}%",
                'swap_usage': f"{swap.percent:.1f}%"
            },
            'gpu': {
                'usage': f"{gpu_usage:.1f}%",
                'memory': f"{gpu_memory:.1f}MB"
            },
            'io': {
                'read': f"{getattr(io, 'read_bytes', 0)/1024/1024:.1f}MB ({getattr(io, 'read_count', 0)}次)",
                'write': f"{getattr(io, 'write_bytes', 0)/1024/1024:.1f}MB ({getattr(io, 'write_count', 0)}次)",
                'other': f"{getattr(io, 'other_count', 0)}"
            },
            'status': process.status()
        }
@csrf_exempt
@require_http_methods(["GET", "POST"])
def start_monitor(request):
    """Start the global monitoring thread for a pid given in ``?pid=``.

    Validates the pid exists, then lazily spawns the continuous_monitor
    daemon thread (once; later calls reuse it).
    """
    global monitor_thread, is_monitoring
    try:
        pid = request.GET.get('pid')
        monitor_type = request.GET.get('type')  # accepted but not used here
        if not pid:
            return JsonResponse({
                'status': 'error',
                'message': '请输入进程ID'
            }, json_dumps_params={'ensure_ascii': False})
        pid = int(pid)
        # Verify the target process exists before starting anything.
        try:
            process_name = psutil.Process(pid).name()
        except psutil.NoSuchProcess:
            return JsonResponse({
                'status': 'error',
                'message': f'进程 {pid} 不存在'
            }, json_dumps_params={'ensure_ascii': False})
        # Spawn the background monitor exactly once.
        if not is_monitoring:
            is_monitoring = True
            monitor_thread = threading.Thread(target=continuous_monitor, daemon=True)
            monitor_thread.start()
        return JsonResponse({
            'status': 'success',
            'message': f'开始监控进程 {pid} ({process_name})'
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"启动监控失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': f'启动监控失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET", "POST"])
def stop_monitor(request):
    """Stop the global monitoring loop.

    Clears ``is_monitoring`` so continuous_monitor()'s while-loop exits on
    its next iteration. The ``pid`` parameter is required for the response
    message only.
    """
    global is_monitoring
    try:
        pid = request.GET.get('pid')
        if not pid:
            return JsonResponse({
                'status': 'error',
                'message': '请输入进程ID'
            }, json_dumps_params={'ensure_ascii': False})
        pid = int(pid)
        is_monitoring = False  # signals the background thread to stop
        return JsonResponse({
            'status': 'success',
            'message': f'停止监控进程 {pid}'
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"停止监控失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': f'停止监控失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})
@require_http_methods(["GET"])
def get_process_status(request, pid):
    """API: sample *pid* once, persist the sample, and return it as JSON.

    ``?type=`` selects which High*Process table to update in addition to
    the AllResourceProcess upsert ('cpu' / 'memory' / 'gpu').
    """
    try:
        monitor_type = request.GET.get('type')
        logger.info(f"获取进程状态: pid={pid}, type={monitor_type}")
        # Resolve the log file first; None means the type was invalid.
        log_file = setup_process_logger(pid, monitor_type)
        if not log_file:
            raise ValueError(f"无法创建日志文件,监控类型: {monitor_type}")
        process = psutil.Process(pid)
        data = get_process_data(process)
        try:
            # Upsert the combined-resource record. Numeric fields are parsed
            # back out of get_process_data()'s formatted strings.
            AllResourceProcess.objects.update_or_create(
                pid=pid,
                defaults={
                    'process_name': process.name(),
                    'cpu_usage': float(data['cpu']['usage'].replace('%', '')),
                    'cpu_user_time': float(data['cpu']['user_time'].replace('s', '')),
                    'cpu_system_time': float(data['cpu']['system_time'].replace('s', '')),
                    'memory_usage': float(data['memory']['physical'].split('MB')[0].strip()),
                    'memory_percent': float(data['memory']['physical'].split('(')[1].split('%')[0].strip()),
                    'virtual_memory': float(data['memory']['virtual'].split('MB')[0].strip()),
                    'gpu_usage': float(data['gpu']['usage'].replace('%', '')),
                    'gpu_memory': float(data['gpu']['memory'].replace('MB', '')),
                    'net_io_sent': float(data['io']['write'].split('MB')[0].strip()),
                    'net_io_recv': float(data['io']['read'].split('MB')[0].strip()),
                    'is_active': True,
                    'status': 1,
                    'log_file': log_file
                }
            )
            # Also update the type-specific table, if one was requested.
            if monitor_type == 'cpu':
                cpu_usage = float(data['cpu']['usage'].replace('%', ''))
                logger.info(f"CPU使用率: {cpu_usage}%")
                process_obj, created = HighCPUProcess.objects.update_or_create(
                    pid=pid,
                    defaults={
                        'process_name': process.name(),
                        'cpu_usage': cpu_usage,
                        'is_active': True,
                        'status': 1,
                        'log_file': log_file
                    }
                )
                logger.info(f"{'创建' if created else '更新'}CPU进程记录: {process_obj}")
            elif monitor_type == 'memory':
                memory_info = data['memory']['physical']
                memory_usage = float(memory_info.split('MB')[0].strip())
                memory_percent = float(memory_info.split('(')[1].split('%')[0].strip())
                HighMemoryProcess.objects.update_or_create(
                    pid=pid,
                    defaults={
                        'process_name': process.name(),
                        'memory_usage': memory_usage,
                        'memory_percent': memory_percent,
                        'is_active': True,
                        'status': 1,
                        'log_file': log_file
                    }
                )
            elif monitor_type == 'gpu':
                gpu_usage = float(data['gpu']['usage'].replace('%', ''))
                gpu_memory = float(data['gpu']['memory'].replace('MB', ''))
                HighGPUProcess.objects.update_or_create(
                    pid=pid,
                    defaults={
                        'process_name': process.name(),
                        'gpu_usage': gpu_usage,
                        'gpu_memory': gpu_memory,
                        'is_active': True,
                        'status': 1,
                        'log_file': log_file
                    }
                )
            return JsonResponse({
                'status': 'success',
                'data': data
            })
        except psutil.NoSuchProcess:
            # BUG FIX: this branch previously referenced an undefined local
            # `monitor` (NameError). Deactivate the records via the shared
            # helper instead.
            update_process_status(pid, monitor_type, is_active=False, status=0)
            return JsonResponse({
                'status': 'error',
                'message': f'进程 {pid} 已终止'
            })
    except Exception as e:
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        })
def update_process_status(pid, monitor_type, is_active=False, status=0):
    """Set the is_active/status flags on the monitoring record for *pid*.

    *monitor_type* selects the table ('cpu' / 'memory' / 'gpu'); any other
    value is a silent no-op, matching the original if/elif chain.
    """
    model_by_type = {
        'cpu': HighCPUProcess,
        'memory': HighMemoryProcess,
        'gpu': HighGPUProcess,
    }
    try:
        model = model_by_type.get(monitor_type)
        if model is not None:
            model.objects.filter(pid=pid).update(
                is_active=is_active,
                status=status,
            )
    except Exception as e:
        logger.error(f"更新进程状态失败: {str(e)}")
def index(request):
    """Render the monitoring dashboard landing page (templates/index.html)."""
    return render(request, 'index.html')
@csrf_exempt
@require_http_methods(["POST"])
def stop_auto_detect(request):
    """API: acknowledge a stop request for auto-detection.

    There is no persistent auto-detect state to clear; this simply returns
    a success payload (error path kept for symmetry with sibling views).
    """
    try:
        payload = {
            'status': 'success',
            'message': '已停止自动检测'
        }
        return JsonResponse(payload, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"停止自动检测失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': f'停止自动检测失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})
@csrf_exempt
@require_http_methods(["GET"])
def get_latest_log(request, pid):
    """API: return a fresh metric reading for an actively-monitored pid.

    ``?type=`` must be 'cpu' / 'gpu' / 'memory' and an active record for
    the pid must exist in the matching table; otherwise an error payload is
    returned. If the process has died, its record is deactivated.
    """
    try:
        monitor_type = request.GET.get('type')  # 'cpu' / 'gpu' / 'memory'
        if not monitor_type:
            return JsonResponse({
                'status': 'error',
                'message': '请指定监控类型'
            })
        # Look up the active monitoring record for this pid/type.
        monitor = None
        if monitor_type == 'cpu':
            monitor = HighCPUProcess.objects.filter(pid=pid, is_active=True).first()
        elif monitor_type == 'gpu':
            monitor = HighGPUProcess.objects.filter(pid=pid, is_active=True).first()
        elif monitor_type == 'memory':
            monitor = HighMemoryProcess.objects.filter(pid=pid, is_active=True).first()
        if not monitor:
            return JsonResponse({
                'status': 'error',
                'message': f'未找到进程 {pid}{monitor_type}监控记录'
            })
        # Sample the live process.
        try:
            process = psutil.Process(pid)
            with process.oneshot():
                data = {
                    'pid': pid,
                    'name': process.name(),
                    'status': 1,  # 1 = running
                }
                # Attach the metric matching the requested type.
                if monitor_type == 'cpu':
                    data['cpu_percent'] = process.cpu_percent()
                elif monitor_type == 'memory':
                    memory_info = process.memory_info()
                    data['memory_gb'] = memory_info.rss / (1024 * 1024 * 1024)
                    data['memory_percent'] = process.memory_percent()
                elif monitor_type == 'gpu':
                    gpu_usage, gpu_memory = get_process_gpu_usage(pid)
                    data['gpu_usage'] = gpu_usage
                    data['gpu_memory'] = gpu_memory
            return JsonResponse({
                'status': 'success',
                'data': data
            })
        except psutil.NoSuchProcess:
            # Process died: deactivate its record so it drops off the UI.
            monitor.is_active = False
            monitor.status = 0
            monitor.save()
            return JsonResponse({
                'status': 'error',
                'message': f'进程 {pid} 已终止'
            })
    except Exception as e:
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        })
@csrf_exempt
@require_http_methods(["GET"])
def get_process_list(request):
    """API: return all actively-monitored processes, merging CPU, GPU and
    memory records per pid and sorting by combined resource usage."""
    try:
        merged = {}

        def row_for(proc):
            # Fetch the accumulator row for this pid, creating a zeroed
            # entry (named after the first record seen) on first sight.
            return merged.setdefault(proc.pid, {
                'pid': proc.pid,
                'name': proc.process_name,
                'cpu_percent': 0,
                'memory_gb': 0,
                'memory_percent': 0,
                'gpu_usage': 0,
                'gpu_memory': 0,
                'resource_types': [],
            })

        for proc in HighCPUProcess.objects.filter(is_active=True):
            row = row_for(proc)
            row['cpu_percent'] = proc.cpu_usage
            row['resource_types'].append('cpu')

        for proc in HighGPUProcess.objects.filter(is_active=True):
            row = row_for(proc)
            row['gpu_usage'] = proc.gpu_usage
            row['gpu_memory'] = proc.gpu_memory
            row['resource_types'].append('gpu')

        for proc in HighMemoryProcess.objects.filter(is_active=True):
            row = row_for(proc)
            row['memory_gb'] = proc.memory_usage
            row['memory_percent'] = proc.memory_percent
            row['resource_types'].append('memory')

        # Highest combined usage first.
        processes = sorted(
            merged.values(),
            key=lambda x: (
                x['cpu_percent'] / 100 +
                x['memory_percent'] / 100 +
                x['gpu_usage'] / 100
            ),
            reverse=True,
        )
        return JsonResponse({
            'status': 'success',
            'processes': processes
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        }, json_dumps_params={'ensure_ascii': False})
def high_resource_list(request):
    """Render the page listing all recorded high-resource processes,
    newest-updated first in each category."""
    return render(request, 'high_resource_list.html', {
        'cpu_processes': HighCPUProcess.objects.all().order_by('-updated_at'),
        'memory_processes': HighMemoryProcess.objects.all().order_by('-updated_at'),
        'gpu_processes': HighGPUProcess.objects.all().order_by('-updated_at'),
    })
def find_python_processes_in_dir(directory):
    """Find Python processes whose working directory is *directory* or a
    sub-path of it.

    Returns:
        list[dict]: one dict per process with 'pid', 'name', 'cmdline'
        (joined string) and 'cwd'.
    """
    python_processes = []
    directory = str(Path(directory).resolve())
    # Pre-compute the sub-path prefix including the separator so sibling
    # directories sharing a name prefix cannot match.
    prefix = directory.rstrip(os.sep) + os.sep
    for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cwd']):
        try:
            if proc.info['name'].lower().startswith('python'):
                proc_cwd = str(Path(proc.info['cwd']).resolve())
                # BUG FIX: bare startswith(directory) also matched e.g.
                # "/app2" for "/app"; require equality or a true sub-path.
                if proc_cwd == directory or proc_cwd.startswith(prefix):
                    cmdline = proc.info['cmdline']
                    if cmdline:
                        python_processes.append({
                            'pid': proc.info['pid'],
                            'name': proc.info['name'],
                            'cmdline': ' '.join(cmdline),
                            'cwd': proc_cwd
                        })
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
    return python_processes
def get_gpu_info(pid):
    """Query NVML for *pid*'s GPU usage.

    Returns:
        dict: {'usage': <gpu %>, 'memory': <MB>, 'device_count': <int>};
        all zeros if NVML is unavailable or the pid uses no GPU.
    """
    try:
        import pynvml
        pynvml.nvmlInit()
        # BUG FIX: nvmlInit was never paired with nvmlShutdown, leaking the
        # NVML handle on every call; release it in a finally block.
        try:
            gpu_info = {
                'usage': 0,
                'memory': 0,
                'device_count': pynvml.nvmlDeviceGetCount()
            }
            for i in range(gpu_info['device_count']):
                handle = pynvml.nvmlDeviceGetHandleByIndex(i)
                for proc in pynvml.nvmlDeviceGetComputeRunningProcesses(handle):
                    if proc.pid == pid:
                        gpu_info['memory'] = proc.usedGpuMemory / 1024 / 1024  # bytes -> MB
                        gpu_info['usage'] = pynvml.nvmlDeviceGetUtilizationRates(handle).gpu
                        break
            return gpu_info
        finally:
            pynvml.nvmlShutdown()
    except Exception as e:
        logger.error(f"获取GPU信息失败: {str(e)}")
        return {'usage': 0, 'memory': 0, 'device_count': 0}
def get_io_counters(proc):
    """Return *proc*'s IO counters in MB / call counts, or zeros on failure.

    The bare ``except:`` in the original also swallowed KeyboardInterrupt
    and SystemExit; catch only psutil errors and the AttributeError raised
    on platforms where io_counters() is unavailable.
    """
    try:
        io = proc.io_counters()
        return {
            'read_bytes': io.read_bytes / 1024 / 1024,   # MB
            'write_bytes': io.write_bytes / 1024 / 1024,  # MB
            'read_count': io.read_count,
            'write_count': io.write_count
        }
    except (psutil.Error, AttributeError):
        return {'read_bytes': 0, 'write_bytes': 0, 'read_count': 0, 'write_count': 0}
def get_network_connections(proc):
    """Return *proc*'s network connections as display dicts, or [] on failure.

    Narrowed from a bare ``except:`` (which also swallowed
    KeyboardInterrupt/SystemExit) to psutil errors only.
    """
    try:
        connections = []
        for conn in proc.connections():
            connections.append({
                'local_address': f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else "",
                'remote_address': f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else "",
                'status': conn.status
            })
        return connections
    except psutil.Error:
        return []
def get_open_files(proc):
    """Return the paths of files *proc* has open, or [] on failure.

    Narrowed from a bare ``except:`` to psutil errors only.
    """
    try:
        return [f.path for f in proc.open_files()]
    except psutil.Error:
        return []
@csrf_exempt
@require_http_methods(["POST"])
def fetch_tiktok_videos(request):
    """API: given ``{"unique_id": ...}``, fetch a TikTok user's profile and
    videos via the proxy API, download the 10 most-played videos into
    ``TIKTOK_VIDEOS_PATH/<unique_id>/``, and upsert a TiktokUserVideos row.

    Returns a JSON summary of the user and the downloaded videos.
    """
    try:
        data = json.loads(request.body)
        unique_id = data.get('unique_id')
        if not unique_id:
            return JsonResponse({
                'status': 'error',
                'message': '请提供TikTok用户ID(unique_id)'
            }, json_dumps_params={'ensure_ascii': False})
        # Step 1: resolve the user's profile (and secUid) from the API.
        logger.info(f"正在获取用户 {unique_id} 的资料...")
        user_profile = fetch_user_profile(unique_id)
        if not user_profile or 'data' not in user_profile:
            return JsonResponse({
                'status': 'error',
                'message': f'无法获取用户 {unique_id} 的资料'
            }, json_dumps_params={'ensure_ascii': False})
        # Step 2: extract secUid plus display/stat fields from the response.
        try:
            user_info = user_profile['data']['userInfo']['user']
            sec_uid = user_info['secUid']
            nickname = user_info.get('nickname', f'用户_{unique_id}')
            signature = user_info.get('signature', '')
            avatar_url = user_info.get('avatarLarger', '')
            user_stats = user_profile['data']['userInfo'].get('stats', {})
            follower_count = user_stats.get('followerCount', 0)
            heart_count = user_stats.get('heartCount', 0)
            logger.info(f"成功获取用户secUid: {sec_uid}")
        except (KeyError, TypeError) as e:
            logger.error(f"解析用户资料出错: {e}")
            return JsonResponse({
                'status': 'error',
                'message': f'解析用户资料出错: {str(e)}'
            }, json_dumps_params={'ensure_ascii': False})
        # Step 3: ensure the per-user download directory exists.
        user_dir = os.path.join(TIKTOK_VIDEOS_PATH, unique_id)
        os.makedirs(user_dir, exist_ok=True)
        # Step 4: fetch the user's video list.
        videos_data = fetch_user_videos(sec_uid)
        # Step 5: collect valid videos (numeric ids only) with their stats.
        all_videos = []
        if videos_data and isinstance(videos_data, dict) and 'data' in videos_data and 'itemList' in videos_data['data']:
            for video in videos_data['data']['itemList']:
                try:
                    # Only numeric ids are usable for the download URL.
                    video_id = video.get('id', '')
                    if not video_id or not str(video_id).isdigit():
                        logger.warning(f"跳过无效的视频ID: {video_id}")
                        continue
                    # Stats may be missing or malformed; default to zeros.
                    stats = video.get('stats', {})
                    if not isinstance(stats, dict):
                        stats = {}
                    play_count = int(stats.get('playCount', 0))
                    all_videos.append({
                        'id': video_id,
                        'desc': video.get('desc', ''),
                        'create_time': video.get('createTime', 0),
                        'cover': video.get('cover', ''),
                        'play_count': play_count,
                        'comment_count': int(stats.get('commentCount', 0)),
                        'digg_count': int(stats.get('diggCount', 0)),
                        'share_count': int(stats.get('shareCount', 0))
                    })
                except Exception as e:
                    logger.error(f"处理视频数据出错: {str(e)}")
                    continue
        # Step 6: keep the top 10 by play count.
        all_videos.sort(key=lambda x: x['play_count'], reverse=True)
        top_videos = all_videos[:10]
        # Step 7: download each, pacing requests to avoid rate limiting.
        downloaded_videos = []
        for i, video in enumerate(top_videos):
            video_id = video['id']  # numeric id
            save_path = os.path.join(user_dir, f"{video_id}.mp4")
            logger.info(f"开始下载第 {i+1} 个热门视频(ID: {video_id}): {video['desc'][:20]}...")
            if download_video(video_id, unique_id, save_path):
                video['download_path'] = save_path
                downloaded_videos.append(video)
                logger.info(f"下载成功: {save_path}")
            else:
                logger.error(f"下载失败: {video_id}")
            # Throttle between downloads.
            time.sleep(1)
        # Step 8: persist user info + downloaded-video metadata.
        video_info_json = json.dumps([{
            'id': v['id'],
            'desc': v['desc'],
            'play_count': v['play_count']
        } for v in downloaded_videos], ensure_ascii=False)
        user_record = TiktokUserVideos.objects.update_or_create(
            sec_user_id=sec_uid,
            defaults={
                'nickname': nickname,
                'signature': signature,
                'follower_count': follower_count,
                'total_favorited': heart_count,
                'avatar_url': avatar_url,
                'videos_folder': user_dir,
                'video_paths': video_info_json
            }
        )
        return JsonResponse({
            'status': 'success',
            'message': '处理完成',
            'user_info': {
                'nickname': nickname,
                'unique_id': unique_id,
                'sec_uid': sec_uid,
                'avatar': avatar_url,
                'follower_count': follower_count,
                'total_favorited': heart_count,
                'signature': signature
            },
            'videos_count': len(videos_data['data']['itemList']) if videos_data and 'data' in videos_data and 'itemList' in videos_data['data'] else 0,
            'downloaded_videos': len(downloaded_videos),
            'videos': [{'id': v['id'], 'desc': v['desc'][:50], 'play_count': v['play_count']} for v in downloaded_videos],
            'video_directory': user_dir  # where the files were saved
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"处理TikTok视频失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return JsonResponse({
            'status': 'error',
            'message': f'处理TikTok视频失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})
@require_http_methods(["GET"])
def get_tiktok_user_videos(request):
    """API: list downloaded TikTok videos.

    Without ``?sec_user_id=`` returns a summary list of all stored users;
    with it, returns that user's profile fields, the files actually on disk
    in their download folder, and the stored per-video metadata.
    """
    try:
        sec_user_id = request.GET.get('sec_user_id')
        if not sec_user_id:
            # No user requested: return the full user list.
            users = TiktokUserVideos.objects.all().values('sec_user_id', 'nickname', 'follower_count', 'videos_folder', 'create_time')
            return JsonResponse({
                'status': 'success',
                'users': list(users)
            }, json_dumps_params={'ensure_ascii': False})
        # Look up the requested user.
        try:
            user = TiktokUserVideos.objects.get(sec_user_id=sec_user_id)
            # Stored JSON metadata about the downloaded videos (may be empty).
            video_info = json.loads(user.video_paths) if user.video_paths else []
            # Enumerate the files actually present in the download folder.
            videos_folder = user.videos_folder
            video_files = []
            if os.path.exists(videos_folder):
                video_files = [f for f in os.listdir(videos_folder) if os.path.isfile(os.path.join(videos_folder, f))]
        except TiktokUserVideos.DoesNotExist:
            return JsonResponse({
                'status': 'error',
                'message': f'用户 {sec_user_id} 不存在'
            }, json_dumps_params={'ensure_ascii': False})
        return JsonResponse({
            'status': 'success',
            'user_info': {
                'sec_user_id': user.sec_user_id,
                'nickname': user.nickname,
                'signature': user.signature,
                'follower_count': user.follower_count,
                'total_favorited': user.total_favorited,
                'avatar_url': user.avatar_url,
                'create_time': user.create_time.strftime('%Y-%m-%d %H:%M:%S'),
                'update_time': user.update_time.strftime('%Y-%m-%d %H:%M:%S')
            },
            'videos_folder': videos_folder,
            'video_files': video_files,
            'video_info': video_info
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"获取TikTok视频列表失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': f'获取TikTok视频列表失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})
# 辅助函数
def fetch_user_videos(sec_uid, max_cursor=0, count=20):
    """Fetch a user's video list from the proxy API.

    Args:
        sec_uid: TikTok secUid identifying the user.
        max_cursor, count: accepted for interface compatibility but NOT
            currently sent to the API. NOTE(review): confirm the endpoint's
            pagination parameter names before wiring them into the URL.

    Returns:
        dict | None: parsed JSON response, or None on any failure.
    """
    url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_post?secUid={sec_uid}"
    try:
        # BUG FIX: requests.get without a timeout can hang the caller
        # indefinitely if the API stalls.
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            return response.json()
        else:
            logger.error(f"获取视频列表失败: {response.status_code}")
            return None
    except Exception as e:
        logger.error(f"获取视频列表异常: {e}")
        return None
def fetch_user_profile(unique_id):
    """Fetch a TikTok user's profile from the remote API.

    Retries transient network failures (timeout / connection error) up to
    three times, then validates that the payload contains the nested
    data -> userInfo -> user structure before returning the parsed JSON.
    Returns None on any failure.
    """
    # Browser-like headers so the upstream endpoint treats us as a web client.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Referer': 'https://www.tiktok.com/'
    }
    url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_profile?uniqueId={unique_id}"
    try:
        logger.info(f"正在请求用户资料: {url}")
        # Up to three attempts for timeouts / connection errors.
        attempt = 0
        while True:
            try:
                response = requests.get(url, headers=headers, timeout=30)
                break
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                logger.warning(f"请求超时或连接错误,尝试第{attempt+1}次重试: {e}")
                if attempt == 2:
                    # Third and final attempt failed: surface the error.
                    raise
                attempt += 1
                time.sleep(2)
        if response.status_code != 200:
            logger.error(f"获取用户信息失败: HTTP {response.status_code}, 响应: {response.text[:500]}")
            return None
        try:
            data = response.json()
        except json.JSONDecodeError:
            logger.error(f"API响应不是有效的JSON: {response.text[:500]}")
            return None
        logger.info(f"获取用户资料成功: {unique_id}")
        # Dump the raw payload for debugging.
        logger.info(f"API原始响应: {data}")
        # Validate the nested structure level by level, logging which
        # level is missing or empty.
        if 'data' not in data or not data['data']:
            logger.error(f"API响应缺少data字段: {data}")
            return None
        if 'userInfo' not in data['data'] or not data['data']['userInfo']:
            logger.error(f"API响应缺少userInfo字段: {data['data']}")
            return None
        if 'user' not in data['data']['userInfo'] or not data['data']['userInfo']['user']:
            logger.error(f"API响应缺少user字段: {data['data']['userInfo']}")
            return None
        logger.info(f"用户信息: {data['data']['userInfo']['user']}")
        return data
    except Exception as e:
        logger.error(f"获取用户信息异常: {e}")
        return None
def download_video(video_id, unique_id, save_path):
    """Download one TikTok video through the API's direct-download endpoint.

    Args:
        video_id: numeric TikTok video id (rejected if not all digits).
        unique_id: the owning user's TikTok username (used to build the URL).
        save_path: local filesystem path to write the .mp4 to.

    Returns True on success, False on any failure. On failure any partially
    written file at save_path is removed.
    """
    # The download endpoint only works with numeric video IDs.
    if not str(video_id).isdigit():
        logger.error(f"无效的视频ID: {video_id},必须是纯数字")
        return False
    # Build the canonical TikTok video URL the API expects.
    tiktok_url = f"https://www.tiktok.com/@{unique_id}/video/{video_id}"
    logger.info(f"构建的TikTok URL: {tiktok_url}")
    # Use the shared API_BASE_URL instead of a second hard-coded host.
    api_url = f"{API_BASE_URL}/api/download"
    full_url = f"{api_url}?url={tiktok_url}&prefix=true&with_watermark=false"
    logger.info(f"完整的API请求URL: {full_url}")
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    }
    try:
        # Context manager guarantees the streamed connection is released.
        with requests.get(full_url, headers=headers, stream=True, timeout=60) as response:
            if response.status_code != 200:
                logger.error(f"下载视频失败: {response.status_code} - {response.text[:200] if response.text else '无响应内容'}")
                return False
            content_type = response.headers.get('Content-Type', '')
            logger.info(f"响应内容类型: {content_type}")
            # Stream the body to disk in 8 KiB chunks.
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        file_size = os.path.getsize(save_path)
        logger.info(f"视频已下载到: {save_path},文件大小: {file_size}字节")
        return True
    except Exception as e:
        logger.error(f"下载视频异常: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        # Remove any partial file so callers don't treat it as a valid video.
        try:
            if os.path.exists(save_path):
                os.remove(save_path)
        except OSError:
            pass
        return False
def fetch_user_followings(sec_uid):
    """Fetch the list of accounts a user follows from the remote API.

    Args:
        sec_uid: the user's TikTok secUid.

    Returns the parsed JSON payload on success, or None on failure.
    """
    url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_follow?secUid={sec_uid}"
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        }
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code == 200:
            data = response.json()
            # Guard with .get so a payload without 'data' doesn't raise
            # KeyError here and get misreported as a fetch failure.
            logger.info(f"成功获取用户关注列表,共 {len((data.get('data') or {}).get('userList', []))} 个关注")
            return data
        else:
            logger.error(f"获取用户关注列表失败: {response.status_code}")
            return None
    except Exception as e:
        logger.error(f"获取用户关注列表异常: {e}")
        return None
def filter_users_by_followers(user_list, min_followers=5000, max_followers=50000):
    """Return the entries of user_list whose follower count is within range.

    Args:
        user_list: iterable of user dicts, each expected to carry
            {'stats': {'followerCount': int}} (missing keys count as 0).
        min_followers / max_followers: inclusive bounds.

    Entries that raise while being inspected are logged and skipped.
    """
    matches = []
    for entry in user_list:
        try:
            count = entry.get('stats', {}).get('followerCount', 0)
            in_range = min_followers <= count <= max_followers
        except Exception as e:
            logger.error(f"筛选用户时出错: {e}")
            continue
        if in_range:
            matches.append(entry)
    return matches
@csrf_exempt
@require_http_methods(["POST"])
def recursive_fetch_videos(request):
    """Recursively crawl a TikTok user's follow graph and download top videos.

    POST body (JSON):
        unique_id: starting TikTok username (required).
        max_depth: maximum recursion depth through follow lists (default 3).

    For each visited user the top-10 videos by play count are downloaded and
    the user record is upserted into TiktokUserVideos. From each user's
    follow list, up to 5 accounts with 5000-50000 followers are recursed
    into. Returns a JSON summary of processed users and downloaded videos.
    """
    try:
        data = json.loads(request.body)
        start_unique_id = data.get('unique_id')
        max_depth = int(data.get('max_depth', 3))  # recursion depth defaults to 3
        if not start_unique_id:
            return JsonResponse({
                'status': 'error',
                'message': '请提供起始TikTok用户ID(unique_id)'
            }, json_dumps_params={'ensure_ascii': False})
        # Resolve the starting user's profile (and secUid) from the unique_id.
        user_profile = fetch_user_profile(start_unique_id)
        if not user_profile or 'data' not in user_profile:
            return JsonResponse({
                'status': 'error',
                'message': f'无法获取用户 {start_unique_id} 的资料'
            }, json_dumps_params={'ensure_ascii': False})
        # Extract secUid plus profile/stat fields of the starting user.
        try:
            user_info = user_profile['data']['userInfo']['user']
            start_sec_uid = user_info['secUid']
            # Basic profile fields.
            nickname = user_info.get('nickname', '')
            signature = user_info.get('signature', '')
            avatar_url = user_info.get('avatarLarger', '')
            # Stats; heartCount falls back to diggCount when missing/zero.
            stats = user_profile['data']['userInfo'].get('stats', {})
            follower_count = stats.get('followerCount', 0)
            following_count = stats.get('followingCount', 0)
            heart_count = stats.get('heartCount', 0) or stats.get('diggCount', 0)
            video_count = stats.get('videoCount', 0)
            # Create the starting user's download directory.
            start_user_dir = os.path.join(TIKTOK_VIDEOS_PATH, start_unique_id)
            os.makedirs(start_user_dir, exist_ok=True)
            # Upsert the starting user's record in the database.
            TiktokUserVideos.objects.update_or_create(
                sec_user_id=start_sec_uid,
                defaults={
                    'nickname': nickname,
                    'signature': signature,
                    'follower_count': follower_count,
                    'following_count': following_count,
                    'total_favorited': heart_count,
                    'video_count': video_count,
                    'avatar_url': avatar_url,
                    'videos_folder': start_user_dir
                }
            )
            logger.info(f"成功获取并保存起始用户信息: {start_unique_id}, secUid: {start_sec_uid}")
        except (KeyError, TypeError) as e:
            logger.error(f"解析用户资料出错: {e}")
            return JsonResponse({
                'status': 'error',
                'message': f'解析用户资料出错: {str(e)}'
            }, json_dumps_params={'ensure_ascii': False})
        # State shared by the recursion below.
        all_downloaded_videos = []
        processed_users = set()  # secUids already visited, to avoid cycles/repeats
        def process_user(sec_uid, unique_id, depth=0):
            """Download a user's top videos, then recurse into filtered followings."""
            if depth >= max_depth or sec_uid in processed_users:
                return
            processed_users.add(sec_uid)
            logger.info(f"处理用户 {unique_id},递归深度: {depth}")
            # Ensure this user's download directory exists.
            user_dir = os.path.join(TIKTOK_VIDEOS_PATH, unique_id)
            os.makedirs(user_dir, exist_ok=True)
            # Fetch this user's posted videos.
            videos_data = fetch_user_videos(sec_uid)
            all_videos = []
            if videos_data and isinstance(videos_data, dict) and 'data' in videos_data and 'itemList' in videos_data['data']:
                for video in videos_data['data']['itemList']:
                    try:
                        video_id = video.get('id', '')
                        # Skip entries without a purely numeric video id.
                        if not video_id or not str(video_id).isdigit():
                            continue
                        stats = video.get('stats', {})
                        if not isinstance(stats, dict):
                            stats = {}
                        play_count = int(stats.get('playCount', 0))
                        all_videos.append({
                            'id': video_id,
                            'desc': video.get('desc', ''),
                            'play_count': play_count
                        })
                    except Exception as e:
                        logger.error(f"处理视频数据出错: {str(e)}")
                        continue
            # Sort by play count (descending) and keep the top 10.
            all_videos.sort(key=lambda x: x['play_count'], reverse=True)
            top_videos = all_videos[:10]
            # Download each of the top videos.
            downloaded_videos = []
            for i, video in enumerate(top_videos):
                video_id = video['id']
                save_path = os.path.join(user_dir, f"{video_id}.mp4")
                logger.info(f"下载用户 {unique_id} 的第 {i+1} 个热门视频: {video_id}")
                if download_video(video_id, unique_id, save_path):
                    video['download_path'] = save_path
                    video['user_unique_id'] = unique_id
                    downloaded_videos.append(video)
                    all_downloaded_videos.append(video)
                time.sleep(1)  # throttle to avoid hammering the API
            # Persist this user's downloaded-video metadata as JSON.
            video_info_json = json.dumps([{
                'id': v['id'],
                'desc': v['desc'],
                'play_count': v['play_count']
            } for v in downloaded_videos], ensure_ascii=False)
            # NOTE(review): stores unique_id as nickname here, which may
            # overwrite a real nickname saved earlier — confirm intended.
            TiktokUserVideos.objects.update_or_create(
                sec_user_id=sec_uid,
                defaults={
                    'nickname': unique_id,
                    'videos_folder': user_dir,
                    'video_paths': video_info_json
                }
            )
            # Fetch this user's follow list.
            followings_data = fetch_user_followings(sec_uid)
            if followings_data and 'data' in followings_data and 'userList' in followings_data['data']:
                user_list = followings_data['data']['userList']
                # Keep only followings with 5000-50000 followers.
                filtered_users = filter_users_by_followers(user_list, 5000, 50000)
                logger.info(f"用户 {unique_id} 的关注列表中有 {len(filtered_users)} 个粉丝数在5000-50000之间")
                # Recurse into at most the first 5 matching users.
                for user_data in filtered_users[:5]:
                    try:
                        # Extract identifiers straight from the follow-list entry.
                        user_obj = user_data['user']
                        following_sec_uid = user_obj['secUid']
                        following_unique_id = user_obj['uniqueId']
                        # Profile details of the followed user.
                        nickname = user_obj.get('nickname', '')
                        signature = user_obj.get('signature', '')
                        avatar_url = user_obj.get('avatarLarger', '')
                        # Their stats.
                        stats = user_data.get('stats', {})
                        follower_count = stats.get('followerCount', 0)
                        following_count = stats.get('followingCount', 0)
                        heart_count = stats.get('heartCount', 0)
                        video_count = stats.get('videoCount', 0)
                        # Upsert the followed user now, even before any videos
                        # are downloaded for them.
                        follow_user_dir = os.path.join(TIKTOK_VIDEOS_PATH, following_unique_id)
                        TiktokUserVideos.objects.update_or_create(
                            sec_user_id=following_sec_uid,
                            defaults={
                                'nickname': nickname,
                                'signature': signature,
                                'follower_count': follower_count,
                                'following_count': following_count,
                                'total_favorited': heart_count,
                                'video_count': video_count,
                                'avatar_url': avatar_url,
                                'videos_folder': follow_user_dir
                            }
                        )
                        # Recurse one level deeper into this followed user.
                        process_user(following_sec_uid, following_unique_id, depth + 1)
                    except Exception as e:
                        logger.error(f"处理关注用户时出错: {e}")
                        continue
        # Kick off the recursion from the starting user.
        process_user(start_sec_uid, start_unique_id)
        return JsonResponse({
            'status': 'success',
            'message': '递归获取视频完成',
            'processed_users_count': len(processed_users),
            'downloaded_videos_count': len(all_downloaded_videos),
            'downloaded_videos': [{'id': v['id'], 'desc': v['desc'][:50], 'play_count': v['play_count'], 'user': v['user_unique_id']} for v in all_downloaded_videos[:100]]  # cap at 100 entries to keep the response small
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"递归获取TikTok视频失败: {e}")
        import traceback
        logger.error(f"详细错误: {traceback.format_exc()}")
        return JsonResponse({
            'status': 'error',
            'message': f'递归获取TikTok视频失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})