automated_task_monitor/monitor/views.py

470 lines
18 KiB
Python
Raw Normal View History

2025-02-18 19:40:58 +08:00
from django.http import JsonResponse
from .tasks import monitor_process, get_process_gpu_usage
import threading
import psutil
from .models import HighCPUProcess, HighGPUProcess, HighMemoryProcess
import logging
import os
from datetime import datetime
import time
import nvidia_smi
from django.utils import timezone
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
# Logging configuration: every per-process monitor log lives under LOG_DIR,
# which is created (when missing) as soon as this module is imported.
LOG_DIR = 'logs/process_monitor'
os.makedirs(
    LOG_DIR,
    exist_ok=True,
)
def setup_logger(pid, log_dir=None):
    """Create (or reuse) the dedicated file logger of one monitored process.

    The log file is ``<log_dir>/process_<pid>_<YYYYMMDD>.log``, named after
    the local date at call time, and the logger is named ``process_<pid>``.

    Args:
        pid: ID of the process the logger belongs to.
        log_dir: Directory for the log file. Defaults to the module-level
            ``LOG_DIR``; it is created if it does not exist.

    Returns:
        ``(logger, log_file)``: the configured ``logging.Logger`` and the
        path of the file it writes to.
    """
    if log_dir is None:
        log_dir = LOG_DIR
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f'process_{pid}_{datetime.now().strftime("%Y%m%d")}.log')

    logger = logging.getLogger(f'process_{pid}')
    logger.setLevel(logging.INFO)

    # getLogger() returns the same object for the same name, so blindly
    # adding a handler on every call would duplicate each log line and leak
    # one open file descriptor per call. Only attach a handler for this file
    # if none is attached yet.
    target = os.path.abspath(log_file)
    already_attached = any(
        isinstance(h, logging.FileHandler) and h.baseFilename == target
        for h in logger.handlers
    )
    if not already_attached:
        # Explicit UTF-8: messages in this project are largely Chinese, and
        # the locale default encoding may not be able to represent them.
        handler = logging.FileHandler(log_file, encoding='utf-8')
        handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logger.addHandler(handler)
    return logger, log_file
def get_process_by_name(process_name):
    """Return the PIDs of all processes whose name contains *process_name*.

    Matching is a case-insensitive substring test. Processes whose name
    psutil could not read (``proc.info['name']`` is None, e.g. on
    AccessDenied) are skipped instead of crashing the scan.

    Args:
        process_name: Substring to look for in process names.

    Returns:
        List of matching PIDs (possibly empty).
    """
    needle = process_name.lower()
    pids = []
    for proc in psutil.process_iter(['pid', 'name']):
        # process_iter pre-fetches the requested attrs into proc.info and
        # stores None for values it was not allowed to read.
        name = proc.info.get('name')
        if name is not None and needle in name.lower():
            pids.append(proc.info['pid'])
    return pids
def get_process_gpu_usage(pid):
    """Return ``(gpu_usage, gpu_memory_mb)`` for *pid* via NVML.

    Devices are scanned in index order. The first device whose compute-process
    list contains *pid* determines the result:

    * ``gpu_usage``: ``nvmlDeviceGetUtilizationRates(handle).gpu`` of that
      device. This is a device-wide figure, not a per-process one.
    * ``gpu_memory_mb``: the process's ``usedGpuMemory`` converted to MB,
      or 0 when NVML does not report it.

    Returns ``(0, 0)`` when the PID is not found or any NVML call fails. The
    helper is best-effort and never raises ``Exception`` subclasses.

    NOTE: this definition shadows the ``get_process_gpu_usage`` imported from
    ``.tasks`` at the top of the module.
    """
    try:
        nvidia_smi.nvmlInit()
    except Exception:
        # No driver / no GPU: nothing to report, and nothing to shut down.
        return 0, 0
    try:
        for i in range(nvidia_smi.nvmlDeviceGetCount()):
            handle = nvidia_smi.nvmlDeviceGetHandleByIndex(i)
            for process in nvidia_smi.nvmlDeviceGetComputeRunningProcesses(handle):
                if process.pid != pid:
                    continue
                # usedGpuMemory is None when NVML cannot report it (e.g. under
                # WDDM or without permissions); don't let that discard the
                # utilization value through a TypeError.
                used_bytes = process.usedGpuMemory or 0
                gpu_memory = used_bytes / 1024 / 1024  # bytes -> MB
                gpu_usage = nvidia_smi.nvmlDeviceGetUtilizationRates(handle).gpu
                return gpu_usage, gpu_memory
        return 0, 0
    except Exception:
        return 0, 0
    finally:
        try:
            nvidia_smi.nvmlShutdown()
        except Exception:
            pass
