automated_task_monitor/monitor/views.py

1015 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.http import JsonResponse
from .tasks import monitor_process, get_process_gpu_usage
import threading
import psutil
from .models import HighCPUProcess, HighGPUProcess, HighMemoryProcess, AllResourceProcess
import logging
import os
from datetime import datetime
import time
import nvidia_smi
from django.utils import timezone
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import render
import pytz
from pathlib import Path # 使用 pathlib 处理跨平台路径
import json
from django.conf import settings
import threading
# Per-directory run flags read by the directory-monitoring loop.
directory_monitoring = dict()

# Handle and run flag of the background thread that watches high-resource
# processes.
monitor_thread = None
is_monitoring = False

# Log files are stored two directory levels above this file, under
# logs/process_monitor/<resource type>/.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(BASE_DIR, 'logs', 'process_monitor')

# Make sure one sub-directory per resource type exists, including 'all'.
for resource_type in ('cpu', 'memory', 'gpu', 'all'):
    os.makedirs(os.path.join(LOG_DIR, resource_type), exist_ok=True)

# Application-specific logger.
logger = logging.getLogger('monitor')

# Directories that have been registered for scanning.
monitored_directories = set()
# 添加新的监控线程函数
def monitor_directory(directory):
    """Continuously monitor the Python processes running under ``directory``.

    Intended to run as a background task. The loop keeps going for as long as
    ``directory_monitoring[directory]`` is truthy. On every pass it looks up
    the matching processes with ``find_python_processes_in_dir``, upserts an
    ``AllResourceProcess`` row for each of them and appends a metrics entry to
    the per-process log file: immediately the first time a PID is seen, then
    at most once every ``settings.LOG_INTERVAL`` seconds.

    Args:
        directory: Directory whose Python processes should be monitored.
    """
    # Imported locally so the loop does not depend on django.utils.timezone
    # happening to expose ``timedelta``.
    from datetime import timedelta

    monitor_interval = settings.MONITOR_INTERVAL  # seconds between scans
    log_interval = settings.LOG_INTERVAL  # seconds between log entries per PID
    last_log_time = {}  # pid -> time of the last log entry written for it

    while directory_monitoring.get(directory, False):
        try:
            next_update = timezone.now() + timedelta(seconds=monitor_interval)
            processes = find_python_processes_in_dir(directory)

            # Forget PIDs that are gone: keeps the bookkeeping bounded and
            # makes sure a reused PID is treated as a newly detected process.
            live_pids = {proc['pid'] for proc in processes}
            for stale_pid in set(last_log_time) - live_pids:
                del last_log_time[stale_pid]

            for proc in processes:
                pid = proc['pid']
                try:
                    log_file = setup_process_logger(pid, 'all')
                    if not log_file:
                        continue

                    process = psutil.Process(pid)
                    data = get_process_data(process)
                    physical = data['memory']['physical']

                    # Persist the latest snapshot; get_process_data returns
                    # formatted strings, so the numbers are parsed back out.
                    AllResourceProcess.objects.update_or_create(
                        pid=pid,
                        defaults={
                            'process_name': process.name(),
                            'cpu_usage': float(data['cpu']['usage'].replace('%', '')),
                            'cpu_user_time': float(data['cpu']['user_time'].replace('s', '')),
                            'cpu_system_time': float(data['cpu']['system_time'].replace('s', '')),
                            'memory_usage': float(physical.split('MB')[0].strip()),
                            'memory_percent': float(physical.split('(')[1].split('%')[0].strip()),
                            'virtual_memory': float(data['memory']['virtual'].split('MB')[0].strip()),
                            'gpu_usage': float(data['gpu']['usage'].replace('%', '')),
                            'gpu_memory': float(data['gpu']['memory'].replace('MB', '')),
                            # NOTE(review): the net_io_* fields are filled
                            # from the process I/O read/write figures.
                            'net_io_sent': float(data['io']['write'].split('MB')[0].strip()),
                            'net_io_recv': float(data['io']['read'].split('MB')[0].strip()),
                            'is_active': True,
                            'status': 1,
                            'log_file': log_file
                        }
                    )

                    now = timezone.now()
                    previous = last_log_time.get(pid)
                    if previous is None:
                        # Newly detected process: write a log entry right away.
                        logger.debug(f"首次检测到进程,立即写入日志: PID={pid}")
                        log_process_metrics(pid, data, log_file)
                        last_log_time[pid] = now
                    elif (now - previous).total_seconds() >= log_interval:
                        # Known process: write again once the interval elapsed.
                        logger.debug(f"定期写入日志: PID={pid}")
                        log_process_metrics(pid, data, log_file)
                        last_log_time[pid] = now
                except Exception as e:
                    # One failing process must not stop the others.
                    logger.error(f"监控进程 {pid} 失败: {str(e)}")

            # Sleep for whatever is left of the scan interval.
            now = timezone.now()
            if next_update > now:
                time.sleep((next_update - now).total_seconds())
        except Exception as e:
            logger.error(f"目录监控出错: {str(e)}")
            time.sleep(5)
