diff --git a/.gitignore b/.gitignore index f479596..0dab052 100644 --- a/.gitignore +++ b/.gitignore @@ -11,5 +11,6 @@ *.app .snapshots/* - __pycache__/* -endpoints/__pycache__/* \ No newline at end of file + __pycache__/ +endpoints/__pycache__/ +flagged/ \ No newline at end of file diff --git a/endpoints/image.py b/endpoints/image.py index c5a6da3..fa09f51 100644 --- a/endpoints/image.py +++ b/endpoints/image.py @@ -1,5 +1,6 @@ from fastapi import UploadFile, Form from fastapi.responses import JSONResponse +import base64 import io import asyncio import numpy as np @@ -7,43 +8,43 @@ from PIL import Image from pipeline_setup import pipe, IMAGE_TOKEN from utils.image_processing import encode_image_base64 -async def image_query(file: UploadFile, question: str = Form(...)): - """ - API endpoint to process an image with the user's query. - """ - try: - if file.content_type not in ["image/jpeg", "image/png"]: - return JSONResponse({"query": question, "error": "Unsupported file type."}) - - image_data = await file.read() - image = Image.open(io.BytesIO(image_data)).convert("RGB").resize((512, 512)) - encoded_image_base64 = encode_image_base64(image) - - question_with_image_token = f"{question}\n{IMAGE_TOKEN}" - response = await asyncio.to_thread(pipe, (question, image)) - return JSONResponse({"query": question, "response": response.text}) - except Exception as e: - return JSONResponse({"query": question, "error": str(e)}) - - -# async def image_query(image: np.ndarray, question: str): +# async def image_query(file: UploadFile, question: str = Form(...)): # """ -# API endpoint to process an image (as numpy array) with the user's query. +# API endpoint to process an image with the user's query. # """ # try: -# # Convert the numpy array to a PIL Image -# image = Image.fromarray(image).convert("RGB").resize((512, 512)) +# if file.content_type not in ["image/jpeg", "image/png"]: +# return JSONResponse({"query": question, "error": "Unsupported file type."}) + +# image_data = await file.read() +# image = Image.open(io.BytesIO(image_data)).convert("RGB").resize((512, 512)) +# encoded_image_base64 = encode_image_base64(image) -# # Encode the image to base64 (optional, if needed by your pipeline) -# buffered = io.BytesIO() -# image.save(buffered, format="JPEG") -# encoded_image_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8") - -# # Prepare the query with the image token # question_with_image_token = f"{question}\n{IMAGE_TOKEN}" - -# # Query the model -# response = await asyncio.to_thread(pipe, (question, image)) -# return {"query": question, "response": response.text} +# response = await asyncio.to_thread(pipe, (question, image)) +# return JSONResponse({"query": question, "response": response.text}) # except Exception as e: -# return {"query": question, "error": str(e)} +# return JSONResponse({"query": question, "error": str(e)}) + + +async def image_query(image: np.ndarray, question: str): + """ + API endpoint to process an image (as numpy array) with the user's query. + """ + try: + # Convert the numpy array to a PIL Image + image = Image.fromarray(image).convert("RGB").resize((512, 512)) + + # Encode the image to base64 (optional, if needed by your pipeline) + buffered = io.BytesIO() + image.save(buffered, format="JPEG") + encoded_image_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8") + + # Prepare the query with the image token + question_with_image_token = f"{question}\n{IMAGE_TOKEN}" + + # Query the model + response = await asyncio.to_thread(pipe, (question, image)) + return {"query": question, "response": response.text} + except Exception as e: + return {"query": question, "error": str(e)} diff --git a/endpoints/text.py b/endpoints/text.py index 5100f03..446b4a8 100644 --- a/endpoints/text.py +++ b/endpoints/text.py @@ -4,23 +4,23 @@ from fastapi.responses import JSONResponse from asyncio import to_thread from pipeline_setup import pipe -async def text_query(question: str = Form(...)): - """ - API endpoint to process text input with the user's query. - """ - try: - response = await to_thread(pipe, question) - return JSONResponse({"query": question, "response": response.text}) - except Exception as e: - return JSONResponse({"query": question, "error": str(e)}) - - # async def text_query(question: str = Form(...)): # """ # API endpoint to process text input with the user's query. # """ # try: -# response = await to_thread(pipe, question) -# return {"query": question, "response": response.text} +# response = await to_thread(pipe, question) +# return JSONResponse({"query": question, "response": response.text}) # except Exception as e: -# return {"query": question, "error": str(e)} +# return JSONResponse({"query": question, "error": str(e)}) + + +async def text_query(question: str = Form(...)): + """ + API endpoint to process text input with the user's query. + """ + try: + response = await to_thread(pipe, question) + return {"query": question, "response": response.text} + except Exception as e: + return {"query": question, "error": str(e)} diff --git a/endpoints/video.py b/endpoints/video.py index 59ae9d8..3cd4ea3 100644 --- a/endpoints/video.py +++ b/endpoints/video.py @@ -9,136 +9,34 @@ import asyncio import mimetypes from concurrent.futures import ThreadPoolExecutor -async def video_query(file: UploadFile, question: str = Form(...)): - """ - API endpoint to process a video file with the user's query. - """ - try: - print("Processing video...") - - # Validate file type - if file.content_type not in ["video/mp4", "video/avi", "video/mkv"]: - return JSONResponse({"query": question, "error": "Unsupported video file type."}) - - # Start overall timer - overall_start_time = time.time() - - # Save the uploaded video to a temporary file - print("Reading video...") - video_data = await file.read() - temp_video_path = "/tmp/temp_video.mp4" - with open(temp_video_path, "wb") as temp_video_file: - temp_video_file.write(video_data) - print(f"Temp video saved to: {temp_video_path}") - - # Record the time after reading the video - video_reading_time = time.time() - - # Split the video into segments - print("Splitting video...") - segments = split_video_into_segments(temp_video_path, segment_duration=30) - print(f"Video split into {len(segments)} segments.") - - aggregated_responses = [] - segment_timings = [] - - for i, segment_path in enumerate(segments): - print(f"Processing segment {i+1}/{len(segments)}: {segment_path}") - - # Start timing for the segment - segment_start_time = time.time() - - # Extract key frames - frame_start_time = time.time() - imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2) - frame_time = time.time() - - # Extract audio and transcribe - audio_start_time = time.time() - audio_path = extract_audio_from_video(segment_path) - transcribed_text = transcribe_audio(audio_path) - audio_time = time.time() - - # Combine transcribed text with the query - combined_query = f"Audio Transcript: {transcribed_text}\n{question}" - - # Prepare content for the pipeline - question_with_frames = "" - for j, img in enumerate(imgs): - question_with_frames += f"Frame{j+1}: {{IMAGE_TOKEN}}\n" - question_with_frames += combined_query - - content = [{"type": "text", "text": question_with_frames}] - for img in imgs: - content.append({ - "type": "image_url", - "image_url": { - "max_dynamic_patch": 1, - "url": f"data:image/jpeg;base64,{encode_image_base64(img)}" - } - }) - - # Query the model - inference_start_time = time.time() - messages = [dict(role="user", content=content)] - response = await asyncio.to_thread(pipe, messages) - inference_time = time.time() - - # Aggregate response - aggregated_responses.append(response.text) - - # Calculate timing for the segment - segment_timings.append({ - "segment_index": i + 1, - "segment_processing_time": inference_time - segment_start_time, - "frame_extraction_time": frame_time - frame_start_time, - "audio_extraction_time": audio_time - audio_start_time, - "model_inference_time": inference_time - inference_start_time - }) - - print(f"transcription: {transcribed_text}") - # print(f"content: {content}") - - overall_end_time = time.time() - - # Aggregate total timings - total_timings = { - "video_reading_time": video_reading_time - overall_start_time, - "total_segments": len(segments), - "total_processing_time": overall_end_time - overall_start_time, - "segment_details": segment_timings - } - - return JSONResponse({ - "question": question, - "responses": aggregated_responses, - "timings": total_timings, - }) - except Exception as e: - return JSONResponse({"query": question, "error": str(e)}) - - -# async def video_query(video_path: str, question: str): +# async def video_query(file: UploadFile, question: str = Form(...)): # """ # API endpoint to process a video file with the user's query. # """ # try: # print("Processing video...") -# if not video_path or not isinstance(video_path, str): -# return {"query": question, "error": "No video file provided or invalid file input."} +# # Validate file type +# if file.content_type not in ["video/mp4", "video/avi", "video/mkv"]: +# return JSONResponse({"query": question, "error": "Unsupported video file type."}) -# # Determine the file type using the file extension -# file_type, _ = mimetypes.guess_type(video_path) -# if file_type is None or not file_type.startswith("video/"): -# return {"query": question, "error": "Unsupported video file type."} +# # Start overall timer +# overall_start_time = time.time() -# # Log the video path -# print(f"Video path: {video_path}") +# # Save the uploaded video to a temporary file +# print("Reading video...") +# video_data = await file.read() +# temp_video_path = "/tmp/temp_video.mp4" +# with open(temp_video_path, "wb") as temp_video_file: +# temp_video_file.write(video_data) +# print(f"Temp video saved to: {temp_video_path}") + +# # Record the time after reading the video +# video_reading_time = time.time() # # Split the video into segments # print("Splitting video...") -# segments = split_video_into_segments(video_path, segment_duration=30) +# segments = split_video_into_segments(temp_video_path, segment_duration=30) # print(f"Video split into {len(segments)} segments.") # aggregated_responses = [] @@ -147,12 +45,19 @@ async def video_query(file: UploadFile, question: str = Form(...)): # for i, segment_path in enumerate(segments): # print(f"Processing segment {i+1}/{len(segments)}: {segment_path}") +# # Start timing for the segment +# segment_start_time = time.time() + # # Extract key frames +# frame_start_time = time.time() # imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2) +# frame_time = time.time() # # Extract audio and transcribe +# audio_start_time = time.time() # audio_path = extract_audio_from_video(segment_path) # transcribed_text = transcribe_audio(audio_path) +# audio_time = time.time() # # Combine transcribed text with the query # combined_query = f"Audio Transcript: {transcribed_text}\n{question}" @@ -174,15 +79,110 @@ async def video_query(file: UploadFile, question: str = Form(...)): # }) # # Query the model +# inference_start_time = time.time() # messages = [dict(role="user", content=content)] # response = await asyncio.to_thread(pipe, messages) +# inference_time = time.time() # # Aggregate response # aggregated_responses.append(response.text) -# return { +# # Calculate timing for the segment +# segment_timings.append({ +# "segment_index": i + 1, +# "segment_processing_time": inference_time - segment_start_time, +# "frame_extraction_time": frame_time - frame_start_time, +# "audio_extraction_time": audio_time - audio_start_time, +# "model_inference_time": inference_time - inference_start_time +# }) + +# print(f"transcription: {transcribed_text}") +# # print(f"content: {content}") + +# overall_end_time = time.time() + +# # Aggregate total timings +# total_timings = { +# "video_reading_time": video_reading_time - overall_start_time, +# "total_segments": len(segments), +# "total_processing_time": overall_end_time - overall_start_time, +# "segment_details": segment_timings +# } + +# return JSONResponse({ # "question": question, # "responses": aggregated_responses, -# } +# "timings": total_timings, +# }) # except Exception as e: -# return {"query": question, "error": str(e)} \ No newline at end of file +# return JSONResponse({"query": question, "error": str(e)}) + + +async def video_query(video_path: str, question: str): + """ + API endpoint to process a video file with the user's query. + """ + try: + print("Processing video...") + + if not video_path or not isinstance(video_path, str): + return {"query": question, "error": "No video file provided or invalid file input."} + + # Determine the file type using the file extension + file_type, _ = mimetypes.guess_type(video_path) + if file_type is None or not file_type.startswith("video/"): + return {"query": question, "error": "Unsupported video file type."} + + # Log the video path + print(f"Video path: {video_path}") + + # Split the video into segments + print("Splitting video...") + segments = split_video_into_segments(video_path, segment_duration=30) + print(f"Video split into {len(segments)} segments.") + + aggregated_responses = [] + segment_timings = [] + + for i, segment_path in enumerate(segments): + print(f"Processing segment {i+1}/{len(segments)}: {segment_path}") + + # Extract key frames + imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2) + + # Extract audio and transcribe + audio_path = extract_audio_from_video(segment_path) + transcribed_text = transcribe_audio(audio_path) + + # Combine transcribed text with the query + combined_query = f"Audio Transcript: {transcribed_text}\n{question}" + + # Prepare content for the pipeline + question_with_frames = "" + for j, img in enumerate(imgs): + question_with_frames += f"Frame{j+1}: {{IMAGE_TOKEN}}\n" + question_with_frames += combined_query + + content = [{"type": "text", "text": question_with_frames}] + for img in imgs: + content.append({ + "type": "image_url", + "image_url": { + "max_dynamic_patch": 1, + "url": f"data:image/jpeg;base64,{encode_image_base64(img)}" + } + }) + + # Query the model + messages = [dict(role="user", content=content)] + response = await asyncio.to_thread(pipe, messages) + + # Aggregate response + aggregated_responses.append(response.text) + + return { + "question": question, + "responses": aggregated_responses, + } + except Exception as e: + return {"query": question, "error": str(e)} \ No newline at end of file