def get_high_resource_processes(gpu_threshold=50):
    """Return the processes whose GPU utilization exceeds *gpu_threshold*.

    The GPU figure comes from ``get_process_gpu_usage``. In this module that
    is the utilization of the device the process runs on.

    Args:
        gpu_threshold: Utilization percentage a process's GPU must exceed to
            be reported (default 50).

    Returns:
        A list of dicts with keys ``pid``, ``name``, ``memory_gb`` (RSS in GB,
        rounded to 2 dp), ``gpu_usage`` and ``gpu_memory`` (MB, 2 dp).
        Processes that vanish, deny access, or are zombies are skipped.
    """
    high_resource_pids = []
    for proc in psutil.process_iter(['pid', 'name']):
        try:
            gpu_usage, gpu_memory = get_process_gpu_usage(proc.info['pid'])
            if gpu_usage <= gpu_threshold:
                continue
            # Read RSS only for the few GPU-heavy processes, reusing the
            # Process object process_iter already gave us.
            memory_gb = proc.memory_info().rss / (1024 * 1024 * 1024)  # bytes -> GB
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue
        high_resource_pids.append({
            'pid': proc.info['pid'],
            'name': proc.info['name'],
            'memory_gb': round(memory_gb, 2),
            'gpu_usage': gpu_usage,
            'gpu_memory': round(gpu_memory, 2)  # GPU memory used (MB)
        })
    return high_resource_pids
def auto_detect_high_resource_processes(interval=300, retry_delay=60):
    """Poll forever for new GPU-heavy processes and start monitoring them.

    Each round takes the PIDs that already have an active ``HighGPUProcess``
    record. It then starts a daemon ``monitor_process(pid, 'gpu')`` thread for
    every process from ``get_high_resource_processes()`` that is not among
    them. This is the same calling convention the request views in this
    module use.

    The previous version referenced an undefined ``ProcessMonitor`` model, so
    every round failed with NameError. It also passed a logger where every
    other caller passes a resource type.
    NOTE(review): record creation and per-process logging are assumed to be
    handled by ``monitor_process``, as the other views rely on. Confirm in
    ``tasks.py``.

    Args:
        interval: Seconds to wait between successful rounds (default 300).
        retry_delay: Seconds to wait after a failed round (default 60).
    """
    while True:
        try:
            monitored_pids = set(
                HighGPUProcess.objects.filter(is_active=True).values_list('pid', flat=True)
            )
            for proc in get_high_resource_processes():
                if proc['pid'] in monitored_pids:
                    continue
                # Daemon thread, like every other monitor thread in this module,
                # so it does not block interpreter shutdown.
                threading.Thread(
                    target=monitor_process,
                    args=(proc['pid'], 'gpu'),
                    daemon=True
                ).start()
                monitored_pids.add(proc['pid'])
                print(f"发现新的高资源进程: {proc['name']} (PID: {proc['pid']})")
            time.sleep(interval)
        except Exception as e:
            print(f"自动检测出错: {str(e)}")
            time.sleep(retry_delay)
def start_monitor(request):
    """Start monitoring one process, or auto-detect high-resource processes.

    Query parameters:
        pid: Optional PID. When given, monitor threads are started for it.
            When absent, every process is scanned (see ``_start_monitor_auto``).
        type: ``cpu``, ``gpu``, ``memory`` or ``all`` (default). Only used
            together with ``pid``.

    Returns a ``JsonResponse``:
        * 200 on success.
        * 400 for a malformed PID, an unsupported type, or a type already
          being monitored.
        * 404 when the PID does not exist.
        * 500 for any other error.
    """
    pid = request.GET.get('pid')
    resource_type = request.GET.get('type', 'all')  # cpu, gpu, memory, all
    try:
        if pid:
            try:
                pid = int(pid)
            except ValueError:
                return JsonResponse({"error": f"无效的PID: {pid}"}, status=400)
            return _start_monitor_for_pid(pid, resource_type)
        return _start_monitor_auto()
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)