def get_python_processes(directory):
    """Return the Python processes whose working directory is inside ``directory``.

    A process matches when its name contains ``python`` (case-insensitive)
    and its resolved working directory is ``directory`` itself or one of its
    sub-directories. Paths are compared lower-cased, as before.

    Args:
        directory: Directory to look in.

    Returns:
        list: ``psutil.Process`` objects of the matching processes.
    """
    target = Path(str(Path(directory).resolve()).lower())
    matches = []
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        try:
            # psutil reports None for attributes it could not read.
            name = proc.info['name'] or ''
            if 'python' not in name.lower():
                continue
            cwd = proc.cwd()
            if not cwd:
                continue
            proc_cwd = Path(str(Path(cwd).resolve()).lower())
            # Compare whole path components; a plain substring test would
            # also match unrelated paths that merely contain the same text.
            if proc_cwd == target or target in proc_cwd.parents:
                # Hand back the instance yielded by process_iter instead of
                # a fresh psutil.Process, so a later cpu_percent() call can
                # compare against an earlier measurement.
                matches.append(proc)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return matches
def get_process_info(process):
    """Collect a snapshot of details for ``process``.

    Args:
        process: Process object to inspect (used like a ``psutil.Process``).

    Returns:
        dict | None: Process details (memory and I/O byte counts in MB), or
        ``None`` when the process is gone or access to it is denied.
    """
    bytes_per_mb = 1024 * 1024
    try:
        with process.oneshot():
            details = {
                'pid': process.pid,
                'name': process.name(),
                'cpu_usage': process.cpu_percent(),
                'memory_usage': process.memory_info().rss / bytes_per_mb,
                'command_line': ' '.join(process.cmdline()),
                'create_time': process.create_time(),
                'status': process.status(),
                'threads': process.num_threads(),
                'working_directory': process.cwd(),
                # GPU figures are not collected here; placeholders only.
                'gpu_info': {'usage': 0, 'memory': 0},
            }

            # I/O counters are best-effort: fall back to zeros when they
            # cannot be read.
            try:
                counters = process.io_counters()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                io_section = {
                    'read_bytes': 0,
                    'write_bytes': 0,
                    'read_count': 0,
                    'write_count': 0,
                }
            else:
                io_section = {
                    'read_bytes': counters.read_bytes / bytes_per_mb,
                    'write_bytes': counters.write_bytes / bytes_per_mb,
                    'read_count': counters.read_count,
                    'write_count': counters.write_count,
                }
            details['io'] = io_section
            return details
    except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
        logger.error(f"获取进程 {process.pid} 信息失败: {str(e)}")
        return None
@csrf_exempt
@require_http_methods(["POST"])
def scan_directory(request):
    """Register a directory for Python-process scanning.

    Reads a JSON body with a ``directory`` key, resolves the path and adds it
    to ``monitored_directories``. Always answers with a JSON document whose
    ``status`` is ``success`` or ``error``.
    """
    def error_response(message):
        return JsonResponse({
            'status': 'error',
            'message': message
        })

    try:
        payload = json.loads(request.body)
        requested = payload.get('directory')
        if not requested:
            return error_response('请提供目录路径')

        # Store the resolved path so later lookups use the same spelling.
        directory = str(Path(requested).resolve())
        monitored_directories.add(directory)
        logger.info(f"开始监控目录: {directory}")
        return JsonResponse({
            'status': 'success',
            'message': f'开始监控目录: {directory}',
            'directory': directory
        })
    except Exception as e:
        logger.error(f"启动目录监控失败: {str(e)}")
        return error_response(str(e))
