import os

os.environ["TOKENIZERS_PARALLELISM"] = "false"

import torch.multiprocessing as mp

mp.set_start_method("spawn", force=True)
# mp.set_start_method("fork", force=True)

from celery import Celery
# pynvml is used by the optional GPU-monitoring snippet further below.
from pynvml import (
    nvmlInit,
    nvmlDeviceGetCount,
    nvmlDeviceGetHandleByIndex,
    nvmlDeviceGetUtilizationRates,
)

import sys

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
print(os.path.dirname(os.path.abspath(__file__)))

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
    # include=["tasks"]
)

# Avoid one worker prefetching too many tasks at once.
app.conf.worker_prefetch_multiplier = 1

app.conf.task_routes = {
    "tasks.preprocess_video": {"queue": "preprocess_queue"},
    "tasks.inference_video": {"queue": "inference_queue"},
}

# Optional tuning, kept for reference:
# from kombu import Queue
# app.conf.task_queues = (Queue("high_priority"), Queue("default"), Queue("low_priority"))
# app.conf.task_routes = {
#     "tasks.text_query_task": {"queue": "high_priority"},
#     "tasks.image_query_task": {"queue": "default"},
#     "tasks.video_query_task": {"queue": "low_priority"},
# }
# app.conf.task_annotations = {
#     "tasks.text_query_task": {"rate_limit": "10/m"},
#     "tasks.image_query_task": {"rate_limit": "5/m"},
#     "tasks.video_query_task": {"rate_limit": "3/m"},
# }
# app.conf.task_acks_late = True      # Remove a task from the queue only once fully processed.
# app.conf.task_time_limit = 60       # Hard limit: 60 seconds max execution time.
# app.conf.task_soft_time_limit = 50  # Soft warning at 50 seconds.


@app.task(name="tasks.text_query_task")
def text_query_task(question: str):
    print("Importing text_query...")
    from endpoints.text import text_query
    print(f"Processing question: {question}")
    return text_query(question)


@app.task(name="tasks.image_query_task")
def image_query_task(file_path: str, question: str):
    try:
        print("Processing in image_query_task...")
        from endpoints.image import image_query
        print(f"Processing file: {file_path}")
        result = image_query(file_path, question)
        return result
    except Exception as e:
        return {"query": question, "error": str(e)}


@app.task(name="tasks.video_query_task")
def video_query_task(file_path: str, question: str):
    """
    Celery task to process a video query asynchronously.
    Reads the video file from disk and processes it.
    """
    try:
        from endpoints.video import video_query
        result = video_query(file_path, question)
        return result
    except Exception as e:
        return {"query": question, "error": str(e)}
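
# --- Example usage (sketch) -----------------------------------------------
# A minimal client-side sketch, not part of the pipeline itself. It assumes
# the Redis broker above is reachable and a worker is consuming these tasks;
# the function name and file path are hypothetical.
def example_client_queries():
    # .delay() enqueues the task and returns an AsyncResult immediately.
    answer = text_query_task.delay("What does the report conclude?")
    print(answer.get(timeout=120))  # Block until the worker responds.

    # .apply_async() accepts per-call options (queue, countdown, etc.).
    caption = image_query_task.apply_async(
        args=["/data/example.jpg", "Describe this image."]
    )
    print(caption.get(timeout=300))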
""" try: from endpoints.video import video_query result = video_query(file_path, question) return result except Exception as e: return {"query": question, "error": str(e)} # @celery.task(name="tasks.video_preprocessing_task", priority=5, queue="preprocessing") # def video_preprocessing_task(file_path: str, question: str): # return preprocess_video(file_path, question) # @celery.task(name="tasks.video_query_task", priority=10, queue="inference") # def video_query_task(preprocessed_data): # return run_video_inference(preprocessed_data) import mimetypes from utils.video_processing import split_video_into_segments, extract_motion_key_frames, extract_audio_from_video from utils.audio_transcription import transcribe_audio from pipeline_setup import pipe from utils.image_processing import encode_image_base64 from concurrent.futures import ThreadPoolExecutor, as_completed # def process_segment(segment_data): # segment_path, segment_idx, total_segments = segment_data # print(f"Processing segment {segment_idx+1}/{total_segments}: {segment_path}") # imgs = extract_motion_key_frames(segment_path, max_frames=20, sigma_multiplier=4) # print(f"length of key frames in segments: {len(imgs)}") # print(f"Segment {segment_idx+1}: extract_motion_key_frames finished.") # audio_path = extract_audio_from_video(segment_path) # print(f"Segment {segment_idx+1}: extract_audio_from_video finished.") # transcribed_text = transcribe_audio(audio_path) # print(f"Segment {segment_idx+1}: transcribe_audio finished.") # return { # "segment_path": segment_path, # "key_frames": [encode_image_base64(img) for img in imgs], # "transcribed_text": transcribed_text # } # @app.task(name="tasks.preprocess_video") # def preprocess_video(video_path, question): # try: # # Monitor CPU usage # # cpu_usage = psutil.cpu_percent(interval=1) # # print(f"CPU Usage during preprocessing: {cpu_usage}%") # print(f"Preprocessing video: {video_path}") # if not os.path.exists(video_path): # return {"query": question, "error": "Video file not found."} # # Determine the file type # file_type, _ = mimetypes.guess_type(video_path) # if file_type is None or not file_type.startswith("video/"): # return {"query": question, "error": "Unsupported video file type."} # print("Splitting video...") # segments = split_video_into_segments(video_path, segment_duration=100) # print(f"segments: {segments}") # print(f"Video split into {len(segments)} segments.") # # Process segments in parallel # processed_segments = [] # max_workers = min(len(segments), os.cpu_count() * 2) # print(f"Processing segments with {max_workers} workers...") # with ThreadPoolExecutor(max_workers=max_workers) as executor: # future_to_segment = { # executor.submit(process_segment, (segment_path, idx, len(segments))): idx # for idx, segment_path in enumerate(segments) # } # # Collect results as they complete # segment_results = [None] * len(segments) # for future in as_completed(future_to_segment): # idx = future_to_segment[future] # try: # segment_results[idx] = future.result() # except Exception as e: # print(f"Error processing segment {idx}: {str(e)}") # segment_results[idx] = { # "segment_path": segments[idx], # "error": str(e) # } # print("multithread done!") # processed_segments = [result for result in segment_results if "error" not in result] # return { # "video_path": video_path, # "question": question, # "processed_segments": processed_segments # } # except Exception as e: # return {"query": question, "error": str(e)} def process_video(video_path): print(f"Processing video: 
{video_path}") # Extract key frames from the entire video imgs = extract_motion_key_frames(video_path, max_frames=20, sigma_multiplier=2) print(f"Number of key frames extracted: {len(imgs)}") print("Key frame extraction finished.") # Extract audio from the video audio_path = extract_audio_from_video(video_path) print("Audio extraction finished.") # Transcribe the extracted audio transcribed_text = transcribe_audio(audio_path) print(transcribed_text) print("Audio transcription finished.") return { "video_path": video_path, "key_frames": [encode_image_base64(img) for img in imgs], "transcribed_text": transcribed_text } @app.task(name="tasks.preprocess_video") def preprocess_video(video_path, question): try: print(f"Preprocessing video: {video_path}") # Check if the video file exists if not os.path.exists(video_path): return {"query": question, "error": "Video file not found."} # Determine the file type file_type, _ = mimetypes.guess_type(video_path) if file_type is None or not file_type.startswith("video/"): return {"query": question, "error": "Unsupported video file type."} # Process the entire video without splitting into segments processed_data = process_video(video_path) return { "video_path": video_path, "question": question, "processed_data": processed_data } except Exception as e: return {"query": question, "error": str(e)} # @app.task(name="tasks.inference_video") # def inference_video(preprocessed_data): # try: # # Monitor GPU usage # # nvmlInit() # # device_count = nvmlDeviceGetCount() # # for i in range(device_count): # # handle = nvmlDeviceGetHandleByIndex(i) # # utilization = nvmlDeviceGetUtilizationRates(handle) # # print(f"GPU {i} Usage during inference: {utilization.gpu}%") # # print(preprocessed_data) # video_path = preprocessed_data["video_path"] # question = preprocessed_data["question"] # # print(f"question: {question}") # segments = preprocessed_data["processed_segments"] # print(f"Running inference on: {video_path}") # aggregated_responses = [] # for i, segment in enumerate(segments): # print(f"Inferencing segment {i+1}/{len(segments)}") # # Prepare input content # question_with_frames = "".join( # [f"Frame{j+1}: {{IMAGE_TOKEN}}\n" for j in range(len(segment["key_frames"]))] # ) # question_with_frames += f"Audio Transcript: {segment['transcribed_text']}\n{question}" # content = [{"type": "text", "text": question_with_frames}] + [ # {"type": "image_url", "image_url": {"max_dynamic_patch": 1, "url": f"data:image/jpeg;base64,{img}"}} # for img in segment["key_frames"] # ] # # Query model # messages = [dict(role="user", content=content)] # response = pipe(messages) # # Aggregate response # aggregated_responses.append(response.text) # return { # "question": question, # "responses": aggregated_responses, # } # except Exception as e: # return {"query": question, "error": str(e)} # @app.task(name="tasks.inference_video") # def inference_video(preprocessed_results): # """ # Processes a batch of preprocessed videos on the GPU. 
# """ # try: # print("Running inference on a batch of videos...") # aggregated_results = [] # for preprocessed_data in preprocessed_results: # video_path = preprocessed_data["video_path"] # question = preprocessed_data["question"] # segments = preprocessed_data["processed_data"] # print(f"Inferencing video: {video_path}") # # Run inference on the GPU # aggregated_responses = [] # for segment in segments: # # Prepare input for inference # question_with_frames = "".join( # [f"Frame{j+1}: {{IMAGE_TOKEN}}\n" for j in range(len(segment["key_frames"]))] # ) # question_with_frames += f"Audio Transcript: {segment['transcribed_text']}\n{question}" # content = [{"type": "text", "text": question_with_frames}] + [ # {"type": "image_url", "image_url": {"max_dynamic_patch": 1, "url": f"data:image/jpeg;base64,{img}"}} # for img in segment["key_frames"] # ] # # Query model # messages = [dict(role="user", content=content)] # response = pipe(messages) # # Aggregate response # aggregated_responses.append(response.text) # aggregated_results.append({ # "video_path": video_path, # "question": question, # "responses": aggregated_responses # }) # return aggregated_results # except Exception as e: # return {"error": str(e)} @app.task(name="tasks.inference_video") def inference_video(preprocessed_results): """ Processes a batch of preprocessed videos on the GPU. """ try: print("Running inference on a batch of videos...") aggregated_results = [] for preprocessed_data in preprocessed_results: video_path = preprocessed_data["video_path"] question = preprocessed_data["question"] processed_data = preprocessed_data["processed_data"] # print(f"processed_data: {processed_data}") print(f"Inferencing video: {video_path}") # Prepare input for inference question_with_frames = "".join( [f"Frame{j+1}: {{IMAGE_TOKEN}}\n" for j in range(len(processed_data["key_frames"]))] ) question_with_frames += f"Audio Transcript: {processed_data['transcribed_text']}\n{question}" content = [{"type": "text", "text": question_with_frames}] + [ {"type": "image_url", "image_url": {"max_dynamic_patch": 1, "url": f"data:image/jpeg;base64,{img}"}} for img in processed_data["key_frames"] ] # Query model messages = [dict(role="user", content=content)] response = pipe(messages) aggregated_results.append({ "video_path": video_path, "question": question, "response": response.text }) return aggregated_results except Exception as e: return {"error": str(e)}