import asyncio
import mimetypes

from fastapi import UploadFile, Form  # used only by the archived endpoint below
from fastapi.responses import JSONResponse  # used only by the archived endpoint below
from pipeline_setup import pipe
from utils.image_processing import encode_image_base64
from utils.video_processing import (
    split_video_into_segments,
    extract_motion_key_frames,
    extract_audio_from_video,
)
from utils.audio_transcription import transcribe_audio


# NOTE: Archived FastAPI upload endpoint, kept for reference. It additionally
# records per-segment timing; re-enabling it also requires `import time`.
# async def video_query(file: UploadFile, question: str = Form(...)):
#     """
#     API endpoint to process a video file with the user's query.
#     """
#     try:
#         print("Processing video...")
#
#         # Validate file type
#         if file.content_type not in ["video/mp4", "video/avi", "video/mkv"]:
#             return JSONResponse({"query": question, "error": "Unsupported video file type."})
#
#         # Start overall timer
#         overall_start_time = time.time()
#
#         # Save the uploaded video to a temporary file
#         print("Reading video...")
#         video_data = await file.read()
#         temp_video_path = "/tmp/temp_video.mp4"
#         with open(temp_video_path, "wb") as temp_video_file:
#             temp_video_file.write(video_data)
#         print(f"Temp video saved to: {temp_video_path}")
#
#         # Record the time after reading the video
#         video_reading_time = time.time()
#
#         # Split the video into segments
#         print("Splitting video...")
#         segments = split_video_into_segments(temp_video_path, segment_duration=30)
#         print(f"Video split into {len(segments)} segments.")
#
#         aggregated_responses = []
#         segment_timings = []
#
#         for i, segment_path in enumerate(segments):
#             print(f"Processing segment {i+1}/{len(segments)}: {segment_path}")
#
#             # Start timing for the segment
#             segment_start_time = time.time()
#
#             # Extract key frames
#             frame_start_time = time.time()
#             imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2)
#             frame_time = time.time()
#
#             # Extract audio and transcribe
#             audio_start_time = time.time()
#             audio_path = extract_audio_from_video(segment_path)
#             transcribed_text = transcribe_audio(audio_path)
#             audio_time = time.time()
#
#             # Combine transcribed text with the query
#             combined_query = f"Audio Transcript: {transcribed_text}\n{question}"
#
#             # Prepare content for the pipeline
#             question_with_frames = ""
#             for j, img in enumerate(imgs):
#                 question_with_frames += f"Frame{j+1}: {{IMAGE_TOKEN}}\n"
#             question_with_frames += combined_query
#
#             content = [{"type": "text", "text": question_with_frames}]
#             for img in imgs:
#                 content.append({
#                     "type": "image_url",
#                     "image_url": {
#                         "max_dynamic_patch": 1,
#                         "url": f"data:image/jpeg;base64,{encode_image_base64(img)}"
#                     }
#                 })
#
#             # Query the model
#             inference_start_time = time.time()
#             messages = [dict(role="user", content=content)]
#             response = await asyncio.to_thread(pipe, messages)
#             inference_time = time.time()
#
#             # Aggregate response
#             aggregated_responses.append(response.text)
#
#             # Calculate timing for the segment
#             segment_timings.append({
#                 "segment_index": i + 1,
#                 "segment_processing_time": inference_time - segment_start_time,
#                 "frame_extraction_time": frame_time - frame_start_time,
#                 "audio_extraction_time": audio_time - audio_start_time,
#                 "model_inference_time": inference_time - inference_start_time
#             })
#             print(f"transcription: {transcribed_text}")
#             # print(f"content: {content}")
#
#         overall_end_time = time.time()
#
#         # Aggregate total timings
#         total_timings = {
#             "video_reading_time": video_reading_time - overall_start_time,
#             "total_segments": len(segments),
#             "total_processing_time": overall_end_time - overall_start_time,
#             "segment_details": segment_timings
#         }
#
#         return JSONResponse({
#             "question": question,
#             "responses": aggregated_responses,
#             "timings": total_timings,
#         })
#     except Exception as e:
#         return JSONResponse({"query": question, "error": str(e)})


async def video_query(video_path: str, question: str):
    """
    Process a local video file with the user's query.

    The video is split into segments; for each segment, motion key frames are
    extracted and the audio track is transcribed, then both are sent to the
    model together with the question.
    """
    try:
        print("Processing video...")

        if not video_path or not isinstance(video_path, str):
            return {"query": question, "error": "No video file provided or invalid file input."}

        # Determine the file type from the file extension
        file_type, _ = mimetypes.guess_type(video_path)
        if file_type is None or not file_type.startswith("video/"):
            return {"query": question, "error": "Unsupported video file type."}

        print(f"Video path: {video_path}")

        # Split the video into segments
        print("Splitting video...")
        segments = split_video_into_segments(video_path, segment_duration=30)
        print(f"Video split into {len(segments)} segments.")

        aggregated_responses = []

        for i, segment_path in enumerate(segments):
            print(f"Processing segment {i+1}/{len(segments)}: {segment_path}")

            # Extract key frames
            imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2)

            # Extract audio and transcribe
            audio_path = extract_audio_from_video(segment_path)
            transcribed_text = transcribe_audio(audio_path)

            # Combine the transcript with the user's question
            combined_query = f"Audio Transcript: {transcribed_text}\n{question}"

            # Prepare content for the pipeline: one text block listing an
            # {IMAGE_TOKEN} placeholder per frame, followed by the frames as
            # base64-encoded image URLs
            question_with_frames = ""
            for j, img in enumerate(imgs):
                question_with_frames += f"Frame{j+1}: {{IMAGE_TOKEN}}\n"
            question_with_frames += combined_query

            content = [{"type": "text", "text": question_with_frames}]
            for img in imgs:
                content.append({
                    "type": "image_url",
                    "image_url": {
                        "max_dynamic_patch": 1,
                        "url": f"data:image/jpeg;base64,{encode_image_base64(img)}"
                    }
                })

            # Query the model without blocking the event loop
            messages = [dict(role="user", content=content)]
            response = await asyncio.to_thread(pipe, messages)

            # Aggregate the per-segment response
            aggregated_responses.append(response.text)

        return {
            "question": question,
            "responses": aggregated_responses,
        }
    except Exception as e:
        return {"query": question, "error": str(e)}