@require_http_methods(["GET"])
def get_directory_status(request):
    """Report the Python processes currently running under a directory.

    Expects a ``directory`` query parameter. Answers with a JSON document
    holding the details of every matching process and a timestamp, or an
    error document when the parameter is missing or something fails.
    """
    def error_response(message):
        return JsonResponse({
            'status': 'error',
            'message': message
        })

    try:
        directory = request.GET.get('directory')
        if not directory:
            return error_response('未提供目录路径')

        # Gather details per process, skipping those that could not be read.
        snapshots = (get_process_info(proc) for proc in get_python_processes(directory))
        process_info = [snapshot for snapshot in snapshots if snapshot]

        return JsonResponse({
            'status': 'success',
            'processes': process_info,
            'timestamp': timezone.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    except Exception as e:
        logger.error(f"获取目录状态失败: {str(e)}")
        return error_response(str(e))
@require_http_methods(["POST"])
def stop_directory_monitor(request):
    """Remove a directory from the set of monitored directories.

    Reads a JSON body with a ``directory`` key. Answers with a success
    document whether or not the directory was registered, and with an error
    document when the key is missing or something fails.
    """
    def error_response(message):
        return JsonResponse({
            'status': 'error',
            'message': message
        })

    try:
        payload = json.loads(request.body)
        requested = payload.get('directory')
        if not requested:
            return error_response('未提供目录路径')

        # Resolve first so the lookup matches the stored spelling.
        directory = str(Path(requested).resolve())
        if directory in monitored_directories:
            monitored_directories.remove(directory)
            logger.info(f"停止监控目录: {directory}")

        return JsonResponse({
            'status': 'success',
            'message': '监控已停止'
        })
    except Exception as e:
        logger.error(f"停止监控失败: {str(e)}")
        return error_response(str(e))
def setup_process_logger(pid, monitor_type):
    """Work out (and prepare the directory for) a process's log file.

    The file is named ``<pid>_<YYYYMMDD>_<monitor_type>.log`` and lives in
    ``LOG_DIR/<monitor_type>``; the date is taken in the Asia/Shanghai
    time zone.

    Args:
        pid: Process ID the log belongs to.
        monitor_type: One of ``cpu``, ``memory``, ``gpu`` or ``all``.
            ``scan`` is accepted as an alias for ``all``.

    Returns:
        str | None: Path of the log file, or ``None`` for an unknown type.

    Raises:
        Exception: Whatever went wrong while preparing the path; the error is
            logged and re-raised.
    """
    try:
        # Monitoring started from a directory scan logs under 'all'. This
        # has to happen before validation, otherwise 'scan' is rejected.
        if monitor_type == 'scan':
            monitor_type = 'all'

        if monitor_type not in ('cpu', 'memory', 'gpu', 'all'):
            logger.warning(f"警告:无效的监控类型 {monitor_type}")
            return None

        # Date stamp in the local (China) time zone.
        local_tz = pytz.timezone('Asia/Shanghai')
        current_time = timezone.localtime(timezone.now(), local_tz)
        current_date = current_time.strftime("%Y%m%d")

        log_dir = os.path.join(LOG_DIR, monitor_type)
        log_file = os.path.join(log_dir, f'{pid}_{current_date}_{monitor_type}.log')

        os.makedirs(log_dir, exist_ok=True)
        logger.debug(f"设置日志文件: {log_file}")
        return log_file
    except Exception as e:
        logger.error(f"设置日志文件失败: {str(e)}")
        raise
def log_process_metrics(pid, data, log_file):
    """Append one formatted metrics entry to ``log_file``.

    Args:
        pid: Process ID (not used when formatting the entry).
        data: Metrics dictionary with ``cpu``, ``memory``, ``gpu``, ``io``
            and ``status`` entries, as read below.
        log_file: Path of the file to append to; its directory is created
            when missing.

    Raises:
        Exception: Any failure is printed and re-raised.
    """
    # (heading, key in ``data``, ((label, key in the section), ...))
    sections = (
        ("CPU信息:\n", 'cpu', (
            ("├─ 使用率", 'usage'),
            ("├─ 用户态时间", 'user_time'),
            ("├─ 内核态时间", 'system_time'),
            ("├─ CPU核心数", 'cores'),
            ("├─ CPU频率", 'frequency'),
            ("└─ 上下文切换", 'context_switches'),
        )),
        ("内存信息:\n", 'memory', (
            ("├─ 物理内存", 'physical'),
            ("├─ 虚拟内存", 'virtual'),
            ("├─ 内存映射", 'mappings'),
            ("├─ 系统内存使用", 'system_usage'),
            ("└─ 交换空间使用", 'swap_usage'),
        )),
        ("GPU信息:\n", 'gpu', (
            ("├─ 使用率", 'usage'),
            ("└─ 显存使用", 'memory'),
        )),
        ("IO信息:\n", 'io', (
            ("├─ 读取", 'read'),
            ("├─ 写入", 'write'),
            ("└─ 其他IO", 'other'),
        )),
    )
    try:
        os.makedirs(os.path.dirname(log_file), exist_ok=True)

        # Entries are stamped in the China time zone.
        shanghai = pytz.timezone('Asia/Shanghai')
        timestamp = timezone.localtime(timezone.now(), shanghai).strftime('%Y-%m-%d %H:%M:%S')

        pieces = [f"=== {timestamp} ===\n"]
        for heading, section, fields in sections:
            pieces.append(heading)
            for label, key in fields:
                pieces.append(f"{label}: {data[section][key]}\n")
        pieces.append(f"进程状态: {data['status']}\n")
        pieces.append("-" * 50 + "\n")

        with open(log_file, 'a', encoding='utf-8') as handle:
            handle.write("".join(pieces))
        print(f"已写入日志到: {log_file}")
    except Exception as e:
        print(f"写入日志失败: {str(e)}")
        raise
def get_process_by_name(process_name):
    """Return the PIDs of all processes whose name contains ``process_name``.

    The comparison is case-insensitive. Processes whose name cannot be read
    are skipped.

    Args:
        process_name: Text to look for in the process names.

    Returns:
        list[int]: PIDs of the matching processes.
    """
    wanted = process_name.lower()
    pids = []
    for proc in psutil.process_iter(['pid', 'name']):
        try:
            # psutil reports None for a name it was not allowed to read;
            # treat that as "no name" instead of crashing on .lower().
            name = proc.info['name'] or ''
            if wanted in name.lower():
                pids.append(proc.info['pid'])
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
    return pids
def get_process_gpu_usage(pid):
    """Return the GPU usage of a process as ``(usage, memory)``.

    Placeholder implementation: no GPU query is performed and the default
    values ``(0.0, 0.0)`` are returned for every PID.

    Args:
        pid: Process ID (currently unused).

    Returns:
        tuple[float, float]: Always ``(0.0, 0.0)``.
    """
    # The previous try/except wrapped a constant return, so its error
    # handler could never run; it has been dropped.
    return 0.0, 0.0
def get_high_resource_processes(cpu_threshold=50, memory_threshold=50):
    """List the processes whose CPU or memory usage exceeds a threshold.

    Args:
        cpu_threshold: CPU percentage a process must exceed (default 50).
        memory_threshold: Memory percentage a process must exceed
            (default 50).

    Returns:
        list[dict]: One ``{'pid', 'name', 'cpu_percent', 'memory_percent'}``
        dictionary per process exceeding either threshold.
    """
    high_resource_pids = []
    for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
        try:
            info = proc.info
            # psutil reports None for values it was not allowed to read;
            # comparing None with a number would raise TypeError.
            cpu_percent = info['cpu_percent'] or 0
            memory_percent = info['memory_percent'] or 0
            if cpu_percent > cpu_threshold or memory_percent > memory_threshold:
                high_resource_pids.append({
                    'pid': info['pid'],
                    'name': info['name'],
                    'cpu_percent': cpu_percent,
                    'memory_percent': memory_percent
                })
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue
    return high_resource_pids
def auto_detect_high_resource_processes(request):
    """API endpoint: detect high-resource processes and log their metrics.

    Every process reported by ``get_high_resource_processes`` gets one
    metrics entry written to its log file. The response lists the detected
    processes.

    Args:
        request: The incoming HTTP request (not inspected).

    Returns:
        JsonResponse: ``status`` ``success`` with the detected processes, or
        ``status`` ``error`` with a message.
    """
    json_options = {'ensure_ascii': False}
    try:
        high_resource_procs = get_high_resource_processes()

        for proc in high_resource_procs:
            pid = proc['pid']
            try:
                process = psutil.Process(pid)
                # A process is detected on CPU *or* memory, so log it under
                # the combined 'all' type. The former 'auto' type is not
                # accepted by setup_process_logger, which returned None and
                # made every write fail.
                log_file = setup_process_logger(pid, 'all')
                if not log_file:
                    logger.error(f"监控进程 {pid} 时出错: 无法创建日志文件")
                    continue
                data = get_process_data(process)
                log_process_metrics(pid, data, log_file)
            except psutil.NoSuchProcess:
                # The process ended between detection and inspection.
                continue
            except Exception as e:
                logger.error(f"监控进程 {pid} 时出错: {str(e)}")

        return JsonResponse({
            'status': 'success',
            'message': '已开始自动检测高资源进程',
            'processes': high_resource_procs
        }, json_dumps_params=json_options)
    except Exception as e:
        logger.error(f"自动检测失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': f'自动检测失败: {str(e)}'
        }, json_dumps_params=json_options)
def setup_logger(pid):
    """Return a dedicated file logger for the process ``pid``.

    The logger is named ``monitor.process.process_<pid>`` and writes to
    ``logs/process_monitor/process_<pid>_<YYYYMMDD>.log`` two directory
    levels above this file. Calling the function again for the same PID
    reuses the handler that is already attached instead of adding another
    one.

    Args:
        pid: Process ID the logger is for.

    Returns:
        tuple: ``(logger, log_file_path)``.
    """
    log_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs', 'process_monitor')
    # FileHandler does not create missing directories.
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f'process_{pid}_{datetime.now().strftime("%Y%m%d")}.log')

    process_logger = logging.getLogger(f'monitor.process.process_{pid}')
    process_logger.setLevel(logging.INFO)

    # logging.getLogger returns the same object for the same name, so a
    # handler added on every call would duplicate each log line and leak
    # an open file per call.
    target = os.path.abspath(log_file)
    already_attached = False
    for existing in list(process_logger.handlers):
        if not isinstance(existing, logging.FileHandler):
            continue
        if existing.baseFilename == target:
            already_attached = True
        else:
            # Handler for an older file (e.g. yesterday's date): retire it.
            process_logger.removeHandler(existing)
            existing.close()

    if not already_attached:
        handler = logging.FileHandler(log_file, encoding='utf-8')
        handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        process_logger.addHandler(handler)

    return process_logger, log_file