def _launch_monitor_thread(pid, resource_type):
    """Run ``monitor_process(pid, resource_type)`` on a new daemon thread."""
    threading.Thread(
        target=monitor_process,
        args=(pid, resource_type),
        daemon=True
    ).start()


def _has_active_monitor(pid, resource_type):
    """Return True if *pid* has an active record for *resource_type*.

    *resource_type* must be one of ``'cpu'``, ``'gpu'``, ``'memory'``.
    """
    model = {
        'cpu': HighCPUProcess,
        'gpu': HighGPUProcess,
        'memory': HighMemoryProcess,
    }[resource_type]
    return model.objects.filter(pid=pid, is_active=True).exists()


def _start_monitor_for_pid(pid, resource_type):
    """Handle ``start_monitor`` for an explicit PID and build its response."""
    if resource_type != 'all' and resource_type not in ('cpu', 'gpu', 'memory'):
        # Previously an unknown type silently started monitor_process(pid, <unknown>).
        return JsonResponse({"error": f"不支持的资源类型: {resource_type}"}, status=400)
    try:
        process = psutil.Process(pid)
    except psutil.NoSuchProcess:
        return JsonResponse({"error": f"进程 {pid} 不存在"}, status=404)

    if resource_type == 'all':
        # Check every type before starting any thread.
        monitored = {rtype: _has_active_monitor(pid, rtype) for rtype in ('cpu', 'gpu', 'memory')}
        results = []
        for rtype, is_monitored in monitored.items():
            if is_monitored:
                results.append(f"{rtype}监控已在运行")
            else:
                _launch_monitor_thread(pid, rtype)
                results.append(f"已启动{rtype}监控")
    else:
        if _has_active_monitor(pid, resource_type):
            return JsonResponse({"error": f"进程 {pid} 已在{resource_type}监控中"}, status=400)
        _launch_monitor_thread(pid, resource_type)
        results = [f"已启动{resource_type}监控"]

    return JsonResponse({
        "message": f"开始监控进程 {process.name()} (PID: {pid})",
        "results": results
    })


def _start_monitor_auto():
    """Scan every process and start monitors for those above the thresholds.

    Thresholds: CPU > 200 % (more than two cores), RSS > 20 GB, and GPU
    utilization > 50 %. A monitor is started per resource type unless an
    active record of that type already exists for the PID.
    """
    # Prime each process's CPU counter, sleep once, then read. This gives the
    # same ~1 s measurement as cpu_percent(interval=1.0) without blocking for
    # one second *per process*.
    candidates = []
    for proc in psutil.process_iter(['pid', 'name']):
        try:
            proc.cpu_percent(interval=None)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        candidates.append(proc)
    time.sleep(1.0)

    high_resource_procs = {
        'cpu': [],
        'gpu': [],
        'memory': []
    }
    for proc in candidates:
        try:
            # CPU usage > 200 % means more than two cores are in use.
            if proc.cpu_percent(interval=None) > 200:
                high_resource_procs['cpu'].append(proc)
            # Resident memory > 20 GB.
            memory_gb = proc.memory_info().rss / (1024 * 1024 * 1024)
            if memory_gb > 20:
                high_resource_procs['memory'].append(proc)
            # GPU utilization > 50 %.
            gpu_usage, _gpu_memory = get_process_gpu_usage(proc.pid)
            if gpu_usage > 50:
                high_resource_procs['gpu'].append(proc)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue

    results = {
        'cpu': [],
        'gpu': [],
        'memory': []
    }
    for resource_type, processes in high_resource_procs.items():
        for proc in processes:
            # Per-type check, consistent with the explicit-PID path. The old
            # "any model" check skipped e.g. memory monitoring for a process
            # already under CPU monitoring.
            if _has_active_monitor(proc.pid, resource_type):
                continue
            _launch_monitor_thread(proc.pid, resource_type)
            results[resource_type].append({
                'pid': proc.pid,
                # Name prefetched by process_iter: no extra syscall that could
                # raise for a process that exited meanwhile.
                'name': proc.info['name']
            })
    return JsonResponse({
        "message": "开始监控高资源进程",
        "processes": results
    })
