# Tiktok-Talent-Info/tasks.py

import os

# Avoid HuggingFace tokenizers deadlock warnings in forked/spawned workers.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

import torch.multiprocessing as mp

# CUDA contexts do not survive fork(); force the "spawn" start method for
# GPU-bound workers.
mp.set_start_method("spawn", force=True)

import sys

from celery import Celery
from pynvml import (
    nvmlInit,
    nvmlDeviceGetCount,
    nvmlDeviceGetHandleByIndex,
    nvmlDeviceGetUtilizationRates,
)

# Make sibling packages (endpoints, utils, pipeline_setup) importable no
# matter where the worker is launched from.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

# GPU tasks are long-running; prefetching one task at a time keeps a busy
# worker from hoarding queued work.
app.conf.worker_prefetch_multiplier = 1

# Route CPU-bound preprocessing and GPU-bound inference to separate queues so
# each stage can be scaled by dedicated workers.
app.conf.task_routes = {
    "tasks.preprocess_video": {"queue": "preprocess_queue"},
    "tasks.inference_video": {"queue": "inference_queue"},
}
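
# A minimal sketch of how workers might be started for this routing (queue
# names match task_routes above; concurrency values are illustrative, not
# tuned):
#
#   celery -A tasks worker -Q preprocess_queue -n preprocess@%h --concurrency=4
#   celery -A tasks worker -Q inference_queue -n inference@%h --concurrency=1
#
# Keeping the inference worker at concurrency 1 serializes GPU access.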

# Optional configuration kept for reference (not active):
#
# from kombu import Queue
#
# app.conf.task_queues = (
#     Queue("high_priority"),
#     Queue("default"),
#     Queue("low_priority"),
# )
# app.conf.task_routes = {
#     "tasks.text_query_task": {"queue": "high_priority"},
#     "tasks.image_query_task": {"queue": "default"},
#     "tasks.video_query_task": {"queue": "low_priority"},
# }
# app.conf.task_annotations = {
#     "tasks.text_query_task": {"rate_limit": "10/m"},
#     "tasks.image_query_task": {"rate_limit": "5/m"},
#     "tasks.video_query_task": {"rate_limit": "3/m"},
# }
# app.conf.task_acks_late = True      # remove a task from the queue only once fully processed
# app.conf.task_time_limit = 60       # hard kill after 60 seconds
# app.conf.task_soft_time_limit = 50  # raise SoftTimeLimitExceeded at 50 seconds
@app.task(name="tasks.text_query_task")
def text_query_task(question: str):
print("Importing text_query...")
from endpoints.text import text_query
print(f"Processing question: {question}")
return text_query(question)
@app.task(name="tasks.image_query_task")
def image_query_task(file_path: str, question: str):
try:
print("Processing in image_query_task...")
from endpoints.image import image_query
print("file_path in image_query_task...")
result = image_query(file_path, question)
return result
except Exception as e:
return {"query": question, "error": str(e)}
@app.task(name="tasks.video_query_task")
def video_query_task(file_path: str, question: str):
"""
Celery task to process a video query asynchronously.
Reads the video file from disk and processes it.
"""
try:
from endpoints.video import video_query
result = video_query(file_path, question)
return result
except Exception as e:
return {"query": question, "error": str(e)}

import mimetypes
from concurrent.futures import ThreadPoolExecutor, as_completed

from pipeline_setup import pipe
from utils.audio_transcription import transcribe_audio
from utils.image_processing import encode_image_base64
from utils.video_processing import (
    split_video_into_segments,
    extract_motion_key_frames,
    extract_audio_from_video,
)

# Earlier segment-based pipeline, kept for reference (not active): split the
# video into fixed-length segments and preprocess them in parallel on a
# thread pool.
#
# def process_segment(segment_data):
#     segment_path, segment_idx, total_segments = segment_data
#     print(f"Processing segment {segment_idx+1}/{total_segments}: {segment_path}")
#     imgs = extract_motion_key_frames(segment_path, max_frames=20, sigma_multiplier=4)
#     print(f"Segment {segment_idx+1}: extracted {len(imgs)} key frames.")
#     audio_path = extract_audio_from_video(segment_path)
#     transcribed_text = transcribe_audio(audio_path)
#     return {
#         "segment_path": segment_path,
#         "key_frames": [encode_image_base64(img) for img in imgs],
#         "transcribed_text": transcribed_text,
#     }
#
# @app.task(name="tasks.preprocess_video")
# def preprocess_video(video_path, question):
#     try:
#         print(f"Preprocessing video: {video_path}")
#         if not os.path.exists(video_path):
#             return {"query": question, "error": "Video file not found."}
#         # Determine the file type.
#         file_type, _ = mimetypes.guess_type(video_path)
#         if file_type is None or not file_type.startswith("video/"):
#             return {"query": question, "error": "Unsupported video file type."}
#         print("Splitting video...")
#         segments = split_video_into_segments(video_path, segment_duration=100)
#         print(f"Video split into {len(segments)} segments.")
#         # Preprocess segments in parallel.
#         max_workers = min(len(segments), os.cpu_count() * 2)
#         print(f"Processing segments with {max_workers} workers...")
#         segment_results = [None] * len(segments)
#         with ThreadPoolExecutor(max_workers=max_workers) as executor:
#             future_to_segment = {
#                 executor.submit(process_segment, (segment_path, idx, len(segments))): idx
#                 for idx, segment_path in enumerate(segments)
#             }
#             # Collect results as they complete.
#             for future in as_completed(future_to_segment):
#                 idx = future_to_segment[future]
#                 try:
#                     segment_results[idx] = future.result()
#                 except Exception as e:
#                     print(f"Error processing segment {idx}: {str(e)}")
#                     segment_results[idx] = {"segment_path": segments[idx], "error": str(e)}
#         processed_segments = [result for result in segment_results if "error" not in result]
#         return {
#             "video_path": video_path,
#             "question": question,
#             "processed_segments": processed_segments,
#         }
#     except Exception as e:
#         return {"query": question, "error": str(e)}


def process_video(video_path):
    """Extract motion key frames, pull the audio track, and transcribe it."""
    print(f"Processing video: {video_path}")

    # Extract key frames from the entire video.
    imgs = extract_motion_key_frames(video_path, max_frames=20, sigma_multiplier=2)
    print(f"Number of key frames extracted: {len(imgs)}")

    # Extract and transcribe the audio track.
    audio_path = extract_audio_from_video(video_path)
    transcribed_text = transcribe_audio(audio_path)
    print(f"Transcription: {transcribed_text}")

    return {
        "video_path": video_path,
        "key_frames": [encode_image_base64(img) for img in imgs],
        "transcribed_text": transcribed_text,
    }
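

# A small helper sketched from the commented-out NVML snippet further below;
# it assumes monitoring is best-effort and should never crash a task.
def log_gpu_utilization():
    """Print the current utilization of every visible GPU (best-effort)."""
    try:
        nvmlInit()
        for i in range(nvmlDeviceGetCount()):
            handle = nvmlDeviceGetHandleByIndex(i)
            utilization = nvmlDeviceGetUtilizationRates(handle)
            print(f"GPU {i} utilization: {utilization.gpu}%")
    except Exception as e:
        print(f"GPU monitoring unavailable: {e}")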
@app.task(name="tasks.preprocess_video")
def preprocess_video(video_path, question):
try:
print(f"Preprocessing video: {video_path}")
# Check if the video file exists
if not os.path.exists(video_path):
return {"query": question, "error": "Video file not found."}
# Determine the file type
file_type, _ = mimetypes.guess_type(video_path)
if file_type is None or not file_type.startswith("video/"):
return {"query": question, "error": "Unsupported video file type."}
# Process the entire video without splitting into segments
processed_data = process_video(video_path)
return {
"video_path": video_path,
"question": question,
"processed_data": processed_data
}
except Exception as e:
return {"query": question, "error": str(e)}
# @app.task(name="tasks.inference_video")
# def inference_video(preprocessed_data):
# try:
# # Monitor GPU usage
# # nvmlInit()
# # device_count = nvmlDeviceGetCount()
# # for i in range(device_count):
# # handle = nvmlDeviceGetHandleByIndex(i)
# # utilization = nvmlDeviceGetUtilizationRates(handle)
# # print(f"GPU {i} Usage during inference: {utilization.gpu}%")
# # print(preprocessed_data)
# video_path = preprocessed_data["video_path"]
# question = preprocessed_data["question"]
# # print(f"question: {question}")
# segments = preprocessed_data["processed_segments"]
# print(f"Running inference on: {video_path}")
# aggregated_responses = []
# for i, segment in enumerate(segments):
# print(f"Inferencing segment {i+1}/{len(segments)}")
# # Prepare input content
# question_with_frames = "".join(
# [f"Frame{j+1}: {{IMAGE_TOKEN}}\n" for j in range(len(segment["key_frames"]))]
# )
# question_with_frames += f"Audio Transcript: {segment['transcribed_text']}\n{question}"
# content = [{"type": "text", "text": question_with_frames}] + [
# {"type": "image_url", "image_url": {"max_dynamic_patch": 1, "url": f"data:image/jpeg;base64,{img}"}}
# for img in segment["key_frames"]
# ]
# # Query model
# messages = [dict(role="user", content=content)]
# response = pipe(messages)
# # Aggregate response
# aggregated_responses.append(response.text)
# return {
# "question": question,
# "responses": aggregated_responses,
# }
# except Exception as e:
# return {"query": question, "error": str(e)}
# @app.task(name="tasks.inference_video")
# def inference_video(preprocessed_results):
# """
# Processes a batch of preprocessed videos on the GPU.
# """
# try:
# print("Running inference on a batch of videos...")
# aggregated_results = []
# for preprocessed_data in preprocessed_results:
# video_path = preprocessed_data["video_path"]
# question = preprocessed_data["question"]
# segments = preprocessed_data["processed_data"]
# print(f"Inferencing video: {video_path}")
# # Run inference on the GPU
# aggregated_responses = []
# for segment in segments:
# # Prepare input for inference
# question_with_frames = "".join(
# [f"Frame{j+1}: {{IMAGE_TOKEN}}\n" for j in range(len(segment["key_frames"]))]
# )
# question_with_frames += f"Audio Transcript: {segment['transcribed_text']}\n{question}"
# content = [{"type": "text", "text": question_with_frames}] + [
# {"type": "image_url", "image_url": {"max_dynamic_patch": 1, "url": f"data:image/jpeg;base64,{img}"}}
# for img in segment["key_frames"]
# ]
# # Query model
# messages = [dict(role="user", content=content)]
# response = pipe(messages)
# # Aggregate response
# aggregated_responses.append(response.text)
# aggregated_results.append({
# "video_path": video_path,
# "question": question,
# "responses": aggregated_responses
# })
# return aggregated_results
# except Exception as e:
# return {"error": str(e)}
@app.task(name="tasks.inference_video")
def inference_video(preprocessed_results):
"""
Processes a batch of preprocessed videos on the GPU.
"""
try:
print("Running inference on a batch of videos...")
aggregated_results = []
for preprocessed_data in preprocessed_results:
video_path = preprocessed_data["video_path"]
question = preprocessed_data["question"]
processed_data = preprocessed_data["processed_data"]
# print(f"processed_data: {processed_data}")
print(f"Inferencing video: {video_path}")
# Prepare input for inference
question_with_frames = "".join(
[f"Frame{j+1}: {{IMAGE_TOKEN}}\n" for j in range(len(processed_data["key_frames"]))]
)
question_with_frames += f"Audio Transcript: {processed_data['transcribed_text']}\n{question}"
content = [{"type": "text", "text": question_with_frames}] + [
{"type": "image_url", "image_url": {"max_dynamic_patch": 1, "url": f"data:image/jpeg;base64,{img}"}}
for img in processed_data["key_frames"]
]
# Query model
messages = [dict(role="user", content=content)]
response = pipe(messages)
aggregated_results.append({
"video_path": video_path,
"question": question,
"response": response.text
})
return aggregated_results
except Exception as e:
return {"error": str(e)}