diff --git a/__pycache__/locustfile.cpython-312.pyc b/__pycache__/locustfile.cpython-312.pyc
new file mode 100644
index 0000000..151daad
Binary files /dev/null and b/__pycache__/locustfile.cpython-312.pyc differ
diff --git a/__pycache__/pipeline_setup.cpython-312.pyc b/__pycache__/pipeline_setup.cpython-312.pyc
index e64fb63..e0a5486 100644
Binary files a/__pycache__/pipeline_setup.cpython-312.pyc and b/__pycache__/pipeline_setup.cpython-312.pyc differ
diff --git a/endpoints/image.py b/endpoints/image.py
index 1ec76a8..466a467 100644
--- a/endpoints/image.py
+++ b/endpoints/image.py
@@ -8,44 +8,44 @@ from PIL import Image
 from pipeline_setup import pipe, IMAGE_TOKEN
 from utils.image_processing import encode_image_base64
 
-# async def image_query(file: UploadFile, question: str = Form(...)):
-#     """
-#     API endpoint to process an image with the user's query.
-#     """
-#     try:
-#         if file.content_type not in ["image/jpeg", "image/png"]:
-#             return JSONResponse({"query": question, "error": "Unsupported file type."})
+async def image_query(file: UploadFile, question: str = Form(...)):
+    """
+    API endpoint to process an image with the user's query.
+    """
+    try:
+        if file.content_type not in ["image/jpeg", "image/png"]:
+            return JSONResponse({"query": question, "error": "Unsupported file type."})
 
-#         image_data = await file.read()
-#         image = Image.open(io.BytesIO(image_data)).convert("RGB").resize((512, 512))
-#         encoded_image_base64 = encode_image_base64(image)
+        image_data = await file.read()
+        image = Image.open(io.BytesIO(image_data)).convert("RGB").resize((512, 512))
+        encoded_image_base64 = encode_image_base64(image)
 
-#         question_with_image_token = f"{question}\n{IMAGE_TOKEN}"
-#         response = await asyncio.to_thread(pipe, (question, image))
-#         return JSONResponse({"query": question, "response": response.text})
-#     except Exception as e:
-#         return JSONResponse({"query": question, "error": str(e)})
+        question_with_image_token = f"{question}\n{IMAGE_TOKEN}"
+        response = await asyncio.to_thread(pipe, (question, image))
+        return JSONResponse({"query": question, "response": response.text})
+    except Exception as e:
+        return JSONResponse({"query": question, "error": str(e)})
 
 # import mimetypes
 
-async def image_query(image: np.ndarray, question: str):
-    """
-    API endpoint to process an image (as numpy array) with the user's query.
-    """
-    try:
-        # Convert the numpy array to a PIL Image
-        image = Image.fromarray(image).convert("RGB").resize((512, 512))
+# async def image_query(image: np.ndarray, question: str):
+#     """
+#     API endpoint to process an image (as numpy array) with the user's query.
+#     """
+#     try:
+#         # Convert the numpy array to a PIL Image
+#         image = Image.fromarray(image).convert("RGB").resize((512, 512))
 
-        # Encode the image to base64 (optional, if needed by your pipeline)
-        buffered = io.BytesIO()
-        image.save(buffered, format="JPEG")
-        encoded_image_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
+#         # Encode the image to base64 (optional, if needed by your pipeline)
+#         buffered = io.BytesIO()
+#         image.save(buffered, format="JPEG")
+#         encoded_image_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
 
-        # Prepare the query with the image token
-        question_with_image_token = f"{question}\n{IMAGE_TOKEN}"
+#         # Prepare the query with the image token
+#         question_with_image_token = f"{question}\n{IMAGE_TOKEN}"
 
