diff --git a/__pycache__/pipeline_setup.cpython-312.pyc b/__pycache__/pipeline_setup.cpython-312.pyc
index 6ed9844..e64fb63 100644
Binary files a/__pycache__/pipeline_setup.cpython-312.pyc and b/__pycache__/pipeline_setup.cpython-312.pyc differ
diff --git a/endpoints/image.py b/endpoints/image.py
index f75fad4..1ec76a8 100644
--- a/endpoints/image.py
+++ b/endpoints/image.py
@@ -1,54 +1,51 @@
 from fastapi import UploadFile, Form
 from fastapi.responses import JSONResponse
 import io
+import base64
 import asyncio
+import numpy as np
 from PIL import Image
 from pipeline_setup import pipe, IMAGE_TOKEN
 from utils.image_processing import encode_image_base64
 
-async def image_query(file: UploadFile, question: str = Form(...)):
-    """
-    API endpoint to process an image with the user's query.
-    """
-    try:
-        if file.content_type not in ["image/jpeg", "image/png"]:
-            return JSONResponse({"query": question, "error": "Unsupported file type."})
-
-        image_data = await file.read()
-        image = Image.open(io.BytesIO(image_data)).convert("RGB").resize((512, 512))
-        encoded_image_base64 = encode_image_base64(image)
-
-        question_with_image_token = f"{question}\n{IMAGE_TOKEN}"
-        response = await asyncio.to_thread(pipe, (question, image))
-        return JSONResponse({"query": question, "response": response.text})
-    except Exception as e:
-        return JSONResponse({"query": question, "error": str(e)})
-
-
-# import mimetypes
 # async def image_query(file: UploadFile, question: str = Form(...)):
 #     """
 #     API endpoint to process an image with the user's query.
 #     """
 #     try:
-#         # Get the file path from the UploadFile object
-#         file_path = file.filename
+#         if file.content_type not in ["image/jpeg", "image/png"]:
+#             return JSONResponse({"query": question, "error": "Unsupported file type."})
 
-#         # Determine the file type using the file extension
-#         file_type, _ = mimetypes.guess_type(file_path)
-#         if file_type not in ["image/jpeg", "image/png"]:
-#             return {"query": question, "error": "Unsupported file type."}
-
-#         # Read the image file
 #         image_data = await file.read()
 #         image = Image.open(io.BytesIO(image_data)).convert("RGB").resize((512, 512))
 #         encoded_image_base64 = encode_image_base64(image)
-
-#         # Prepare the query with the image token
+
 #         question_with_image_token = f"{question}\n{IMAGE_TOKEN}"
-
-#         # Query the model
 #         response = await asyncio.to_thread(pipe, (question, image))
-#         return {"query": question, "response": response.text}
+#         return JSONResponse({"query": question, "response": response.text})
 #     except Exception as e:
-#         return {"query": question, "error": str(e)}
+#         return JSONResponse({"query": question, "error": str(e)})
+
+
+# import mimetypes
+async def image_query(image: np.ndarray, question: str):
+    """
+    API endpoint to process an image (as numpy array) with the user's query.
+    """
+    try:
+        # Convert the numpy array to a PIL Image
+        image = Image.fromarray(image).convert("RGB").resize((512, 512))
+
+        # Encode the image to base64 (optional, if needed by your pipeline)
+        buffered = io.BytesIO()
+        image.save(buffered, format="JPEG")
+        encoded_image_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
+
+        # Prepare the query with the image token
+        question_with_image_token = f"{question}\n{IMAGE_TOKEN}"
+
+        # Query the model
+        response = await asyncio.to_thread(pipe, (question, image))
+        return {"query": question, "response": response.text}
+    except Exception as e:
+        return {"query": question, "error": str(e)}
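Note on the rewrite above: `image_query` now takes a raw numpy array (the kind of input Gradio's `ImagePrompter` produces, see ui.py below) instead of a FastAPI `UploadFile`, and returns a plain dict rather than a `JSONResponse`. As in the previous version, `encoded_image_base64` and `question_with_image_token` are prepared but the pipeline is still invoked with the `(question, image)` tuple. A minimal usage sketch, where "sample.jpg" is a placeholder path and not part of this change:

```python
# Usage sketch for the numpy-based image_query; "sample.jpg" is hypothetical.
import asyncio

import numpy as np
from PIL import Image

from endpoints.image import image_query

# Load an RGB image as the kind of array Gradio's ImagePrompter hands over.
array = np.asarray(Image.open("sample.jpg").convert("RGB"))

result = asyncio.run(image_query(array, "What is shown in this image?"))
# The dict carries either a "response" or an "error" key.
print(result.get("response", result.get("error")))
```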
+ """ + try: + # Convert the numpy array to a PIL Image + image = Image.fromarray(image).convert("RGB").resize((512, 512)) + + # Encode the image to base64 (optional, if needed by your pipeline) + buffered = io.BytesIO() + image.save(buffered, format="JPEG") + encoded_image_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8") + + # Prepare the query with the image token + question_with_image_token = f"{question}\n{IMAGE_TOKEN}" + + # Query the model + response = await asyncio.to_thread(pipe, (question, image)) + return {"query": question, "response": response.text} + except Exception as e: + return {"query": question, "error": str(e)} diff --git a/endpoints/text.py b/endpoints/text.py index f1afc03..064e01b 100644 --- a/endpoints/text.py +++ b/endpoints/text.py @@ -3,22 +3,22 @@ from fastapi.responses import JSONResponse from asyncio import to_thread from pipeline_setup import pipe -async def text_query(question: str = Form(...)): - """ - API endpoint to process text input with the user's query. - """ - try: - response = await to_thread(pipe, question) - return JSONResponse({"query": question, "response": response.text}) - except Exception as e: - return JSONResponse({"query": question, "error": str(e)}) - # async def text_query(question: str = Form(...)): # """ # API endpoint to process text input with the user's query. # """ # try: # response = await to_thread(pipe, question) -# return {"query": question, "response": response.text} +# return JSONResponse({"query": question, "response": response.text}) # except Exception as e: -# return {"query": question, "error": str(e)} +# return JSONResponse({"query": question, "error": str(e)}) + +async def text_query(question: str = Form(...)): + """ + API endpoint to process text input with the user's query. + """ + try: + response = await to_thread(pipe, question) + return {"query": question, "response": response.text} + except Exception as e: + return {"query": question, "error": str(e)} diff --git a/endpoints/video.py b/endpoints/video.py index 62fbc4a..6d8b5db 100644 --- a/endpoints/video.py +++ b/endpoints/video.py @@ -5,118 +5,9 @@ from utils.image_processing import encode_image_base64 from utils.video_processing import split_video_into_segments, extract_motion_key_frames, extract_audio_from_video from utils.audio_transcription import transcribe_audio import asyncio -import time +import mimetypes from concurrent.futures import ThreadPoolExecutor -async def video_query(file: UploadFile, question: str = Form(...)): - """ - API endpoint to process a video file with the user's query. 
- """ - try: - print("Processing video...") - - # Validate file type - if file.content_type not in ["video/mp4", "video/avi", "video/mkv"]: - return JSONResponse({"query": question, "error": "Unsupported video file type."}) - - # Start overall timer - overall_start_time = time.time() - - # Save the uploaded video to a temporary file - print("Reading video...") - video_data = await file.read() - temp_video_path = "/tmp/temp_video.mp4" - with open(temp_video_path, "wb") as temp_video_file: - temp_video_file.write(video_data) - print(f"Temp video saved to: {temp_video_path}") - - # Record the time after reading the video - video_reading_time = time.time() - - # Split the video into segments - print("Splitting video...") - segments = split_video_into_segments(temp_video_path, segment_duration=30) - print(f"Video split into {len(segments)} segments.") - - aggregated_responses = [] - segment_timings = [] - - for i, segment_path in enumerate(segments): - print(f"Processing segment {i+1}/{len(segments)}: {segment_path}") - - # Start timing for the segment - segment_start_time = time.time() - - # Extract key frames - frame_start_time = time.time() - imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2) - frame_time = time.time() - - # Extract audio and transcribe - audio_start_time = time.time() - audio_path = extract_audio_from_video(segment_path) - transcribed_text = transcribe_audio(audio_path) - audio_time = time.time() - - # Combine transcribed text with the query - combined_query = f"Audio Transcript: {transcribed_text}\n{question}" - - # Prepare content for the pipeline - question_with_frames = "" - for j, img in enumerate(imgs): - question_with_frames += f"Frame{j+1}: {{IMAGE_TOKEN}}\n" - question_with_frames += combined_query - - content = [{"type": "text", "text": question_with_frames}] - for img in imgs: - content.append({ - "type": "image_url", - "image_url": { - "max_dynamic_patch": 1, - "url": f"data:image/jpeg;base64,{encode_image_base64(img)}" - } - }) - - # Query the model - inference_start_time = time.time() - messages = [dict(role="user", content=content)] - response = await asyncio.to_thread(pipe, messages) - inference_time = time.time() - - # Aggregate response - aggregated_responses.append(response.text) - - # Calculate timing for the segment - segment_timings.append({ - "segment_index": i + 1, - "segment_processing_time": inference_time - segment_start_time, - "frame_extraction_time": frame_time - frame_start_time, - "audio_extraction_time": audio_time - audio_start_time, - "model_inference_time": inference_time - inference_start_time - }) - - print(f"transcription: {transcribed_text}") - # print(f"content: {content}") - - overall_end_time = time.time() - - # Aggregate total timings - total_timings = { - "video_reading_time": video_reading_time - overall_start_time, - "total_segments": len(segments), - "total_processing_time": overall_end_time - overall_start_time, - "segment_details": segment_timings - } - - return JSONResponse({ - "question": question, - "responses": aggregated_responses, - "timings": total_timings, - }) - except Exception as e: - return JSONResponse({"query": question, "error": str(e)}) - - # async def video_query(file: UploadFile, question: str = Form(...)): # """ # API endpoint to process a video file with the user's query. 
@@ -124,13 +15,9 @@ async def video_query(file: UploadFile, question: str = Form(...)):
 #     try:
 #         print("Processing video...")
 
-#         # Get the file path from the UploadFile object
-#         file_path = file.filename
-
-#         # Determine the file type using the file extension
-#         file_type, _ = mimetypes.guess_type(file_path)
-#         if file_type is None or not file_type.startswith("video/"):
-#             return {"query": question, "error": "Unsupported video file type."}
+#         # Validate file type
+#         if file.content_type not in ["video/mp4", "video/avi", "video/mkv"]:
+#             return JSONResponse({"query": question, "error": "Unsupported video file type."})
 
 #         # Start overall timer
 #         overall_start_time = time.time()
@@ -209,6 +96,7 @@ async def video_query(file: UploadFile, question: str = Form(...)):
 #             })
 
 #             print(f"transcription: {transcribed_text}")
+#             # print(f"content: {content}")
 
 #         overall_end_time = time.time()
 
@@ -220,10 +108,80 @@ async def video_query(file: UploadFile, question: str = Form(...)):
 #             "segment_details": segment_timings
 #         }
 
-#         return {
+#         return JSONResponse({
 #             "question": question,
 #             "responses": aggregated_responses,
 #             "timings": total_timings,
-#         }
+#         })
 #     except Exception as e:
-#         return {"query": question, "error": str(e)}
\ No newline at end of file
+#         return JSONResponse({"query": question, "error": str(e)})
+
+
+async def video_query(video_path: str, question: str):
+    """
+    API endpoint to process a video file with the user's query.
+    """
+    try:
+        print("Processing video...")
+
+        if not video_path or not isinstance(video_path, str):
+            return {"query": question, "error": "No video file provided or invalid file input."}
+
+        # Determine the file type using the file extension
+        file_type, _ = mimetypes.guess_type(video_path)
+        if file_type is None or not file_type.startswith("video/"):
+            return {"query": question, "error": "Unsupported video file type."}
+
+        # Log the video path
+        print(f"Video path: {video_path}")
+
+        # Split the video into segments
+        print("Splitting video...")
+        segments = split_video_into_segments(video_path, segment_duration=30)
+        print(f"Video split into {len(segments)} segments.")
+
+        aggregated_responses = []
+        segment_timings = []
+
+        for i, segment_path in enumerate(segments):
+            print(f"Processing segment {i+1}/{len(segments)}: {segment_path}")
+
+            # Extract key frames
+            imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2)
+
+            # Extract audio and transcribe
+            audio_path = extract_audio_from_video(segment_path)
+            transcribed_text = transcribe_audio(audio_path)
+
+            # Combine transcribed text with the query
+            combined_query = f"Audio Transcript: {transcribed_text}\n{question}"
+
+            # Prepare content for the pipeline
+            question_with_frames = ""
+            for j, img in enumerate(imgs):
+                question_with_frames += f"Frame{j+1}: {{IMAGE_TOKEN}}\n"
+            question_with_frames += combined_query
+
+            content = [{"type": "text", "text": question_with_frames}]
+            for img in imgs:
+                content.append({
+                    "type": "image_url",
+                    "image_url": {
+                        "max_dynamic_patch": 1,
+                        "url": f"data:image/jpeg;base64,{encode_image_base64(img)}"
+                    }
+                })
+
+            # Query the model
+            messages = [dict(role="user", content=content)]
+            response = await asyncio.to_thread(pipe, messages)
+
+            # Aggregate response
+            aggregated_responses.append(response.text)
+
+        return {
+            "question": question,
+            "responses": aggregated_responses,
+        }
+    except Exception as e:
+        return {"query": question, "error": str(e)}
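The replacement `video_query` accepts a filesystem path (which is what `gr.Video` passes to its callback) instead of an `UploadFile`, validates it via `mimetypes.guess_type`, and drops the per-segment timing instrumentation along with the `time` import. A usage sketch, where "clip.mp4" is a placeholder path and not part of the diff:

```python
# Usage sketch for the path-based video_query; "clip.mp4" is hypothetical.
import asyncio

from endpoints.video import video_query

result = asyncio.run(video_query("clip.mp4", "What happens in this video?"))
if "responses" in result:
    # One answer per 30-second segment, in order.
    for i, answer in enumerate(result["responses"], start=1):
        print(f"Segment {i}: {answer}")
else:
    print(result["error"])
```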
diff --git a/flagged/log.csv b/flagged/log.csv
new file mode 100644
index 0000000..a52b090
--- /dev/null
+++ b/flagged/log.csv
@@ -0,0 +1,2 @@
+prompts,Response,flag,username,timestamp
+"{""image"": ""flagged/prompts/fdd45d065153a29e7e3d/1.2.png"", ""points"": []}",,,,2025-01-24 11:09:07.710989
diff --git a/flagged/prompts/fdd45d065153a29e7e3d/1.2.png b/flagged/prompts/fdd45d065153a29e7e3d/1.2.png
new file mode 100644
index 0000000..c1decc9
Binary files /dev/null and b/flagged/prompts/fdd45d065153a29e7e3d/1.2.png differ
diff --git a/pipeline_setup.py b/pipeline_setup.py
index 46f4faf..4641409 100644
--- a/pipeline_setup.py
+++ b/pipeline_setup.py
@@ -9,8 +9,9 @@ pipe = pipeline(
     model,
     backend_config=TurbomindEngineConfig(
         model_format="awq",
-        tp=2,
-        device_ids=[0, 1],
+        # tp=2,
+        tp=4,
+        # device_ids=[0, 1],
         session_len=12864,
         max_batch_size=1,
         cache_max_entry_count=0.05,
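One thing to watch with this change: `tp` is raised from 2 to 4, but ui.py (next) pins `CUDA_VISIBLE_DEVICES` to "0,1", leaving only two GPUs visible, and TurboMind's tensor parallelism needs at least `tp` visible devices. A defensive sketch, an assumption rather than part of this diff, that derives `tp` from whatever is actually visible:

```python
# Sketch: pick a tp value consistent with the visible GPUs (assumed helper,
# not from the diff). Tensor-parallel degrees are typically powers of two.
import torch

def pick_tp() -> int:
    visible = torch.cuda.device_count()
    # Largest power of two not exceeding the visible GPU count.
    tp = 1
    while tp * 2 <= visible:
        tp *= 2
    return tp

print(f"tp={pick_tp()} for {torch.cuda.device_count()} visible GPU(s)")
```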
diff --git a/ui.py b/ui.py
index 3263e18..edb62a2 100644
--- a/ui.py
+++ b/ui.py
@@ -1,9 +1,17 @@
-import gradio as gr
+import os
 import asyncio
+import gradio as gr
+from gradio_image_prompter import ImagePrompter
 from endpoints.text import text_query
 from endpoints.image import image_query
 from endpoints.video import video_query
+os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"
+
+import torch
+print("Available GPUs:", torch.cuda.device_count())
+print("Visible Devices:", [torch.cuda.get_device_name(i) for i in range(torch.cuda.device_count())])
+
 def setup_ui():
     with gr.Blocks() as ui:
         gr.Markdown(
@@ -32,13 +40,39 @@ def setup_ui():
         with gr.Tab("Image Query"):
             gr.Markdown("### Submit an Image Query")
             with gr.Row():
-                image_input = gr.File(label="Upload Image")
+                image_prompter = ImagePrompter(show_label=False)
                 image_question_input = gr.Textbox(label="Your Question", placeholder="Type your question here...")
                 image_button = gr.Button("Submit")
             image_output = gr.Textbox(label="Response", interactive=False)
+
+            # async def handle_image_query(prompts, question):
+            #     response = await image_query(prompts["image"], question)
+            #     return response["response"] if "response" in response else response["error"]
+
+            async def handle_image_query(prompts, question):
+                """
+                Handles the image query and ensures that inputs are valid.
+                """
+                try:
+                    # Validate prompts
+                    if prompts is None or "image" not in prompts:
+                        return "No image provided. Please upload an image."
+
+                    image_data = prompts["image"]
+
+                    # Check if image_data is valid
+                    if image_data is None:
+                        return "Invalid image input. Please upload a valid image."
+
+                    # Call the `image_query` function
+                    response = await image_query(image_data, question)
+                    return response["response"] if "response" in response else response["error"]
+                except Exception as e:
+                    return str(e)
+
             image_button.click(
-                fn=lambda img, q: asyncio.run(image_query(img, q)),
-                inputs=[image_input, image_question_input],
+                fn=handle_image_query,
+                inputs=[image_prompter, image_question_input],
                 outputs=[image_output]
             )
@@ -46,18 +80,24 @@ def setup_ui():
         with gr.Tab("Video Query"):
             gr.Markdown("### Submit a Video Query")
             with gr.Row():
-                video_input = gr.File(label="Upload Video")
+                video_input = gr.Video(label="Upload Video")
                 video_question_input = gr.Textbox(label="Your Question", placeholder="Type your question here...")
                 video_button = gr.Button("Submit")
             video_output = gr.Textbox(label="Response", interactive=False)
+
+            async def handle_video_query(video, question):
+                response = await video_query(video, question)
+                return response.get("responses", response.get("error", "Error processing video."))
+
             video_button.click(
-                fn=lambda vid, q: asyncio.run(video_query(vid, q)),
+                fn=handle_video_query,
                 inputs=[video_input, video_question_input],
                 outputs=[video_output]
             )
+
     return ui
 
 if __name__ == "__main__":
     ui = setup_ui()
-    ui.launch(server_name="0.0.0.0", server_port=7860)
\ No newline at end of file
+    ui.launch(server_name="0.0.0.0", server_port=8002)
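Two notes on the ui.py changes. First, the `lambda ...: asyncio.run(...)` wrappers are gone because Gradio can await async callbacks natively; calling `asyncio.run()` from inside Gradio's running event loop raises a `RuntimeError`. Second, `CUDA_VISIBLE_DEVICES` is assigned only after the `endpoints.*` imports, which already pull in `pipeline_setup` and build the pipeline, so the restriction may arrive too late to affect CUDA initialization. A minimal sketch of the async-handler pattern the diff adopts:

```python
# Sketch: Gradio awaits async event handlers directly; no asyncio.run wrapper.
import gradio as gr

async def echo(message: str) -> str:
    # Gradio runs this coroutine on its own event loop.
    return f"You said: {message}"

with gr.Blocks() as demo:
    box_in = gr.Textbox(label="Message")
    box_out = gr.Textbox(label="Echo")
    gr.Button("Send").click(fn=echo, inputs=[box_in], outputs=[box_out])

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=8002)
```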