def continuous_monitor():
    """Thread body: periodically log metrics for all actively monitored processes.

    Runs while the module-level ``is_monitoring`` flag is true. Each pass
    walks the active ``HighCPUProcess``, ``HighMemoryProcess`` and
    ``HighGPUProcess`` records, writes one log entry per process and marks
    records whose process no longer exists as inactive.
    """
    global is_monitoring
    logger = logging.getLogger('monitor')
    interval = 60  # seconds between passes

    while is_monitoring:
        try:
            next_update = timezone.now() + timezone.timedelta(seconds=interval)

            # Active records, grouped by the monitor type used for logging.
            active_records = (
                ('cpu', HighCPUProcess.objects.filter(is_active=True)),
                ('memory', HighMemoryProcess.objects.filter(is_active=True)),
                ('gpu', HighGPUProcess.objects.filter(is_active=True)),
            )
            for monitor_type, records in active_records:
                for record in records:
                    try:
                        process = psutil.Process(record.pid)
                        data = get_process_data(process)
                        log_file = setup_process_logger(record.pid, monitor_type)
                        log_process_metrics(record.pid, data, log_file)
                    except psutil.NoSuchProcess:
                        # The process is gone: stop tracking it.
                        record.is_active = False
                        record.save()
                    except Exception as e:
                        logger.error(f"监控进程 {record.pid} 时出错: {str(e)}")

            # Wait for the remainder of the interval, if any.
            current = timezone.now()
            if next_update > current:
                time.sleep((next_update - current).total_seconds())
        except Exception as e:
            logger.error(f"持续监控出错: {str(e)}")
            time.sleep(5)