-        # Query the model
-        response = await asyncio.to_thread(pipe, (question, image))
-        return {"query": question, "response": response.text}
-    except Exception as e:
-        return {"query": question, "error": str(e)}
+#         # Query the model
+#         response = await asyncio.to_thread(pipe, (question, image))
+#         return {"query": question, "response": response.text}
+#     except Exception as e:
+#         return {"query": question, "error": str(e)}
diff --git a/endpoints/text.py b/endpoints/text.py
index 064e01b..9cf2ee1 100644
--- a/endpoints/text.py
+++ b/endpoints/text.py
@@ -1,24 +1,25 @@
+import asyncio
 from fastapi import Form
 from fastapi.responses import JSONResponse
 from asyncio import to_thread
 from pipeline_setup import pipe
 
+async def text_query(question: str = Form(...)):
+    """
+    API endpoint to process text input with the user's query.
+    """
+    try:
+        response = await asyncio.to_thread(pipe, question)
+        return JSONResponse({"query": question, "response": response.text})
+    except Exception as e:
+        return JSONResponse({"query": question, "error": str(e)})
+
 # async def text_query(question: str = Form(...)):
 #     """
 #     API endpoint to process text input with the user's query.
 #     """
 #     try:
 #         response = await to_thread(pipe, question)
-#         return JSONResponse({"query": question, "response": response.text})
+#         return {"query": question, "response": response.text}
 #     except Exception as e:
-#         return JSONResponse({"query": question, "error": str(e)})
-
-async def text_query(question: str = Form(...)):
-    """
-    API endpoint to process text input with the user's query.
-    """
-    try:
-        response = await to_thread(pipe, question)
-        return {"query": question, "response": response.text}
-    except Exception as e:
-        return {"query": question, "error": str(e)}
+#         return {"query": question, "error": str(e)}
diff --git a/endpoints/video.py b/endpoints/video.py
index 6d8b5db..98dee51 100644
--- a/endpoints/video.py
+++ b/endpoints/video.py
@@ -4,140 +4,39 @@ from pipeline_setup import pipe
 from utils.image_processing import encode_image_base64
 from utils.video_processing import split_video_into_segments, extract_motion_key_frames, extract_audio_from_video
 from utils.audio_transcription import transcribe_audio
+import time
 import asyncio
 import mimetypes
 from concurrent.futures import ThreadPoolExecutor
 
