import json
import logging
import os
import threading
import time
import traceback
from datetime import datetime, timedelta
from pathlib import Path  # pathlib handles cross-platform path normalisation
from urllib.parse import urlencode

import nvidia_smi
import psutil
import pytz
import requests
from django.conf import settings
from django.http import JsonResponse
from django.shortcuts import render
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods

from .models import (
    AllResourceProcess,
    HighCPUProcess,
    HighGPUProcess,
    HighMemoryProcess,
    TiktokUserVideos,
)
# NOTE: get_process_gpu_usage imported here is shadowed by the function of the
# same name defined further down in this module.
from .tasks import monitor_process, get_process_gpu_usage

# Per-directory run flags read by monitor_directory(); a missing or falsy
# entry makes that directory's loop exit.
directory_monitoring = {}

# Globals controlling the high-resource monitoring thread.
monitor_thread = None
is_monitoring = False

# Log directory layout.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(BASE_DIR, 'logs', 'process_monitor')

# Base path under which downloaded videos are stored.
TIKTOK_VIDEOS_PATH = os.path.join(BASE_DIR, 'media', 'tiktok_videos')

# Make sure the base directory exists.
os.makedirs(TIKTOK_VIDEOS_PATH, exist_ok=True)

# Make sure one log sub-directory exists per monitor type, including 'all'.
for resource_type in ['cpu', 'memory', 'gpu', 'all']:
    os.makedirs(os.path.join(LOG_DIR, resource_type), exist_ok=True)

# Application-specific logger.
logger = logging.getLogger('monitor')

# Directories registered through scan_directory().
monitored_directories = set()

# Base URL of the TikTok helper API.
API_BASE_URL = "http://81.69.223.133:45268"

# Monitor types that have a log sub-directory.
_VALID_MONITOR_TYPES = ('cpu', 'memory', 'gpu', 'all')

# Caller-side names that are logged under the 'all' type.
_MONITOR_TYPE_ALIASES = {'scan': 'all', 'auto': 'all'}

# Model used for each per-resource monitor type (iteration order matters to
# continuous_monitor: cpu, memory, gpu).
_HIGH_RESOURCE_MODELS = {
    'cpu': HighCPUProcess,
    'memory': HighMemoryProcess,
    'gpu': HighGPUProcess,
}

_LOCAL_TZ_NAME = 'Asia/Shanghai'

_BROWSER_USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
)


def _leading_float(text, unit):
    """Return the number that precedes *unit* in a formatted metric string."""
    return float(text.split(unit)[0].strip())


def _all_resource_defaults(process, data, log_file):
    """Build the AllResourceProcess ``defaults`` dict from get_process_data() output."""
    physical = data['memory']['physical']  # formatted as "<rss>MB (<percent>%)"
    return {
        'process_name': process.name(),
        'cpu_usage': _leading_float(data['cpu']['usage'], '%'),
        'cpu_user_time': _leading_float(data['cpu']['user_time'], 's'),
        'cpu_system_time': _leading_float(data['cpu']['system_time'], 's'),
        'memory_usage': _leading_float(physical, 'MB'),
        'memory_percent': float(physical.split('(')[1].split('%')[0].strip()),
        'virtual_memory': _leading_float(data['memory']['virtual'], 'MB'),
        'gpu_usage': _leading_float(data['gpu']['usage'], '%'),
        'gpu_memory': _leading_float(data['gpu']['memory'], 'MB'),
        # NOTE(review): these two columns are fed from the process I/O
        # counters, not from network counters, despite the field names.
        'net_io_sent': _leading_float(data['io']['write'], 'MB'),
        'net_io_recv': _leading_float(data['io']['read'], 'MB'),
        'is_active': True,
        'status': 1,
        'log_file': log_file,
    }


def _path_is_within(path, root):
    """Return True when string *path* equals *root* or lies beneath it.

    The comparison is done on a path-separator boundary so that ``/srv/app2``
    is not treated as being inside ``/srv/app``.
    """
    if path == root:
        return True
    return path.startswith(root.rstrip(os.sep) + os.sep)


def _local_now():
    """Return the current time converted to the Asia/Shanghai time zone."""
    return timezone.localtime(timezone.now(), pytz.timezone(_LOCAL_TZ_NAME))


def monitor_directory(directory):
    """Background task that keeps monitoring the Python processes of a directory.

    Loops while ``directory_monitoring[directory]`` is truthy. On every tick
    each matching process is upserted into AllResourceProcess; its metrics are
    appended to the log file immediately the first time the PID is seen and
    then at most once per log interval.
    """
    # Fall back to the documented defaults (5 s / 60 s) so that a missing
    # setting does not kill the thread with an AttributeError.
    monitor_interval = getattr(settings, 'MONITOR_INTERVAL', 5)
    log_interval = getattr(settings, 'LOG_INTERVAL', 60)
    last_log_time = {}  # pid -> time of the last log entry written for it

    while directory_monitoring.get(directory, False):
        try:
            next_update = timezone.now() + timedelta(seconds=monitor_interval)

            for proc in find_python_processes_in_dir(directory):
                pid = proc['pid']
                try:
                    log_file = setup_process_logger(pid, 'all')
                    if not log_file:
                        continue

                    process = psutil.Process(pid)
                    data = get_process_data(process)

                    # Refresh the database row.
                    AllResourceProcess.objects.update_or_create(
                        pid=pid,
                        defaults=_all_resource_defaults(process, data, log_file),
                    )

                    now = timezone.now()
                    if pid not in last_log_time:
                        # Newly detected process: log right away.
                        print(f"首次检测到进程,立即写入日志: PID={pid}")
                        log_process_metrics(pid, data, log_file)
                        last_log_time[pid] = now
                    elif (now - last_log_time[pid]).total_seconds() >= log_interval:
                        # Known process: log once per interval.
                        print(f"定期写入日志: PID={pid}")
                        log_process_metrics(pid, data, log_file)
                        last_log_time[pid] = now
                except Exception as e:
                    logger.error(f"监控进程 {pid} 失败: {str(e)}")

            # Sleep for whatever is left of this tick.
            now = timezone.now()
            if next_update > now:
                time.sleep((next_update - now).total_seconds())
        except Exception as e:
            logger.error(f"目录监控出错: {str(e)}")
            time.sleep(5)