def get_process_data(process):
    """Gather detailed, display-formatted metrics for ``process``.

    Args:
        process: ``psutil.Process`` to inspect.

    Returns:
        dict: ``timestamp``, ``cpu``, ``memory``, ``gpu``, ``io`` and
        ``status`` entries; the numeric values are formatted as strings with
        their unit (``%``, ``s``, ``MB``, ``MHz``).
    """
    def to_mb(byte_count):
        return byte_count / 1024 / 1024

    with process.oneshot():
        times = process.cpu_times()
        # cpu_freq() may return nothing; fall back to an all-zero reading.
        frequency = psutil.cpu_freq() or psutil._common.scpufreq(current=0, min=0, max=0)
        switches = process.num_ctx_switches()
        memory = process.memory_info()

        try:
            mapping_count = len(process.memory_maps())
        except (psutil.AccessDenied, psutil.TimeoutExpired):
            mapping_count = 0

        system_memory = psutil.virtual_memory()
        swap_memory = psutil.swap_memory()

        try:
            io_stats = process.io_counters()
        except (psutil.AccessDenied, psutil.TimeoutExpired):
            # All-zero counters when the real ones cannot be read.
            io_stats = psutil._common.pio(read_count=0, write_count=0, read_bytes=0, write_bytes=0,
                                          read_chars=0, write_chars=0, other_count=0, other_bytes=0)

        gpu_usage, gpu_memory = get_process_gpu_usage(process.pid)

        timestamp = timezone.now().strftime('%Y-%m-%d %H:%M:%S')
        cpu_section = {
            'usage': f"{process.cpu_percent()}%",
            'user_time': f"{times.user:.1f}s",
            'system_time': f"{times.system:.1f}s",
            'cores': str(psutil.cpu_count()),
            'frequency': f"{getattr(frequency, 'current', 0):.1f}MHz",
            'context_switches': f"{switches.voluntary}/{switches.involuntary}"
        }
        memory_section = {
            'physical': f"{to_mb(memory.rss):.1f}MB ({process.memory_percent():.1f}%)",
            'virtual': f"{to_mb(memory.vms):.1f}MB",
            'mappings': f"{mapping_count}",
            'system_usage': f"{system_memory.percent:.1f}%",
            'swap_usage': f"{swap_memory.percent:.1f}%"
        }
        gpu_section = {
            'usage': f"{gpu_usage:.1f}%",
            'memory': f"{gpu_memory:.1f}MB"
        }
        io_section = {
            'read': f"{to_mb(getattr(io_stats, 'read_bytes', 0)):.1f}MB ({getattr(io_stats, 'read_count', 0)}次)",
            'write': f"{to_mb(getattr(io_stats, 'write_bytes', 0)):.1f}MB ({getattr(io_stats, 'write_count', 0)}次)",
            'other': f"{getattr(io_stats, 'other_count', 0)}"
        }
        return {
            'timestamp': timestamp,
            'cpu': cpu_section,
            'memory': memory_section,
            'gpu': gpu_section,
            'io': io_section,
            'status': process.status()
        }
@csrf_exempt
@require_http_methods(["GET", "POST"])
def start_monitor(request):
    """Start monitoring for the process given by the ``pid`` query parameter.

    Verifies that the process exists and launches the background monitoring
    thread when it is not already running. Always answers with a JSON
    document whose ``status`` is ``success`` or ``error``.
    """
    global monitor_thread, is_monitoring

    def reply(status, message):
        return JsonResponse({
            'status': status,
            'message': message
        }, json_dumps_params={'ensure_ascii': False})

    try:
        raw_pid = request.GET.get('pid')
        if not raw_pid:
            return reply('error', '请输入进程ID')

        pid = int(raw_pid)
        try:
            process_name = psutil.Process(pid).name()
        except psutil.NoSuchProcess:
            return reply('error', f'进程 {pid} 不存在')

        # Launch the shared monitoring thread on first use.
        if not is_monitoring:
            is_monitoring = True
            monitor_thread = threading.Thread(target=continuous_monitor, daemon=True)
            monitor_thread.start()

        return reply('success', f'开始监控进程 {pid} ({process_name})')
    except Exception as e:
        logger.error(f"启动监控失败: {str(e)}")
        return reply('error', f'启动监控失败: {str(e)}')
@csrf_exempt
@require_http_methods(["GET", "POST"])
def stop_monitor(request):
    """Stop the background monitoring thread.

    Requires a ``pid`` query parameter, which is only echoed back in the
    response; clearing ``is_monitoring`` ends the shared monitoring loop.
    """
    global is_monitoring

    def reply(status, message):
        return JsonResponse({
            'status': status,
            'message': message
        }, json_dumps_params={'ensure_ascii': False})

    try:
        raw_pid = request.GET.get('pid')
        if not raw_pid:
            return reply('error', '请输入进程ID')

        pid = int(raw_pid)
        # Signal the monitoring loop to finish.
        is_monitoring = False
        return reply('success', f'停止监控进程 {pid}')
    except Exception as e:
        logger.error(f"停止监控失败: {str(e)}")
        return reply('error', f'停止监控失败: {str(e)}')