-# async def video_query(file: UploadFile, question: str = Form(...)):
-#     """
-#     API endpoint to process a video file with the user's query.
-#     """
-#     try:
-#         print("Processing video...")
-
-#         # Validate file type
-#         if file.content_type not in ["video/mp4", "video/avi", "video/mkv"]:
-#             return JSONResponse({"query": question, "error": "Unsupported video file type."})
-
-#         # Start overall timer
-#         overall_start_time = time.time()
-
-#         # Save the uploaded video to a temporary file
-#         print("Reading video...")
-#         video_data = await file.read()
-#         temp_video_path = "/tmp/temp_video.mp4"
-#         with open(temp_video_path, "wb") as temp_video_file:
-#             temp_video_file.write(video_data)
-#         print(f"Temp video saved to: {temp_video_path}")
-
-#         # Record the time after reading the video
-#         video_reading_time = time.time()
-
-#         # Split the video into segments
-#         print("Splitting video...")
-#         segments = split_video_into_segments(temp_video_path, segment_duration=30)
-#         print(f"Video split into {len(segments)} segments.")
-
-#         aggregated_responses = []
-#         segment_timings = []
-
-#         for i, segment_path in enumerate(segments):
-#             print(f"Processing segment {i+1}/{len(segments)}: {segment_path}")
-
-#             # Start timing for the segment
-#             segment_start_time = time.time()
-
-#             # Extract key frames
-#             frame_start_time = time.time()
-#             imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2)
-#             frame_time = time.time()
-
-#             # Extract audio and transcribe
-#             audio_start_time = time.time()
-#             audio_path = extract_audio_from_video(segment_path)
-#             transcribed_text = transcribe_audio(audio_path)
-#             audio_time = time.time()
-
-#             # Combine transcribed text with the query
-#             combined_query = f"Audio Transcript: {transcribed_text}\n{question}"
-
-#             # Prepare content for the pipeline
-#             question_with_frames = ""
-#             for j, img in enumerate(imgs):
-#                 question_with_frames += f"Frame{j+1}: {{IMAGE_TOKEN}}\n"
-#             question_with_frames += combined_query
-
-#             content = [{"type": "text", "text": question_with_frames}]
-#             for img in imgs:
-#                 content.append({
-#                     "type": "image_url",
-#                     "image_url": {
-#                         "max_dynamic_patch": 1,
-#                         "url": f"data:image/jpeg;base64,{encode_image_base64(img)}"
-#                     }
-#                 })
-
-#             # Query the model
-#             inference_start_time = time.time()
-#             messages = [dict(role="user", content=content)]
-#             response = await asyncio.to_thread(pipe, messages)
-#             inference_time = time.time()
-
-#             # Aggregate response
-#             aggregated_responses.append(response.text)
-
-#             # Calculate timing for the segment
-#             segment_timings.append({
-#                 "segment_index": i + 1,
-#                 "segment_processing_time": inference_time - segment_start_time,
-#                 "frame_extraction_time": frame_time - frame_start_time,
-#                 "audio_extraction_time": audio_time - audio_start_time,
-#                 "model_inference_time": inference_time - inference_start_time
-#             })
-
-#             print(f"transcription: {transcribed_text}")
-#             # print(f"content: {content}")
-
-#         overall_end_time = time.time()
-
-#         # Aggregate total timings
-#         total_timings = {
-#             "video_reading_time": video_reading_time - overall_start_time,
-#             "total_segments": len(segments),
-#             "total_processing_time": overall_end_time - overall_start_time,
-#             "segment_details": segment_timings
-#         }
-
-#         return JSONResponse({
-#             "question": question,
-#             "responses": aggregated_responses,
-#             "timings": total_timings,
-#         })
-#     except Exception as e:
-#         return JSONResponse({"query": question, "error": str(e)})
-
-
-async def video_query(video_path: str, question: str):
+async def video_query(file: UploadFile, question: str = Form(...)):
     """
     API endpoint to process a video file with the user's query.
     """
     try:
         print("Processing video...")
 
-        if not video_path or not isinstance(video_path, str):
-            return {"query": question, "error": "No video file provided or invalid file input."}
+        # Validate file type
+        if file.content_type not in ["video/mp4", "video/avi", "video/mkv"]:
+            return JSONResponse({"query": question, "error": "Unsupported video file type."})
 
-        # Determine the file type using the file extension
-        file_type, _ = mimetypes.guess_type(video_path)
-        if file_type is None or not file_type.startswith("video/"):
-            return {"query": question, "error": "Unsupported video file type."}
+        # Start overall timer
+        overall_start_time = time.time()
 
-        # Log the video path
-        print(f"Video path: {video_path}")
+        # Save the uploaded video to a temporary file
+        print("Reading video...")
+        video_data = await file.read()
+        temp_video_path = "/tmp/temp_video.mp4"
+        with open(temp_video_path, "wb") as temp_video_file:
+            temp_video_file.write(video_data)
+        print(f"Temp video saved to: {temp_video_path}")
+
+        # Record the time after reading the video
+        video_reading_time = time.time()
 
         # Split the video into segments
         print("Splitting video...")
-        segments = split_video_into_segments(video_path, segment_duration=30)
+        segments = split_video_into_segments(temp_video_path, segment_duration=30)
         print(f"Video split into {len(segments)} segments.")
 
         aggregated_responses = []
@@ -146,12 +45,19 @@ async def video_query(video_path: str, question: str):
         for i, segment_path in enumerate(segments):
             print(f"Processing segment {i+1}/{len(segments)}: {segment_path}")
 
+            # Start timing for the segment
+            segment_start_time = time.time()
+
             # Extract key frames
+            frame_start_time = time.time()
             imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2)
+            frame_time = time.time()
 
             # Extract audio and transcribe
+            audio_start_time = time.time()
             audio_path = extract_audio_from_video(segment_path)
             transcribed_text = transcribe_audio(audio_path)
