from fastapi import UploadFile, Form
from fastapi.responses import JSONResponse
from pipeline_setup import pipe
from utils.image_processing import encode_image_base64
from utils.video_processing import split_video_into_segments, extract_motion_key_frames, extract_audio_from_video
from utils.audio_transcription import transcribe_audio
import time
import asyncio
import mimetypes
from concurrent.futures import ThreadPoolExecutor


async def video_query(file: UploadFile, question: str = Form(...)):
    """
    API endpoint to process a video file with the user's query.
    """
    try:
        print("Processing video...")

        # Validate file type
        if file.content_type not in ["video/mp4", "video/avi", "video/mkv"]:
            return JSONResponse({"query": question, "error": "Unsupported video file type."})

        # Start overall timer
        overall_start_time = time.time()

        # Save the uploaded video to a temporary file
        print("Reading video...")
        video_data = await file.read()
        temp_video_path = "/tmp/temp_video.mp4"
        with open(temp_video_path, "wb") as temp_video_file:
            temp_video_file.write(video_data)
        print(f"Temp video saved to: {temp_video_path}")

        # Record the time after reading the video
        video_reading_time = time.time()

        # Split the video into segments
        print("Splitting video...")
        segments = split_video_into_segments(temp_video_path, segment_duration=30)
        print(f"Video split into {len(segments)} segments.")

        aggregated_responses = []
        segment_timings = []

        for i, segment_path in enumerate(segments):
            print(f"Processing segment {i+1}/{len(segments)}: {segment_path}")

            # Start timing for the segment
            segment_start_time = time.time()

            # Extract key frames
            frame_start_time = time.time()
            imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2)
            frame_time = time.time()

            # Extract audio and transcribe
            audio_start_time = time.time()
            audio_path = extract_audio_from_video(segment_path)
            transcribed_text = transcribe_audio(audio_path)
            audio_time = time.time()

            # Combine transcribed text with the query
            combined_query = f"Audio Transcript: {transcribed_text}\n{question}"

            # Prepare content for the pipeline
            question_with_frames = ""
            for j, img in enumerate(imgs):
                question_with_frames += f"Frame{j+1}: {{IMAGE_TOKEN}}\n"
            question_with_frames += combined_query

            content = [{"type": "text", "text": question_with_frames}]
            for img in imgs:
                content.append({
                    "type": "image_url",
                    "image_url": {
                        "max_dynamic_patch": 1,
                        "url": f"data:image/jpeg;base64,{encode_image_base64(img)}"
                    }
                })

            # Query the model
            inference_start_time = time.time()
            messages = [dict(role="user", content=content)]
            response = await asyncio.to_thread(pipe, messages)
            inference_time = time.time()

            # Aggregate response
            aggregated_responses.append(response.text)

            # Calculate timing for the segment
            segment_timings.append({
                "segment_index": i + 1,
                "segment_processing_time": inference_time - segment_start_time,
                "frame_extraction_time": frame_time - frame_start_time,
                "audio_extraction_time": audio_time - audio_start_time,
                "model_inference_time": inference_time - inference_start_time
            })

            print(f"transcription: {transcribed_text}")
            # print(f"content: {content}")

        overall_end_time = time.time()

        # Aggregate total timings
        total_timings = {
            "video_reading_time": video_reading_time - overall_start_time,
            "total_segments": len(segments),
            "total_processing_time": overall_end_time - overall_start_time,
            "segment_details": segment_timings
        }

        return JSONResponse({
            "question": question,
            "responses": aggregated_responses,
            "timings": total_timings,
        })

    except Exception as e:
        return JSONResponse({"query": question, "error": str(e)})
# async def video_query(video_path: str, question: str):
#     """
#     API endpoint to process a video file with the user's query.
#     """
#     try:
#         print("Processing video...")
#
#         if not video_path or not isinstance(video_path, str):
#             return {"query": question, "error": "No video file provided or invalid file input."}
#
#         # Determine the file type using the file extension
#         file_type, _ = mimetypes.guess_type(video_path)
#         if file_type is None or not file_type.startswith("video/"):
#             return {"query": question, "error": "Unsupported video file type."}
#
#         # Log the video path
#         print(f"Video path: {video_path}")
#
#         # Split the video into segments
#         print("Splitting video...")
#         segments = split_video_into_segments(video_path, segment_duration=30)
#         print(f"Video split into {len(segments)} segments.")
#
#         aggregated_responses = []
#         segment_timings = []
#
#         for i, segment_path in enumerate(segments):
#             print(f"Processing segment {i+1}/{len(segments)}: {segment_path}")
#
#             # Extract key frames
#             imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2)
#
#             # Extract audio and transcribe
#             audio_path = extract_audio_from_video(segment_path)
#             transcribed_text = transcribe_audio(audio_path)
#
#             # Combine transcribed text with the query
#             combined_query = f"Audio Transcript: {transcribed_text}\n{question}"
#
#             # Prepare content for the pipeline
#             question_with_frames = ""
#             for j, img in enumerate(imgs):
#                 question_with_frames += f"Frame{j+1}: {{IMAGE_TOKEN}}\n"
#             question_with_frames += combined_query
#
#             content = [{"type": "text", "text": question_with_frames}]
#             for img in imgs:
#                 content.append({
#                     "type": "image_url",
#                     "image_url": {
#                         "max_dynamic_patch": 1,
#                         "url": f"data:image/jpeg;base64,{encode_image_base64(img)}"
#                     }
#                 })
#
#             # Query the model
#             messages = [dict(role="user", content=content)]
#             response = await asyncio.to_thread(pipe, messages)
#
#             # Aggregate response
#             aggregated_responses.append(response.text)
#
#         return {
#             "question": question,
#             "responses": aggregated_responses,
#         }
#
#     except Exception as e:
#         return {"query": question, "error": str(e)}
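

# --- Usage sketch (illustrative only) ----------------------------------------
# A minimal sketch of how this handler might be mounted and exercised locally.
# The FastAPI app instance, the "/video_query" route path, the sample file name,
# and the TestClient call below are assumptions for illustration; this module
# only defines the handler itself and does not create an app or register routes.
if __name__ == "__main__":
    from fastapi import FastAPI
    from fastapi.testclient import TestClient

    app = FastAPI()
    # Register the handler as a POST endpoint (hypothetical route name).
    app.post("/video_query")(video_query)

    client = TestClient(app)
    # "sample.mp4" is a placeholder path; substitute any short MP4 clip.
    with open("sample.mp4", "rb") as f:
        resp = client.post(
            "/video_query",
            files={"file": ("sample.mp4", f, "video/mp4")},
            data={"question": "What happens in this video?"},
        )
    print(resp.json())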