@require_http_methods(["GET"])
def get_process_status(request, pid):
    """API: return the current metrics of a process and persist them.

    The ``type`` query parameter selects the monitor type. The metrics are
    stored in ``AllResourceProcess`` and, for ``cpu``/``memory``/``gpu``, in
    the matching ``High*Process`` model as well.

    Args:
        request: The incoming HTTP request.
        pid: ID of the process to inspect.

    Returns:
        JsonResponse: ``status`` ``success`` with the metrics in ``data``, or
        ``status`` ``error`` with a message (e.g. when the process ended).
    """
    try:
        monitor_type = request.GET.get('type')
        logger.info(f"获取进程状态: pid={pid}, type={monitor_type}")

        # The log file path is stored alongside the metrics.
        log_file = setup_process_logger(pid, monitor_type)
        if not log_file:
            raise ValueError(f"无法创建日志文件,监控类型: {monitor_type}")

        try:
            # Creating the Process object is inside this try so that a
            # missing process is reported as terminated.
            process = psutil.Process(pid)
            data = get_process_data(process)
            process_name = process.name()
            values = _metric_values_from_data(data)
            common = {'is_active': True, 'status': 1, 'log_file': log_file}

            # Always refresh the combined record.
            AllResourceProcess.objects.update_or_create(
                pid=pid,
                defaults=dict(values, process_name=process_name, **common)
            )

            # Then the record of the selected monitor type, if any.
            if monitor_type == 'cpu':
                logger.info(f"CPU使用率: {values['cpu_usage']}%")
                process_obj, created = HighCPUProcess.objects.update_or_create(
                    pid=pid,
                    defaults=dict(
                        common,
                        process_name=process_name,
                        cpu_usage=values['cpu_usage'],
                    )
                )
                logger.info(f"{'创建' if created else '更新'}CPU进程记录: {process_obj}")
            elif monitor_type == 'memory':
                HighMemoryProcess.objects.update_or_create(
                    pid=pid,
                    defaults=dict(
                        common,
                        process_name=process_name,
                        memory_usage=values['memory_usage'],
                        memory_percent=values['memory_percent'],
                    )
                )
            elif monitor_type == 'gpu':
                HighGPUProcess.objects.update_or_create(
                    pid=pid,
                    defaults=dict(
                        common,
                        process_name=process_name,
                        gpu_usage=values['gpu_usage'],
                        gpu_memory=values['gpu_memory'],
                    )
                )

            return JsonResponse({
                'status': 'success',
                'data': data
            })
        except psutil.NoSuchProcess:
            # The process has ended: flag its records as inactive. (The old
            # handler used an undefined ``monitor`` variable and raised
            # NameError instead.)
            update_process_status(pid, monitor_type, is_active=False, status=0)
            AllResourceProcess.objects.filter(pid=pid).update(is_active=False, status=0)
            return JsonResponse({
                'status': 'error',
                'message': f'进程 {pid} 已终止'
            })
    except Exception as e:
        logger.error(f"获取进程状态失败: {str(e)}")
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        })


def _metric_values_from_data(data):
    """Parse the formatted strings of ``get_process_data`` back into floats."""
    physical = data['memory']['physical']  # e.g. "12.3MB (4.5%)"
    return {
        'cpu_usage': float(data['cpu']['usage'].replace('%', '')),
        'cpu_user_time': float(data['cpu']['user_time'].replace('s', '')),
        'cpu_system_time': float(data['cpu']['system_time'].replace('s', '')),
        'memory_usage': float(physical.split('MB')[0].strip()),
        'memory_percent': float(physical.split('(')[1].split('%')[0].strip()),
        'virtual_memory': float(data['memory']['virtual'].split('MB')[0].strip()),
        'gpu_usage': float(data['gpu']['usage'].replace('%', '')),
        'gpu_memory': float(data['gpu']['memory'].replace('MB', '')),
        # NOTE(review): the net_io_* fields are filled from the process
        # I/O write/read figures, as before.
        'net_io_sent': float(data['io']['write'].split('MB')[0].strip()),
        'net_io_recv': float(data['io']['read'].split('MB')[0].strip()),
    }
def update_process_status(pid, monitor_type, is_active=False, status=0):
    """Set ``is_active`` and ``status`` on the records of a monitored process.

    Args:
        pid: Process ID whose records are updated.
        monitor_type: ``cpu``, ``memory`` or ``gpu``; other values do nothing.
        is_active: New value of the ``is_active`` field.
        status: New value of the ``status`` field.
    """
    try:
        models_by_type = (
            ('cpu', HighCPUProcess),
            ('memory', HighMemoryProcess),
            ('gpu', HighGPUProcess),
        )
        for type_name, model in models_by_type:
            if monitor_type == type_name:
                model.objects.filter(pid=pid).update(
                    is_active=is_active,
                    status=status
                )
                break
    except Exception as e:
        logger.error(f"更新进程状态失败: {str(e)}")