@csrf_exempt
@require_http_methods(["POST"])
def stop_monitor(request, pid):
    """Deactivate the monitor record(s) of *pid* and signal its monitor threads.

    Query parameters:
        type: ``cpu``, ``gpu``, ``memory`` or ``all`` (default). Selects which
            active records are deactivated.

    Only the records' ``is_active`` flag is changed; the monitored process
    itself is not touched. Each stopped record is logged. Every live thread
    named ``monitor_<pid>`` (optionally followed by a non-digit suffix) gets
    ``do_run = False``.
    NOTE(review): that is a cooperative signal. It only has an effect if the
    monitor loop checks ``do_run`` and names its thread that way. Confirm in
    ``tasks.py``.

    Returns a ``JsonResponse``:
        * 200 with the number of records stopped.
        * 400 for an unsupported type.
        * 404 when no active record matches.
        * 500 for any other error.
    """
    resource_type = request.GET.get('type', 'all')  # resource type from the query string
    models_by_type = {
        'cpu': (HighCPUProcess,),
        'gpu': (HighGPUProcess,),
        'memory': (HighMemoryProcess,),
        'all': (HighCPUProcess, HighGPUProcess, HighMemoryProcess),
    }
    try:
        models = models_by_type.get(resource_type)
        if models is None:
            return JsonResponse({
                "error": f"不支持的资源类型: {resource_type}"
            }, status=400)

        monitors = [
            monitor
            for model in models
            for monitor in model.objects.filter(pid=pid, is_active=True)
        ]
        if not monitors:
            return JsonResponse({
                "error": f"未找到进程 {pid} 的{resource_type}监控记录"
            }, status=404)

        for monitor in monitors:
            # Only the monitoring flag changes, never the process status.
            monitor.is_active = False
            # update_fields: write only the flag, so metrics the monitor thread
            # stored after we loaded this row are not overwritten with stale
            # values.
            monitor.save(update_fields=['is_active'])
            logger = logging.getLogger(f'{monitor.__class__.__name__.lower()}_{pid}')
            logger.info(
                f"手动停止监控:\n"
                f"├─ 进程ID: {pid}\n"
                f"├─ 监控类型: {monitor.__class__.__name__}\n"
                f"├─ 进程状态: {'运行中' if monitor.status == 1 else '已终止'}\n"
                f"├─ 开始时间: {monitor.created_at}\n"
                f"└─ 停止时间: {timezone.now()}"
            )

        # Ask matching monitor threads to stop. A bare startswith() would also
        # hit other PIDs sharing the prefix (pid 12 vs 'monitor_123'), so the
        # character after the PID must not be a digit.
        prefix = f'monitor_{pid}'
        for thread in threading.enumerate():
            name = thread.name
            if name.startswith(prefix) and not name[len(prefix):][:1].isdigit():
                thread.do_run = False

        return JsonResponse({
            "message": f"已停止对进程 {pid} 的监控",
            "stopped_monitors": len(monitors),
            "process_status": "运行中" if monitors[0].status == 1 else "已终止"
        })
    except Exception as e:
        return JsonResponse({
            "error": f"停止监控失败: {str(e)}"
        }, status=500)
def get_process_metrics(request, pid):
    """Return the stored metrics of *pid* for one or all resource types.

    Query parameters:
        type: ``cpu``, ``gpu``, ``memory`` or ``all`` (default).

    For each selected type, the most recently created record for *pid* is
    reported. A PID can own several records, e.g. after a stop/start cycle;
    the previous ``objects.get(pid=...)`` then raised MultipleObjectsReturned
    and produced a 500.

    Returns a ``JsonResponse``:
        * 200 with ``{"pid", "metrics": {type: {...}}}``.
        * 400 for an unsupported type.
        * 404 when no record exists.
        * 500 for any other error.
    """
    resource_type = request.GET.get('type', 'all')
    try:
        models = {
            'cpu': HighCPUProcess,
            'gpu': HighGPUProcess,
            'memory': HighMemoryProcess
        }
        if resource_type == 'all':
            selected = models
        elif resource_type in models:
            selected = {resource_type: models[resource_type]}
        else:
            return JsonResponse({"error": f"不支持的资源类型: {resource_type}"}, status=400)

        results = {}
        for rtype, model in selected.items():
            monitor = model.objects.filter(pid=pid).order_by('-created_at').first()
            if monitor is not None:
                results[rtype] = _process_metrics_payload(rtype, monitor)

        if not results:
            return JsonResponse({"error": f"未找到PID为{pid}的监控记录"}, status=404)
        return JsonResponse({
            "pid": pid,
            "metrics": results
        })
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)