+            audio_time = time.time()
 
             # Combine transcribed text with the query
             combined_query = f"Audio Transcript: {transcribed_text}\n{question}"
@@ -173,15 +79,110 @@ async def video_query(video_path: str, question: str):
                 })
 
             # Query the model
+            inference_start_time = time.time()
             messages = [dict(role="user", content=content)]
             response = await asyncio.to_thread(pipe, messages)
+            inference_time = time.time()
 
             # Aggregate response
             aggregated_responses.append(response.text)
 
-        return {
+            # Calculate timing for the segment
+            segment_timings.append({
+                "segment_index": i + 1,
+                "segment_processing_time": inference_time - segment_start_time,
+                "frame_extraction_time": frame_time - frame_start_time,
+                "audio_extraction_time": audio_time - audio_start_time,
+                "model_inference_time": inference_time - inference_start_time
+            })
+
+            print(f"transcription: {transcribed_text}")
+            # print(f"content: {content}")
+
+        overall_end_time = time.time()
+
+        # Aggregate total timings
+        total_timings = {
+            "video_reading_time": video_reading_time - overall_start_time,
+            "total_segments": len(segments),
+            "total_processing_time": overall_end_time - overall_start_time,
+            "segment_details": segment_timings
+        }
+
+        return JSONResponse({
             "question": question,
             "responses": aggregated_responses,
-        }
+            "timings": total_timings,
+        })
     except Exception as e:
-        return {"query": question, "error": str(e)}
+        return JSONResponse({"query": question, "error": str(e)})
+
+
+# async def video_query(video_path: str, question: str):
+#     """
+#     API endpoint to process a video file with the user's query.
+#     """
+#     try:
+#         print("Processing video...")
+
+#         if not video_path or not isinstance(video_path, str):
+#             return {"query": question, "error": "No video file provided or invalid file input."}
+
+#         # Determine the file type using the file extension
+#         file_type, _ = mimetypes.guess_type(video_path)
+#         if file_type is None or not file_type.startswith("video/"):
+#             return {"query": question, "error": "Unsupported video file type."}
+
+#         # Log the video path
+#         print(f"Video path: {video_path}")
+
+#         # Split the video into segments
+#         print("Splitting video...")
+#         segments = split_video_into_segments(video_path, segment_duration=30)
+#         print(f"Video split into {len(segments)} segments.")
+
+#         aggregated_responses = []
+#         segment_timings = []
+
+#         for i, segment_path in enumerate(segments):
+#             print(f"Processing segment {i+1}/{len(segments)}: {segment_path}")
+
+#             # Extract key frames
+#             imgs = extract_motion_key_frames(segment_path, max_frames=50, sigma_multiplier=2)
+
+#             # Extract audio and transcribe
+#             audio_path = extract_audio_from_video(segment_path)
+#             transcribed_text = transcribe_audio(audio_path)
+
+#             # Combine transcribed text with the query
+#             combined_query = f"Audio Transcript: {transcribed_text}\n{question}"
+
+#             # Prepare content for the pipeline
+#             question_with_frames = ""
+#             for j, img in enumerate(imgs):
+#                 question_with_frames += f"Frame{j+1}: {{IMAGE_TOKEN}}\n"
+#             question_with_frames += combined_query
+
+#             content = [{"type": "text", "text": question_with_frames}]
+#             for img in imgs:
+#                 content.append({
+#                     "type": "image_url",
+#                     "image_url": {
+#                         "max_dynamic_patch": 1,
+#                         "url": f"data:image/jpeg;base64,{encode_image_base64(img)}"
+#                     }
+#                 })
+
+#             # Query the model
+#             messages = [dict(role="user", content=content)]
+#             response = await asyncio.to_thread(pipe, messages)
+
+#             # Aggregate response
+#             aggregated_responses.append(response.text)
+
+#         return {
+#             "question": question,
+#             "responses": aggregated_responses,
+#         }
+#     except Exception as e:
+#         return {"query": question, "error": str(e)}
diff --git a/locustfile.py b/locustfile.py
new file mode 100644
index 0000000..75ef53a
--- /dev/null
+++ b/locustfile.py
@@ -0,0 +1,18 @@
+from locust import HttpUser, task, between
+
+class MultiModalUser(HttpUser):
+    wait_time = between(1, 3)
+
+    @task
+    def text_query(self):
+        self.client.post("/api/predict", json={"input": "What is the capital of France?"})
+
+    # @task
+    # def image_query(self):
+    #     with open("test_image.jpg", "rb") as f:
+    #         self.client.post("/api/image_predict", files={"file": f}, data={"question": "What is in this image?"})
+
+    # @task
+    # def video_query(self):
+    #     with open("test_video.mp4", "rb") as f:
+    #         self.client.post("/api/video_predict", files={"file": f}, data={"question": "What is happening in this video?"})
\ No newline at end of file
diff --git a/main.py b/main.py
index 97d8b51..20c96e8 100644
--- a/main.py
+++ b/main.py
@@ -12,7 +12,7 @@ app.post("/api/video")(video_query)
 
 if __name__ == "__main__":
     import uvicorn