def index(request):
    """Render the home page template."""
    template_name = 'index.html'
    return render(request, template_name)
@csrf_exempt
@require_http_methods(["POST"])
def stop_auto_detect(request):
    """API endpoint that acknowledges a request to stop automatic detection.

    Only a confirmation document is returned; no state is changed here.
    """
    json_options = {'ensure_ascii': False}
    try:
        confirmation = {
            'status': 'success',
            'message': '已停止自动检测'
        }
        return JsonResponse(confirmation, json_dumps_params=json_options)
    except Exception as e:
        logger.error(f"停止自动检测失败: {str(e)}")
        failure = {
            'status': 'error',
            'message': f'停止自动检测失败: {str(e)}'
        }
        return JsonResponse(failure, json_dumps_params=json_options)
@csrf_exempt
@require_http_methods(["GET"])
def get_latest_log(request, pid):
    """Return the latest live figures for an actively monitored process.

    The ``type`` query parameter (``cpu``, ``gpu`` or ``memory``) selects
    both the monitoring record that must exist and the figures returned.
    When the process has ended, its record is marked inactive.
    """
    def error_response(message):
        return JsonResponse({
            'status': 'error',
            'message': message
        })

    try:
        monitor_type = request.GET.get('type')
        if not monitor_type:
            return error_response('请指定监控类型')

        # Find the active monitoring record for the requested type.
        model = {
            'cpu': HighCPUProcess,
            'gpu': HighGPUProcess,
            'memory': HighMemoryProcess,
        }.get(monitor_type)
        monitor = model.objects.filter(pid=pid, is_active=True).first() if model else None
        if not monitor:
            return error_response(f'未找到进程 {pid}{monitor_type}监控记录')

        try:
            process = psutil.Process(pid)
            with process.oneshot():
                data = {
                    'pid': pid,
                    'name': process.name(),
                    'status': 1,  # 1 means the process is running
                }
                # Add the figures that belong to the monitor type.
                if monitor_type == 'cpu':
                    data['cpu_percent'] = process.cpu_percent()
                elif monitor_type == 'memory':
                    rss = process.memory_info().rss
                    data['memory_gb'] = rss / (1024 * 1024 * 1024)
                    data['memory_percent'] = process.memory_percent()
                elif monitor_type == 'gpu':
                    usage, memory = get_process_gpu_usage(pid)
                    data['gpu_usage'] = usage
                    data['gpu_memory'] = memory
            return JsonResponse({
                'status': 'success',
                'data': data
            })
        except psutil.NoSuchProcess:
            # The process has ended: retire its monitoring record.
            monitor.is_active = False
            monitor.status = 0
            monitor.save()
            return error_response(f'进程 {pid} 已终止')
    except Exception as e:
        return error_response(str(e))
@csrf_exempt
@require_http_methods(["GET"])
def get_process_list(request):
    """Return the actively monitored processes, merged per PID.

    Active CPU, GPU and memory records are combined into one entry per PID;
    ``resource_types`` lists the kinds of records found for it. Entries are
    sorted by the sum of their CPU, memory and GPU percentages, highest
    first.
    """
    json_options = {'ensure_ascii': False}

    def merge(table, record, resource_type, figures):
        """Create or extend the entry of ``record.pid`` in ``table``."""
        entry = table.get(record.pid)
        if entry is None:
            entry = {
                'pid': record.pid,
                'name': record.process_name,
                'cpu_percent': 0,
                'memory_gb': 0,
                'memory_percent': 0,
                'gpu_usage': 0,
                'gpu_memory': 0,
            }
            entry.update(figures)
            entry['resource_types'] = [resource_type]
            table[record.pid] = entry
        else:
            entry.update(figures)
            entry['resource_types'].append(resource_type)

    try:
        # All active monitoring records, per resource type.
        cpu_records = HighCPUProcess.objects.filter(is_active=True)
        gpu_records = HighGPUProcess.objects.filter(is_active=True)
        memory_records = HighMemoryProcess.objects.filter(is_active=True)

        by_pid = {}
        for record in cpu_records:
            merge(by_pid, record, 'cpu', {'cpu_percent': record.cpu_usage})
        for record in gpu_records:
            merge(by_pid, record, 'gpu', {
                'gpu_usage': record.gpu_usage,
                'gpu_memory': record.gpu_memory,
            })
        for record in memory_records:
            merge(by_pid, record, 'memory', {
                'memory_gb': record.memory_usage,
                'memory_percent': record.memory_percent,
            })

        def total_load(entry):
            return (
                entry['cpu_percent'] / 100 +
                entry['memory_percent'] / 100 +
                entry['gpu_usage'] / 100
            )

        # Heaviest consumers first.
        processes = sorted(by_pid.values(), key=total_load, reverse=True)

        return JsonResponse({
            'status': 'success',
            'processes': processes
        }, json_dumps_params=json_options)
    except Exception as e:
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        }, json_dumps_params=json_options)
def high_resource_list(request):
    """Render the page listing all CPU, memory and GPU records.

    Each list is ordered by ``updated_at``, most recent first.
    """
    models_by_key = (
        ('cpu_processes', HighCPUProcess),
        ('memory_processes', HighMemoryProcess),
        ('gpu_processes', HighGPUProcess),
    )
    context = {
        key: model.objects.all().order_by('-updated_at')
        for key, model in models_by_key
    }
    return render(request, 'high_resource_list.html', context)