def get_python_processes(directory):
    """Return psutil.Process objects for Python processes running under *directory*.

    A process matches when its name contains ``python`` and its resolved
    working directory is *directory* or a sub-directory (case-insensitive).
    Processes that vanish or cannot be inspected are skipped.
    """
    directory = str(Path(directory).resolve()).lower()
    processes = []

    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        try:
            # info values are None when psutil was denied access to them.
            if 'python' not in (proc.info['name'] or '').lower():
                continue
            process = psutil.Process(proc.info['pid'])
            proc_cwd = str(Path(process.cwd()).resolve()).lower()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        if _path_is_within(proc_cwd, directory):
            processes.append(process)

    return processes


def get_process_info(process):
    """Return a dict describing *process*, or None if it cannot be inspected.

    Memory and I/O byte counts are reported in MB; GPU figures are always 0.
    """
    try:
        with process.oneshot():
            info = {
                'pid': process.pid,
                'name': process.name(),
                'cpu_usage': process.cpu_percent(),
                'memory_usage': process.memory_info().rss / (1024 * 1024),  # MB
                'command_line': ' '.join(process.cmdline()),
                'create_time': process.create_time(),
                'status': process.status(),
                'threads': process.num_threads(),
                'working_directory': process.cwd(),
                'gpu_info': {
                    'usage': 0,
                    'memory': 0
                }
            }
            # I/O counters fall back to zeros when unavailable.
            info['io'] = get_io_counters(process)
            return info
    except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
        logger.error(f"获取进程 {process.pid} 信息失败: {str(e)}")
        return None