def _process_metrics_payload(resource_type, monitor):
    """Serialize one monitor record: common metrics plus the type-specific one."""
    payload = {
        'status': monitor.status,
        'cpu_usage': monitor.cpu_usage,
        'memory_usage': monitor.memory_usage,
        'gpu_usage': monitor.gpu_usage,
        'gpu_memory': monitor.gpu_memory,
        'virtual_memory': monitor.virtual_memory
    }
    # Extra metric that only exists on the model of the given resource type.
    extra_field = {
        'cpu': 'cpu_cores',
        'gpu': 'gpu_index',
        'memory': 'swap_usage',
    }[resource_type]
    payload[extra_field] = getattr(monitor, extra_field)
    return payload
def auto_detect_monitor(request):
    """Detect high-resource processes and start monitoring the new ones.

    Steps:
        1. Deactivate active records whose ``status`` is 0 (process stopped).
        2. Take two ``psutil.process_iter`` passes one second apart.
           ``process_iter`` reuses cached Process objects, so the
           ``cpu_percent`` read in the second pass covers that interval.
        3. For each process seen in both passes, start a daemon
           ``monitor_process(pid, type)`` thread per exceeded threshold, unless
           an active record of that type already exists.

    Thresholds: CPU > 200 % (more than two cores), RSS > 20 GB, and GPU
    utilization > 50 %.

    Returns a ``JsonResponse`` listing the processes for which monitoring was
    started (200), or ``{"error": ...}`` with status 500.
    """
    def start_if_unmonitored(pid, resource_type, model):
        """Start a monitor thread unless *pid* has an active *model* record.

        Returns True when a new thread was started.
        """
        if model.objects.filter(pid=pid, is_active=True).exists():
            return False
        threading.Thread(
            target=monitor_process,
            args=(pid, resource_type),
            daemon=True
        ).start()
        return True

    try:
        # Retire monitors whose process has already stopped.
        for model in (HighCPUProcess, HighGPUProcess, HighMemoryProcess):
            model.objects.filter(is_active=True, status=0).update(is_active=False)

        results = {
            'cpu': [],
            'gpu': [],
            'memory': []
        }
        attrs = ['pid', 'name', 'cpu_percent']

        # First pass only primes the CPU counters and records which PIDs exist.
        baseline_pids = {proc.info['pid'] for proc in psutil.process_iter(attrs)}
        time.sleep(1)  # let CPU usage accumulate

        for proc in psutil.process_iter(attrs):
            pid = proc.info['pid']
            if pid not in baseline_pids:
                continue
            name = proc.info['name']
            try:
                # process_iter stores None when the value could not be read
                # (AccessDenied); comparing None > 200 used to raise TypeError
                # and fail the whole request.
                cpu_percent = proc.info['cpu_percent']
                if cpu_percent is not None and cpu_percent > 200:
                    if start_if_unmonitored(pid, 'cpu', HighCPUProcess):
                        results['cpu'].append({
                            'pid': pid,
                            'name': name,
                            'cpu_usage': cpu_percent
                        })

                # Resident memory > 20 GB.
                memory_gb = proc.memory_info().rss / (1024 * 1024 * 1024)
                if memory_gb > 20 and start_if_unmonitored(pid, 'memory', HighMemoryProcess):
                    results['memory'].append({
                        'pid': pid,
                        'name': name,
                        'memory_usage': memory_gb
                    })

                # GPU utilization > 50 %.
                gpu_usage, gpu_memory = get_process_gpu_usage(pid)
                if gpu_usage > 50 and start_if_unmonitored(pid, 'gpu', HighGPUProcess):
                    results['gpu'].append({
                        'pid': pid,
                        'name': name,
                        'gpu_usage': gpu_usage,
                        'gpu_memory': gpu_memory
                    })
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

        return JsonResponse({
            "message": "已开始监控检测到的高资源进程",
            "detected_processes": results
        })
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)