def find_python_processes_in_dir(directory):
    """Find the Python processes whose working directory is inside ``directory``.

    A process matches when its name starts with ``python``
    (case-insensitive), its resolved working directory is ``directory``
    itself or a sub-directory of it, and it has a non-empty command line.

    Args:
        directory: Directory to look in; resolved to an absolute path.

    Returns:
        list[dict]: One ``{'pid', 'name', 'cmdline', 'cwd'}`` dictionary per
        matching process.
    """
    python_processes = []
    target = Path(directory).resolve()
    for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cwd']):
        try:
            info = proc.info
            # psutil reports None for attributes it could not read; such
            # processes are skipped instead of aborting the whole scan.
            name = info['name'] or ''
            if not name.lower().startswith('python'):
                continue
            cwd = info['cwd']
            if not cwd:
                continue
            proc_cwd = Path(cwd).resolve()
            # Compare path components: a plain string-prefix test would
            # also accept sibling directories such as "<directory>2".
            if proc_cwd != target and target not in proc_cwd.parents:
                continue
            cmdline = info['cmdline']
            if cmdline:
                python_processes.append({
                    'pid': info['pid'],
                    'name': info['name'],
                    'cmdline': ' '.join(cmdline),
                    'cwd': str(proc_cwd)
                })
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess, OSError):
            pass
    return python_processes
def get_gpu_info(pid):
    """Return GPU usage information for the process ``pid`` via NVML.

    Args:
        pid: Process ID to look for among the compute processes of each GPU.

    Returns:
        dict: ``usage`` (utilisation of the GPU the process runs on),
        ``memory`` (GPU memory used by the process, in MB) and
        ``device_count``. All values are 0 when NVML is unavailable or the
        query fails. If the process runs on several GPUs, the values of the
        last one found are reported.
    """
    unavailable = {'usage': 0, 'memory': 0, 'device_count': 0}
    try:
        import pynvml
        pynvml.nvmlInit()
    except Exception as e:
        logger.error(f"获取GPU信息失败: {str(e)}")
        return unavailable

    try:
        gpu_info = {
            'usage': 0,
            'memory': 0,
            'device_count': pynvml.nvmlDeviceGetCount()
        }
        for i in range(gpu_info['device_count']):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            for proc in pynvml.nvmlDeviceGetComputeRunningProcesses(handle):
                if proc.pid == pid:
                    # usedGpuMemory can be None when the driver does not
                    # report it; count that as 0 instead of failing.
                    gpu_info['memory'] = (proc.usedGpuMemory or 0) / 1024 / 1024  # MB
                    gpu_info['usage'] = pynvml.nvmlDeviceGetUtilizationRates(handle).gpu
                    break
        return gpu_info
    except Exception as e:
        logger.error(f"获取GPU信息失败: {str(e)}")
        return unavailable
    finally:
        # Every successful nvmlInit() must be paired with nvmlShutdown().
        try:
            pynvml.nvmlShutdown()
        except Exception as e:
            logger.error(f"获取GPU信息失败: {str(e)}")
def get_io_counters(proc):
    """Return the I/O counters of ``proc``, with byte counts in MB.

    Best-effort: when the counters cannot be read for any reason, all
    values are reported as 0.

    Args:
        proc: Process object providing ``io_counters()``.

    Returns:
        dict: ``read_bytes`` and ``write_bytes`` (MB), ``read_count`` and
        ``write_count``.
    """
    try:
        io = proc.io_counters()
        return {
            'read_bytes': io.read_bytes / 1024 / 1024,  # convert to MB
            'write_bytes': io.write_bytes / 1024 / 1024,
            'read_count': io.read_count,
            'write_count': io.write_count
        }
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are not swallowed.
        return {'read_bytes': 0, 'write_bytes': 0, 'read_count': 0, 'write_count': 0}
def get_network_connections(proc):
    """Return the network connections of ``proc``.

    Best-effort: when the connections cannot be read for any reason, an
    empty list is returned.

    Args:
        proc: Process object providing ``net_connections()`` (newer psutil)
            or ``connections()``.

    Returns:
        list[dict]: One ``{'local_address', 'remote_address', 'status'}``
        dictionary per connection; an address is ``""`` when absent.
    """
    def format_address(addr):
        return f"{addr.ip}:{addr.port}" if addr else ""

    try:
        # Prefer net_connections() where it exists; connections() is the
        # older name and is deprecated in recent psutil releases.
        list_connections = getattr(proc, 'net_connections', None) or proc.connections
        return [
            {
                'local_address': format_address(conn.laddr),
                'remote_address': format_address(conn.raddr),
                'status': conn.status
            }
            for conn in list_connections()
        ]
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are not swallowed.
        return []
def get_open_files(proc):
    """Return the paths of the files ``proc`` currently has open.

    Best-effort: when the list cannot be read for any reason, an empty list
    is returned.

    Args:
        proc: Process object providing ``open_files()``.

    Returns:
        list[str]: Paths of the open files.
    """
    try:
        return [f.path for f in proc.open_files()]
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are not swallowed.
        return []