-    uvicorn.run("main:app", host="0.0.0.0", port=8080, reload=True)
+    uvicorn.run("main:app", host="0.0.0.0", port=8002, reload=True)
 
 # python main.py
 # uvicorn main:app --reload
\ No newline at end of file
diff --git a/multi-user.py b/multi-user.py
new file mode 100644
index 0000000..aa9f2cf
--- /dev/null
+++ b/multi-user.py
@@ -0,0 +1,22 @@
+import requests
+import threading
+
+def send_request(user_id):
+    url = "http://localhost:8002"
+    data = {"input": f"Test input from user {user_id}"}
+    response = requests.post(url, json=data)
+    print(f"User {user_id} response: {response.text}")
+
+def main(num_users):
+    threads = []
+    for i in range(num_users):
+        thread = threading.Thread(target=send_request, args=(i,))
+        threads.append(thread)
+        thread.start()
+
+    for thread in threads:
+        thread.join()
+
+if __name__ == "__main__":
+    num_users = 2
+    main(num_users)
\ No newline at end of file
diff --git a/pipeline_setup.py b/pipeline_setup.py
index 4641409..392a9bf 100644
--- a/pipeline_setup.py
+++ b/pipeline_setup.py
@@ -1,21 +1,24 @@
+import os
 from lmdeploy import pipeline, TurbomindEngineConfig, GenerationConfig
 
-# Constants
 IMAGE_TOKEN = "[IMAGE_TOKEN]"
 
+os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"
+
 # Model initialization
 model = "OpenGVLab/InternVL2-26B-AWQ"
 pipe = pipeline(
     model,
     backend_config=TurbomindEngineConfig(
         model_format="awq",
-        # tp=2,
-        tp=4,
-        # device_ids=[0, 1],
-        session_len=12864,
+        tp=2,
+        # tp=4,
+        session_len=2048, # 4096, 8192, 16384, 32768
         max_batch_size=1,
-        cache_max_entry_count=0.05,
-        cache_block_seq_len=32768,
-        quant_policy=4
-    )
+        cache_max_entry_count=0.1, # 0.05
+        cache_block_seq_len=4096, # 8192, 16384, 32768
+        quant_policy=4,
+        # precision="fp16",
+    ),
+    # log_level='DEBUG'
 )
\ No newline at end of file
diff --git a/ui.py b/ui.py
index edb62a2..42fd2b5 100644
--- a/ui.py
+++ b/ui.py
@@ -1,4 +1,3 @@
-import os
 import asyncio
 import gradio as gr
 from gradio_image_prompter import ImagePrompter
@@ -6,8 +5,6 @@ from endpoints.text import text_query
 from endpoints.image import image_query
 from endpoints.video import video_query
 
-os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"
-
 import torch
 print("Available GPUs:", torch.cuda.device_count())
 print("Visible Devices:", [torch.cuda.get_device_name(i) for i in range(torch.cuda.device_count())])