@csrf_exempt
@require_http_methods(["POST"])
def scan_directory(request):
    """Register a directory (JSON body key ``directory``) for monitoring."""
    try:
        payload = json.loads(request.body)
        directory = payload.get('directory')
        if not directory:
            return JsonResponse({
                'status': 'error',
                'message': '请提供目录路径'
            })

        # NOTE(review): this only records the directory; nothing in this
        # module sets directory_monitoring[...] or starts monitor_directory().
        # Confirm whether the background loop is meant to be started here.
        directory = str(Path(directory).resolve())
        monitored_directories.add(directory)
        logger.info(f"开始监控目录: {directory}")

        return JsonResponse({
            'status': 'success',
            'message': f'开始监控目录: {directory}',
            'directory': directory
        })
    except Exception as e:
        logger.error(f"启动目录监控失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        })


@require_http_methods(["GET"])
def get_directory_status(request):
    """Return details of the Python processes running under ``?directory=``."""
    try:
        directory = request.GET.get('directory')
        if not directory:
            return JsonResponse({
                'status': 'error',
                'message': '未提供目录路径'
            })

        # Collect details, dropping processes that could not be inspected.
        details = (get_process_info(proc) for proc in get_python_processes(directory))
        process_info = [info for info in details if info]

        return JsonResponse({
            'status': 'success',
            'processes': process_info,
            'timestamp': timezone.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    except Exception as e:
        logger.error(f"获取目录状态失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        })


@require_http_methods(["POST"])
def stop_directory_monitor(request):
    """Remove a directory (JSON body key ``directory``) from the monitored set."""
    try:
        payload = json.loads(request.body)
        directory = payload.get('directory')
        if not directory:
            return JsonResponse({
                'status': 'error',
                'message': '未提供目录路径'
            })

        directory = str(Path(directory).resolve())
        if directory in monitored_directories:
            monitored_directories.remove(directory)
            logger.info(f"停止监控目录: {directory}")

        return JsonResponse({
            'status': 'success',
            'message': '监控已停止'
        })
    except Exception as e:
        logger.error(f"停止监控失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        })


def setup_process_logger(pid, monitor_type):
    """Return the per-process log file path for *pid*, or None for an invalid type.

    The path is ``LOG_DIR/<type>/<pid>_<YYYYMMDD>_<type>.log`` using the
    Asia/Shanghai date; the directory is created if needed. The ``'scan'``
    and ``'auto'`` types are logged under ``'all'``.
    """
    try:
        # Resolve aliases BEFORE validating; otherwise they are rejected and
        # callers end up with no log file.
        monitor_type = _MONITOR_TYPE_ALIASES.get(monitor_type, monitor_type)

        if monitor_type not in _VALID_MONITOR_TYPES:
            print(f"警告:无效的监控类型 {monitor_type}")
            return None

        current_date = _local_now().strftime("%Y%m%d")

        log_dir = os.path.join(LOG_DIR, monitor_type)
        log_file = os.path.join(log_dir, f'{pid}_{current_date}_{monitor_type}.log')

        os.makedirs(os.path.dirname(log_file), exist_ok=True)

        print(f"设置日志文件: {log_file}")  # debug output
        return log_file
    except Exception as e:
        print(f"设置日志文件失败: {str(e)}")
        raise


def log_process_metrics(pid, data, log_file):
    """Append one formatted metrics entry (from get_process_data) to *log_file*.

    Re-raises any error after printing it.
    """
    try:
        os.makedirs(os.path.dirname(log_file), exist_ok=True)

        timestamp = _local_now().strftime('%Y-%m-%d %H:%M:%S')
        cpu, memory, gpu, io = data['cpu'], data['memory'], data['gpu'], data['io']

        entry_lines = [
            f"=== {timestamp} ===",
            # CPU
            "CPU信息:",
            f"├─ 使用率: {cpu['usage']}",
            f"├─ 用户态时间: {cpu['user_time']}",
            f"├─ 内核态时间: {cpu['system_time']}",
            f"├─ CPU核心数: {cpu['cores']}",
            f"├─ CPU频率: {cpu['frequency']}",
            f"└─ 上下文切换: {cpu['context_switches']}",
            # Memory
            "内存信息:",
            f"├─ 物理内存: {memory['physical']}",
            f"├─ 虚拟内存: {memory['virtual']}",
            f"├─ 内存映射: {memory['mappings']}",
            f"├─ 系统内存使用: {memory['system_usage']}",
            f"└─ 交换空间使用: {memory['swap_usage']}",
            # GPU
            "GPU信息:",
            f"├─ 使用率: {gpu['usage']}",
            f"└─ 显存使用: {gpu['memory']}",
            # I/O
            "IO信息:",
            f"├─ 读取: {io['read']}",
            f"├─ 写入: {io['write']}",
            f"└─ 其他IO: {io['other']}",
            # Process state
            f"进程状态: {data['status']}",
            "-" * 50,
        ]
        log_content = "\n".join(entry_lines) + "\n"

        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(log_content)

        print(f"已写入日志到: {log_file}")  # debug output
    except Exception as e:
        print(f"写入日志失败: {str(e)}")
        raise


def get_process_by_name(process_name):
    """Return the PIDs of processes whose name contains *process_name* (case-insensitive)."""
    wanted = process_name.lower()
    pids = []
    for proc in psutil.process_iter(['pid', 'name']):
        try:
            # name is None when psutil was denied access to it.
            if wanted in (proc.info['name'] or '').lower():
                pids.append(proc.info['pid'])
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
    return pids


def get_process_gpu_usage(pid):
    """Return ``(gpu_usage, gpu_memory)`` for *pid*.

    Placeholder: always returns ``(0.0, 0.0)``.
    """
    # NOTE(review): get_gpu_info() in this module does query NVML; confirm
    # whether this placeholder should delegate to it.
    return 0.0, 0.0


def get_high_resource_processes():
    """Return info dicts for processes using more than 50% CPU or memory."""
    high_resource_pids = []
    for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
        try:
            # Either value is None when psutil was denied access; treat as 0
            # instead of failing on ``None > 50``.
            cpu_percent = proc.info['cpu_percent'] or 0
            memory_percent = proc.info['memory_percent'] or 0
            if cpu_percent > 50 or memory_percent > 50:
                high_resource_pids.append({
                    'pid': proc.info['pid'],
                    'name': proc.info['name'],
                    'cpu_percent': proc.info['cpu_percent'],
                    'memory_percent': proc.info['memory_percent']
                })
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue
    return high_resource_pids


def auto_detect_high_resource_processes(request):
    """API endpoint: detect high-resource processes and log one metrics entry for each."""
    try:
        high_resource_procs = get_high_resource_processes()

        for proc in high_resource_procs:
            try:
                process = psutil.Process(proc['pid'])
                log_file = setup_process_logger(proc['pid'], 'auto')
                if not log_file:
                    continue
                data = get_process_data(process)
                log_process_metrics(proc['pid'], data, log_file)
            except psutil.NoSuchProcess:
                continue
            except Exception as e:
                logger.error(f"监控进程 {proc['pid']} 时出错: {str(e)}")

        return JsonResponse({
            'status': 'success',
            'message': '已开始自动检测高资源进程',
            'processes': high_resource_procs
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"自动检测失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': f'自动检测失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})


def setup_logger(pid):
    """Return ``(logger, log_file)`` for a dedicated per-process logger.

    The file handler is attached only once per log file so that repeated
    calls do not duplicate log lines or leak open file handles.
    """
    log_file = os.path.join(
        os.path.dirname(os.path.dirname(__file__)),
        'logs',
        'process_monitor',
        f'process_{pid}_{datetime.now().strftime("%Y%m%d")}.log',
    )

    process_logger = logging.getLogger(f'monitor.process.process_{pid}')
    process_logger.setLevel(logging.INFO)

    # FileHandler stores the absolute path in baseFilename.
    target = os.path.abspath(log_file)
    already_attached = any(
        isinstance(h, logging.FileHandler) and h.baseFilename == target
        for h in process_logger.handlers
    )
    if not already_attached:
        handler = logging.FileHandler(log_file)
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        )
        process_logger.addHandler(handler)

    return process_logger, log_file


def continuous_monitor():
    """Thread body: log metrics for every active high-resource record once per minute.

    Runs while the module-level ``is_monitoring`` flag is true. Records whose
    process no longer exists are marked inactive.
    """
    interval = 60  # seconds between passes

    while is_monitoring:
        try:
            next_update = timezone.now() + timedelta(seconds=interval)

            for monitor_type, model in _HIGH_RESOURCE_MODELS.items():
                for record in model.objects.filter(is_active=True):
                    try:
                        process = psutil.Process(record.pid)
                        data = get_process_data(process)
                        log_file = setup_process_logger(record.pid, monitor_type)
                        log_process_metrics(record.pid, data, log_file)
                    except psutil.NoSuchProcess:
                        record.is_active = False
                        record.save()
                    except Exception as e:
                        logger.error(f"监控进程 {record.pid} 时出错: {str(e)}")

            # Sleep for whatever is left of this pass.
            now = timezone.now()
            if next_update > now:
                time.sleep((next_update - now).total_seconds())
        except Exception as e:
            logger.error(f"持续监控出错: {str(e)}")
            time.sleep(5)


def get_process_data(process):
    """Return a dict of formatted CPU, memory, GPU and I/O metrics for *process*.

    All values are human-readable strings (e.g. ``"12.5%"``, ``"34.0MB"``).
    Memory-map and I/O figures fall back to zero when they are unavailable.
    psutil errors from the other calls propagate to the caller.
    """
    with process.oneshot():
        cpu_times = process.cpu_times()
        cpu_freq = psutil.cpu_freq()  # may be None; read via getattr below
        cpu_ctx = process.num_ctx_switches()

        mem = process.memory_info()
        try:
            mem_maps = len(process.memory_maps())
        except (psutil.AccessDenied, psutil.TimeoutExpired, AttributeError):
            # AttributeError: memory_maps() does not exist on every platform.
            mem_maps = 0

        sys_mem = psutil.virtual_memory()
        swap = psutil.swap_memory()

        try:
            io = process.io_counters()
        except (psutil.AccessDenied, psutil.TimeoutExpired, AttributeError):
            # None is enough: every field is read with getattr(..., 0) below.
            # (Building a psutil-private namedtuple here is not portable: its
            # fields differ per platform.)
            io = None

        gpu_usage, gpu_memory = get_process_gpu_usage(process.pid)

        # NOTE(review): psutil documents that the first cpu_percent() call on
        # a Process object returns 0.0; callers that create a fresh Process
        # per request will therefore always see 0.0% here.
        return {
            'timestamp': timezone.now().strftime('%Y-%m-%d %H:%M:%S'),
            'cpu': {
                'usage': f"{process.cpu_percent()}%",
                'user_time': f"{cpu_times.user:.1f}s",
                'system_time': f"{cpu_times.system:.1f}s",
                'cores': str(psutil.cpu_count()),
                'frequency': f"{getattr(cpu_freq, 'current', 0):.1f}MHz",
                'context_switches': f"{cpu_ctx.voluntary}/{cpu_ctx.involuntary}"
            },
            'memory': {
                'physical': f"{mem.rss/1024/1024:.1f}MB ({process.memory_percent():.1f}%)",
                'virtual': f"{mem.vms/1024/1024:.1f}MB",
                'mappings': f"{mem_maps}个",
                'system_usage': f"{sys_mem.percent:.1f}%",
                'swap_usage': f"{swap.percent:.1f}%"
            },
            'gpu': {
                'usage': f"{gpu_usage:.1f}%",
                'memory': f"{gpu_memory:.1f}MB"
            },
            'io': {
                'read': f"{getattr(io, 'read_bytes', 0)/1024/1024:.1f}MB ({getattr(io, 'read_count', 0)}次)",
                'write': f"{getattr(io, 'write_bytes', 0)/1024/1024:.1f}MB ({getattr(io, 'write_count', 0)}次)",
                'other': f"{getattr(io, 'other_count', 0)}次"
            },
            'status': process.status()
        }


@csrf_exempt
@require_http_methods(["GET", "POST"])
def start_monitor(request):
    """Start the background monitoring thread after checking that ``?pid=`` exists."""
    global monitor_thread, is_monitoring
    try:
        pid = request.GET.get('pid')
        if not pid:
            return JsonResponse({
                'status': 'error',
                'message': '请输入进程ID'
            }, json_dumps_params={'ensure_ascii': False})

        pid = int(pid)
        try:
            process = psutil.Process(pid)
            process_name = process.name()

            is_monitoring = True
            # Start a thread only when none is alive: a previous thread may
            # still be sleeping after stop_monitor() and would simply resume,
            # so starting another one would duplicate every log entry.
            if monitor_thread is None or not monitor_thread.is_alive():
                monitor_thread = threading.Thread(target=continuous_monitor)
                monitor_thread.daemon = True
                monitor_thread.start()

            return JsonResponse({
                'status': 'success',
                'message': f'开始监控进程 {pid} ({process_name})'
            }, json_dumps_params={'ensure_ascii': False})
        except psutil.NoSuchProcess:
            return JsonResponse({
                'status': 'error',
                'message': f'进程 {pid} 不存在'
            }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"启动监控失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': f'启动监控失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})


@csrf_exempt
@require_http_methods(["GET", "POST"])
def stop_monitor(request):
    """Clear the ``is_monitoring`` flag so the background thread exits after its current pass."""
    global is_monitoring
    try:
        pid = request.GET.get('pid')
        if not pid:
            return JsonResponse({
                'status': 'error',
                'message': '请输入进程ID'
            }, json_dumps_params={'ensure_ascii': False})

        pid = int(pid)
        is_monitoring = False

        return JsonResponse({
            'status': 'success',
            'message': f'停止监控进程 {pid}'
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"停止监控失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': f'停止监控失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})


def _mark_process_terminated(pid, monitor_type):
    """Flag the stored records of a vanished process as inactive (best effort)."""
    update_process_status(pid, monitor_type, is_active=False, status=0)
    try:
        AllResourceProcess.objects.filter(pid=pid).update(is_active=False, status=0)
    except Exception as e:
        logger.error(f"更新进程状态失败: {str(e)}")


@require_http_methods(["GET"])
def get_process_status(request, pid):
    """API: sample process *pid*, sync the sample to the database and return it.

    ``?type=`` selects the monitor type; the matching High*Process row is
    upserted in addition to the AllResourceProcess row.
    """
    monitor_type = request.GET.get('type')
    try:
        logger.info(f"获取进程状态: pid={pid}, type={monitor_type}")

        # Resolve the log file path first.
        log_file = setup_process_logger(pid, monitor_type)
        if not log_file:
            raise ValueError(f"无法创建日志文件,监控类型: {monitor_type}")

        process = psutil.Process(pid)
        data = get_process_data(process)

        # Always refresh the "all resources" row.
        AllResourceProcess.objects.update_or_create(
            pid=pid,
            defaults=_all_resource_defaults(process, data, log_file),
        )

        # Then the row of the model matching the monitor type.
        if monitor_type == 'cpu':
            cpu_usage = _leading_float(data['cpu']['usage'], '%')
            logger.info(f"CPU使用率: {cpu_usage}%")  # debug log

            process_obj, created = HighCPUProcess.objects.update_or_create(
                pid=pid,
                defaults={
                    'process_name': process.name(),
                    'cpu_usage': cpu_usage,
                    'is_active': True,
                    'status': 1,
                    'log_file': log_file
                }
            )
            logger.info(f"{'创建' if created else '更新'}CPU进程记录: {process_obj}")

        elif monitor_type == 'memory':
            memory_info = data['memory']['physical']
            memory_usage = _leading_float(memory_info, 'MB')
            memory_percent = float(memory_info.split('(')[1].split('%')[0].strip())

            HighMemoryProcess.objects.update_or_create(
                pid=pid,
                defaults={
                    'process_name': process.name(),
                    'memory_usage': memory_usage,
                    'memory_percent': memory_percent,
                    'is_active': True,
                    'status': 1,
                    'log_file': log_file
                }
            )

        elif monitor_type == 'gpu':
            gpu_usage = _leading_float(data['gpu']['usage'], '%')
            gpu_memory = _leading_float(data['gpu']['memory'], 'MB')

            HighGPUProcess.objects.update_or_create(
                pid=pid,
                defaults={
                    'process_name': process.name(),
                    'gpu_usage': gpu_usage,
                    'gpu_memory': gpu_memory,
                    'is_active': True,
                    'status': 1,
                    'log_file': log_file
                }
            )

        return JsonResponse({
            'status': 'success',
            'data': data
        })

    except psutil.NoSuchProcess:
        # The process has terminated: mark its records inactive.
        _mark_process_terminated(pid, monitor_type)
        return JsonResponse({
            'status': 'error',
            'message': f'进程 {pid} 已终止'
        })
    except Exception as e:
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        })


def update_process_status(pid, monitor_type, is_active=False, status=0):
    """Update ``is_active``/``status`` on the record of *pid* for *monitor_type*.

    Unknown monitor types are ignored; database errors are logged, not raised.
    """
    try:
        model = _HIGH_RESOURCE_MODELS.get(monitor_type)
        if model is not None:
            model.objects.filter(pid=pid).update(
                is_active=is_active,
                status=status
            )
    except Exception as e:
        logger.error(f"更新进程状态失败: {str(e)}")


def index(request):
    """Render the home page."""
    return render(request, 'index.html')


@csrf_exempt
@require_http_methods(["POST"])
def stop_auto_detect(request):
    """API endpoint acknowledging that auto-detection has stopped."""
    try:
        return JsonResponse({
            'status': 'success',
            'message': '已停止自动检测'
        }, json_dumps_params={'ensure_ascii': False})
    except Exception as e:
        logger.error(f"停止自动检测失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': f'停止自动检测失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})


@csrf_exempt
@require_http_methods(["GET"])
def get_latest_log(request, pid):
    """Return a fresh sample for *pid* of the kind given by ``?type=`` (cpu/gpu/memory).

    Requires an active monitoring record of that type; the record is marked
    inactive when the process no longer exists.
    """
    try:
        monitor_type = request.GET.get('type')
        if not monitor_type:
            return JsonResponse({
                'status': 'error',
                'message': '请指定监控类型'
            })

        # Look up the monitoring record of the requested type.
        model = _HIGH_RESOURCE_MODELS.get(monitor_type)
        monitor = None
        if model is not None:
            monitor = model.objects.filter(pid=pid, is_active=True).first()

        if not monitor:
            return JsonResponse({
                'status': 'error',
                'message': f'未找到进程 {pid} 的{monitor_type}监控记录'
            })

        try:
            process = psutil.Process(pid)
            with process.oneshot():
                data = {
                    'pid': pid,
                    'name': process.name(),
                    'status': 1,  # 1 means the process is running
                }

                if monitor_type == 'cpu':
                    data['cpu_percent'] = process.cpu_percent()
                elif monitor_type == 'memory':
                    memory_info = process.memory_info()
                    data['memory_gb'] = memory_info.rss / (1024 * 1024 * 1024)
                    data['memory_percent'] = process.memory_percent()
                elif monitor_type == 'gpu':
                    gpu_usage, gpu_memory = get_process_gpu_usage(pid)
                    data['gpu_usage'] = gpu_usage
                    data['gpu_memory'] = gpu_memory

            return JsonResponse({
                'status': 'success',
                'data': data
            })

        except psutil.NoSuchProcess:
            # The process has terminated.
            monitor.is_active = False
            monitor.status = 0
            monitor.save()
            return JsonResponse({
                'status': 'error',
                'message': f'进程 {pid} 已终止'
            })

    except Exception as e:
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        })


def _blank_process_entry(record):
    """Return a zeroed process-list entry for a monitoring record."""
    return {
        'pid': record.pid,
        'name': record.process_name,
        'cpu_percent': 0,
        'memory_gb': 0,
        'memory_percent': 0,
        'gpu_usage': 0,
        'gpu_memory': 0,
        'resource_types': []
    }


@csrf_exempt
@require_http_methods(["GET"])
def get_process_list(request):
    """Return all actively monitored processes, merged by PID.

    The list is sorted by the sum of CPU, memory and GPU percentages,
    highest first.
    """
    try:
        process_dict = {}

        def entry_for(record, resource_type):
            # One merged entry per PID, tagged with every type it appears in.
            entry = process_dict.setdefault(record.pid, _blank_process_entry(record))
            entry['resource_types'].append(resource_type)
            return entry

        for record in HighCPUProcess.objects.filter(is_active=True):
            entry_for(record, 'cpu')['cpu_percent'] = record.cpu_usage

        for record in HighGPUProcess.objects.filter(is_active=True):
            entry = entry_for(record, 'gpu')
            entry['gpu_usage'] = record.gpu_usage
            entry['gpu_memory'] = record.gpu_memory

        for record in HighMemoryProcess.objects.filter(is_active=True):
            entry = entry_for(record, 'memory')
            # NOTE(review): get_process_status() in this module stores
            # memory_usage in MB, yet it is reported here under 'memory_gb'
            # unconverted; confirm the intended unit.
            entry['memory_gb'] = record.memory_usage
            entry['memory_percent'] = record.memory_percent

        processes = list(process_dict.values())

        # Sort by overall resource share.
        processes.sort(key=lambda x: (
            x['cpu_percent'] / 100 +
            x['memory_percent'] / 100 +
            x['gpu_usage'] / 100
        ), reverse=True)

        return JsonResponse({
            'status': 'success',
            'processes': processes
        }, json_dumps_params={'ensure_ascii': False})

    except Exception as e:
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        }, json_dumps_params={'ensure_ascii': False})


def high_resource_list(request):
    """Render the high-resource process list, most recently updated first."""
    context = {
        'cpu_processes': HighCPUProcess.objects.all().order_by('-updated_at'),
        'memory_processes': HighMemoryProcess.objects.all().order_by('-updated_at'),
        'gpu_processes': HighGPUProcess.objects.all().order_by('-updated_at'),
    }
    return render(request, 'high_resource_list.html', context)


def find_python_processes_in_dir(directory):
    """Return info dicts for Python processes whose working directory is under *directory*.

    Each dict has ``pid``, ``name``, ``cmdline`` (joined) and ``cwd``.
    Processes without a readable name, cwd or command line are skipped.
    """
    python_processes = []
    directory = str(Path(directory).resolve())  # absolute, platform-normalised

    for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cwd']):
        try:
            info = proc.info
            # Any of these is None when psutil was denied access to it.
            name, cwd, cmdline = info['name'], info['cwd'], info['cmdline']
            if not name or not cwd or not cmdline:
                continue
            if not name.lower().startswith('python'):
                continue

            proc_cwd = str(Path(cwd).resolve())
            if _path_is_within(proc_cwd, directory):
                python_processes.append({
                    'pid': info['pid'],
                    'name': name,
                    'cmdline': ' '.join(cmdline),
                    'cwd': proc_cwd
                })
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass

    return python_processes


def get_gpu_info(pid):
    """Return ``{'usage', 'memory', 'device_count'}`` for *pid* as seen by NVML.

    ``memory`` is in MB. All values are 0 when NVML is unavailable or the
    query fails.
    """
    try:
        import pynvml
        pynvml.nvmlInit()
        try:
            gpu_info = {
                'usage': 0,
                'memory': 0,
                'device_count': pynvml.nvmlDeviceGetCount()
            }

            for i in range(gpu_info['device_count']):
                handle = pynvml.nvmlDeviceGetHandleByIndex(i)
                for proc in pynvml.nvmlDeviceGetComputeRunningProcesses(handle):
                    if proc.pid == pid:
                        # usedGpuMemory may be None when NVML cannot report it.
                        gpu_info['memory'] = (proc.usedGpuMemory or 0) / 1024 / 1024  # MB
                        gpu_info['usage'] = pynvml.nvmlDeviceGetUtilizationRates(handle).gpu
                        break
        finally:
            # Release NVML again; every nvmlInit() needs a matching shutdown.
            pynvml.nvmlShutdown()
        return gpu_info
    except Exception as e:
        logger.error(f"获取GPU信息失败: {str(e)}")
        return {'usage': 0, 'memory': 0, 'device_count': 0}


def get_io_counters(proc):
    """Return the I/O counters of *proc* (bytes in MB); zeros when unavailable."""
    try:
        io = proc.io_counters()
        return {
            'read_bytes': io.read_bytes / 1024 / 1024,  # MB
            'write_bytes': io.write_bytes / 1024 / 1024,
            'read_count': io.read_count,
            'write_count': io.write_count
        }
    except Exception:
        # Best effort: access denied, vanished process or unsupported platform.
        return {'read_bytes': 0, 'write_bytes': 0, 'read_count': 0, 'write_count': 0}


def get_network_connections(proc):
    """Return the network connections of *proc*; an empty list when unavailable."""
    try:
        return [
            {
                'local_address': f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else "",
                'remote_address': f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else "",
                'status': conn.status
            }
            for conn in proc.connections()
        ]
    except Exception:
        return []


def get_open_files(proc):
    """Return the paths of files opened by *proc*; an empty list when unavailable."""
    try:
        return [f.path for f in proc.open_files()]
    except Exception:
        return []


def _resolve_user_dir(unique_id):
    """Return the download directory for *unique_id*, or None if it is unsafe.

    The ID comes from the request body, so it must name a direct child of
    TIKTOK_VIDEOS_PATH: separators, ``.``/``..`` and absolute paths are
    rejected.
    """
    base = os.path.abspath(TIKTOK_VIDEOS_PATH)
    candidate = os.path.abspath(os.path.join(base, str(unique_id)))
    if os.path.dirname(candidate) != base:
        return None
    return candidate


def _collect_videos(item_list):
    """Extract id, description and statistics from raw API video items."""
    collected = []
    for video in item_list:
        try:
            video_id = video.get('id', '')
            if not video_id or not str(video_id).isdigit():
                logger.warning(f"跳过无效的视频ID: {video_id}")
                continue

            stats = video.get('stats', {})
            if not isinstance(stats, dict):
                stats = {}

            collected.append({
                'id': video_id,
                # The API may send null here; later code slices the description.
                'desc': video.get('desc') or '',
                'create_time': video.get('createTime', 0),
                'cover': video.get('cover', ''),
                'play_count': int(stats.get('playCount', 0)),
                'comment_count': int(stats.get('commentCount', 0)),
                'digg_count': int(stats.get('diggCount', 0)),
                'share_count': int(stats.get('shareCount', 0))
            })
        except Exception as e:
            logger.error(f"处理视频数据出错: {str(e)}")
            continue
    return collected


@csrf_exempt
@require_http_methods(["POST"])
def fetch_tiktok_videos(request):
    """Fetch a TikTok user's videos and download the ten most played ones.

    Expects a JSON body with ``unique_id``. Downloads go to
    ``TIKTOK_VIDEOS_PATH/<unique_id>/`` and the user plus the downloaded
    videos are stored in TiktokUserVideos.
    """
    try:
        payload = json.loads(request.body)
        unique_id = payload.get('unique_id')

        if not unique_id:
            return JsonResponse({
                'status': 'error',
                'message': '请提供TikTok用户ID(unique_id)'
            }, json_dumps_params={'ensure_ascii': False})

        # Reject IDs that would escape the download directory.
        user_dir = _resolve_user_dir(unique_id)
        if user_dir is None:
            return JsonResponse({
                'status': 'error',
                'message': '无效的TikTok用户ID'
            }, json_dumps_params={'ensure_ascii': False})

        # Fetch the profile to obtain the secUid.
        logger.info(f"正在获取用户 {unique_id} 的资料...")
        user_profile = fetch_user_profile(unique_id)

        if not user_profile or 'data' not in user_profile:
            return JsonResponse({
                'status': 'error',
                'message': f'无法获取用户 {unique_id} 的资料'
            }, json_dumps_params={'ensure_ascii': False})

        try:
            user_info = user_profile['data']['userInfo']['user']
            sec_uid = user_info['secUid']

            nickname = user_info.get('nickname', f'用户_{unique_id}')
            signature = user_info.get('signature', '')
            avatar_url = user_info.get('avatarLarger', '')

            user_stats = user_profile['data']['userInfo'].get('stats', {})
            follower_count = user_stats.get('followerCount', 0)
            heart_count = user_stats.get('heartCount', 0)

            logger.info(f"成功获取用户secUid: {sec_uid}")
        except (KeyError, TypeError) as e:
            logger.error(f"解析用户资料出错: {e}")
            return JsonResponse({
                'status': 'error',
                'message': f'解析用户资料出错: {str(e)}'
            }, json_dumps_params={'ensure_ascii': False})

        os.makedirs(user_dir, exist_ok=True)

        # Fetch the video list and keep the ten most played entries.
        videos_data = fetch_user_videos(sec_uid)
        item_list = []
        if isinstance(videos_data, dict) and isinstance(videos_data.get('data'), dict):
            item_list = videos_data['data'].get('itemList') or []

        all_videos = _collect_videos(item_list)
        all_videos.sort(key=lambda x: x['play_count'], reverse=True)
        top_videos = all_videos[:10]

        downloaded_videos = []
        for i, video in enumerate(top_videos):
            video_id = video['id']  # numeric ID
            save_path = os.path.join(user_dir, f"{video_id}.mp4")

            logger.info(f"开始下载第 {i+1} 个热门视频(ID: {video_id}): {video['desc'][:20]}...")
            if download_video(video_id, unique_id, save_path):
                video['download_path'] = save_path
                downloaded_videos.append(video)
                logger.info(f"下载成功: {save_path}")
            else:
                logger.error(f"下载失败: {video_id}")

            # Throttle requests to avoid being rate limited.
            time.sleep(1)

        # Persist the user and the downloaded videos.
        video_info_json = json.dumps([{
            'id': v['id'],
            'desc': v['desc'],
            'play_count': v['play_count']
        } for v in downloaded_videos], ensure_ascii=False)

        TiktokUserVideos.objects.update_or_create(
            sec_user_id=sec_uid,
            defaults={
                'nickname': nickname,
                'signature': signature,
                'follower_count': follower_count,
                'total_favorited': heart_count,
                'avatar_url': avatar_url,
                'videos_folder': user_dir,
                'video_paths': video_info_json
            }
        )

        return JsonResponse({
            'status': 'success',
            'message': '处理完成',
            'user_info': {
                'nickname': nickname,
                'unique_id': unique_id,
                'sec_uid': sec_uid,
                'avatar': avatar_url,
                'follower_count': follower_count,
                'total_favorited': heart_count,
                'signature': signature
            },
            'videos_count': len(item_list),
            'downloaded_videos': len(downloaded_videos),
            'videos': [{'id': v['id'], 'desc': v['desc'][:50], 'play_count': v['play_count']} for v in downloaded_videos],
            'video_directory': user_dir  # returned so the files are easy to locate
        }, json_dumps_params={'ensure_ascii': False})

    except Exception as e:
        logger.error(f"处理TikTok视频失败: {e}")
        logger.error(f"详细错误: {traceback.format_exc()}")
        return JsonResponse({
            'status': 'error',
            'message': f'处理TikTok视频失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})


@require_http_methods(["GET"])
def get_tiktok_user_videos(request):
    """Return the stored videos of ``?sec_user_id=``, or all users when it is omitted."""
    try:
        sec_user_id = request.GET.get('sec_user_id')

        if not sec_user_id:
            # No user requested: list every stored user.
            users = TiktokUserVideos.objects.all().values(
                'sec_user_id', 'nickname', 'follower_count', 'videos_folder', 'create_time'
            )
            return JsonResponse({
                'status': 'success',
                'users': list(users)
            }, json_dumps_params={'ensure_ascii': False})

        try:
            user = TiktokUserVideos.objects.get(sec_user_id=sec_user_id)
        except TiktokUserVideos.DoesNotExist:
            return JsonResponse({
                'status': 'error',
                'message': f'用户 {sec_user_id} 不存在'
            }, json_dumps_params={'ensure_ascii': False})

        # Stored video metadata (JSON) and the files actually on disk.
        video_info = json.loads(user.video_paths) if user.video_paths else []
        videos_folder = user.videos_folder
        video_files = []
        if videos_folder and os.path.exists(videos_folder):
            video_files = [
                f for f in os.listdir(videos_folder)
                if os.path.isfile(os.path.join(videos_folder, f))
            ]

        return JsonResponse({
            'status': 'success',
            'user_info': {
                'sec_user_id': user.sec_user_id,
                'nickname': user.nickname,
                'signature': user.signature,
                'follower_count': user.follower_count,
                'total_favorited': user.total_favorited,
                'avatar_url': user.avatar_url,
                'create_time': user.create_time.strftime('%Y-%m-%d %H:%M:%S'),
                'update_time': user.update_time.strftime('%Y-%m-%d %H:%M:%S')
            },
            'videos_folder': videos_folder,
            'video_files': video_files,
            'video_info': video_info
        }, json_dumps_params={'ensure_ascii': False})

    except Exception as e:
        logger.error(f"获取TikTok视频列表失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': f'获取TikTok视频列表失败: {str(e)}'
        }, json_dumps_params={'ensure_ascii': False})


# Helper functions

def fetch_user_videos(sec_uid, max_cursor=0, count=20):
    """Return the API's video-list JSON for *sec_uid*, or None on failure.

    ``max_cursor`` and ``count`` are accepted for compatibility but are not
    sent to the API.
    """
    query = urlencode({'secUid': sec_uid})
    url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_post?{query}"

    try:
        # requests has no default timeout; without one a stalled API would
        # block the request thread forever.
        response = requests.get(url, timeout=30)

        if response.status_code == 200:
            return response.json()

        logger.error(f"获取视频列表失败: {response.status_code}")
        return None
    except Exception as e:
        logger.error(f"获取视频列表异常: {e}")
        return None


def fetch_user_profile(unique_id):
    """Return the API's profile JSON for *unique_id*, or None on failure.

    Retries up to three times on timeouts/connection errors and checks that
    ``data.userInfo.user`` is present and non-empty.
    """
    # Browser-like headers.
    headers = {
        'User-Agent': _BROWSER_USER_AGENT,
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Referer': 'https://www.tiktok.com/'
    }

    query = urlencode({'uniqueId': unique_id})
    url = f"{API_BASE_URL}/api/tiktok/web/fetch_user_profile?{query}"

    try:
        logger.info(f"正在请求用户资料: {url}")

        for attempt in range(3):
            try:
                response = requests.get(url, headers=headers, timeout=30)
                break
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                logger.warning(f"请求超时或连接错误,尝试第{attempt+1}次重试: {e}")
                if attempt == 2:  # last attempt failed
                    raise
                time.sleep(2)  # wait before retrying

        if response.status_code != 200:
            logger.error(f"获取用户信息失败: HTTP {response.status_code}, 响应: {response.text[:500]}")
            return None

        try:
            data = response.json()
        except ValueError:
            # Every JSON decode error raised by requests derives from ValueError.
            logger.error(f"API响应不是有效的JSON: {response.text[:500]}")
            return None

        logger.info(f"获取用户资料成功: {unique_id}")
        # Log the full response to help debugging.
        logger.info(f"API原始响应: {data}")

        # Validate the expected structure.
        if 'data' not in data or not data['data']:
            logger.error(f"API响应缺少data字段: {data}")
            return None

        if 'userInfo' not in data['data'] or not data['data']['userInfo']:
            logger.error(f"API响应缺少userInfo字段: {data['data']}")
            return None

        if 'user' not in data['data']['userInfo'] or not data['data']['userInfo']['user']:
            logger.error(f"API响应缺少user字段: {data['data']['userInfo']}")
            return None

        logger.info(f"用户信息: {data['data']['userInfo']['user']}")
        return data
    except Exception as e:
        logger.error(f"获取用户信息异常: {e}")
        return None


def download_video(video_id, unique_id, save_path):
    """Download a TikTok video through the API's download endpoint.

    Returns True on success. The data is streamed to ``<save_path>.part`` and
    moved into place only once complete, so a failed download never leaves a
    truncated file at *save_path*.
    """
    # The video ID must be purely numeric.
    if not str(video_id).isdigit():
        logger.error(f"无效的视频ID: {video_id},必须是纯数字")
        return False

    # Canonical TikTok video URL.
    tiktok_url = f"https://www.tiktok.com/@{unique_id}/video/{video_id}"
    logger.info(f"构建的TikTok URL: {tiktok_url}")

    # Full API request URL; the nested URL is percent-encoded so that its own
    # characters cannot be mistaken for query delimiters.
    query = urlencode({'url': tiktok_url, 'prefix': 'true', 'with_watermark': 'false'})
    full_url = f"{API_BASE_URL}/api/download?{query}"
    logger.info(f"完整的API请求URL: {full_url}")

    headers = {
        'User-Agent': _BROWSER_USER_AGENT,
    }

    tmp_path = f"{save_path}.part"
    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(full_url, headers=headers, stream=True, timeout=60) as response:
            if response.status_code != 200:
                logger.error(f"下载视频失败: {response.status_code} - {response.text[:200] if response.text else '无响应内容'}")
                return False

            content_type = response.headers.get('Content-Type', '')
            logger.info(f"响应内容类型: {content_type}")

            with open(tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        os.replace(tmp_path, save_path)
        file_size = os.path.getsize(save_path)
        logger.info(f"视频已下载到: {save_path},文件大小: {file_size}字节")
        return True

    except Exception as e:
        logger.error(f"下载视频异常: {e}")
        logger.error(f"详细错误: {traceback.format_exc()}")
        # Drop the incomplete temporary file, if any.
        try:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        except OSError:
            pass
        return False