updated celery redis

This commit is contained in:
Zixiao Wang 2025-02-07 19:18:35 +08:00
parent 7b80572f09
commit 814e558959
17 changed files with 309 additions and 55 deletions

0
__init__.py Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

25
celery_app.py Normal file

@@ -0,0 +1,25 @@
# from celery import Celery
# celery_app = Celery(
# "tasks",
# broker="redis://localhost:6379/0", # Redis as broker
# backend="redis://localhost:6379/0", # Redis for storing results
# )
# celery_app.conf.task_routes = {
# "tasks.*": {"queue": "default"},
# }
from celery import Celery
celery_app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0",
include=["tasks"] # ✅ Prevents import issues
)
celery_app.conf.task_routes = {
"tasks.*": {"queue": "default"},
}
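Because task_routes sends everything matching tasks.* to the default queue, a worker has to subscribe to that queue explicitly (Celery workers only listen on the celery queue by default). A typical invocation against the local Redis broker configured above, with illustrative flags rather than anything taken from this commit, would be:
# celery -A celery_app worker --loglevel=info -Q default --concurrency=1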

0
endpoints/__init__.py Normal file

endpoints/image.py

@@ -1,11 +1,10 @@
from fastapi import UploadFile, Form
from fastapi.responses import JSONResponse
import io
-import base64
import asyncio
import numpy as np
from PIL import Image
from pipeline_setup import pipe, IMAGE_TOKEN
from utils.image_processing import encode_image_base64
async def image_query(file: UploadFile, question: str = Form(...)):
@@ -21,13 +20,12 @@ async def image_query(file: UploadFile, question: str = Form(...)):
encoded_image_base64 = encode_image_base64(image)
question_with_image_token = f"{question}\n{IMAGE_TOKEN}"
response = await asyncio.to_thread(pipe, (question, image))
return JSONResponse({"query": question, "response": response.text})
except Exception as e:
return JSONResponse({"query": question, "error": str(e)})
-# import mimetypes
# async def image_query(image: np.ndarray, question: str):
# """
# API endpoint to process an image (as numpy array) with the user's query.

endpoints/text.py

@@ -9,11 +9,12 @@ async def text_query(question: str = Form(...)):
API endpoint to process text input with the user's query.
"""
try:
-response = await asyncio.to_thread(pipe, question)
response = await to_thread(pipe, question)
return JSONResponse({"query": question, "response": response.text})
except Exception as e:
return JSONResponse({"query": question, "error": str(e)})
# async def text_query(question: str = Form(...)):
# """
# API endpoint to process text input with the user's query.

endpoints/video.py

@@ -185,4 +185,4 @@ async def video_query(file: UploadFile, question: str = Form(...)):
# "responses": aggregated_responses,
# }
# except Exception as e:
# return {"query": question, "error": str(e)}

locustfile.py

@@ -1,18 +1,117 @@
-from locust import HttpUser, task, between
-class MultiModalUser(HttpUser):
-wait_time = between(1, 3)
import os
import time
import asyncio
from locust import FastHttpUser, HttpUser, task, between, constant
import torch
print(torch.cuda.is_available())
class FastAPIUser(FastHttpUser):
# wait_time = between(1, 3) # Wait time between requests
wait_time = constant(0) # send request simultaneously
def ensure_event_loop(self):
"""Ensures each Locust task runs in a fresh asyncio event loop."""
try:
asyncio.get_running_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
# @task
# def text_query(self):
# """Simulate text query request."""
# url = "/api/text"
# # start_event.wait() # Synchronize all users
# start_time = time.time()
# response = self.client.post(url, data={"question": "What is FastAPI?"})
# total_time = time.time() - start_time
# if response.status_code == 200:
# print(f"Success: {response.json()} (Time: {total_time:.2f}s)")
# else:
# print(f"Error: {response.status_code} - {response.text}")
# @task
# def upload_image(self):
# """Sends an image with the correct query format."""
# url = "/api/image"
# image_path = "../profile/1.png"
# if not os.path.exists(image_path):
# print(f"Error: File {image_path} not found!")
# return
# query_text = (
# "Extract the following information from this image and return the result in JSON format:\n"
# "- Name: <name>\n"
# "- ID: <id>\n"
# "- Profile Picture: <url>\n"
# "- Follower Count: <count>\n"
# "- Likes Count: <count>\n"
# "- Bio: <bio>\n"
# "- Following Count: <count>\n"
# "- External Links: <links>\n"
# "Do not include any disclaimers or comments like 'I'm sorry' or 'I can't assist'."
# )
# with open(image_path, "rb") as image_file:
# files = {
# "file": ("1.png", image_file, "image/png"),
# "question": (None, query_text)
# }
# start_time = time.time()
# response = self.client.post(url, files=files)
# total_time = time.time() - start_time
# if response.status_code == 200:
# print(f"Success: {response.json()} (Time: {total_time:.2f}s)")
# else:
# print(f"Error: {response.status_code} - {response.text}")
@task
-def text_query(self):
-self.client.post("/api/predict", json={"input": "What is the capital of France?"})
-# @task
-# def image_query(self):
-# with open("test_image.jpg", "rb") as f:
-# self.client.post("/api/image_predict", files={"file": f}, data={"question": "What is in this image?"})
-# @task
-# def video_query(self):
-# with open("test_video.mp4", "rb") as f:
-# self.client.post("/api/video_predict", files={"file": f}, data={"question": "What is happening in this video?"})
def video_query(self):
"""Uploads a video and asks a detailed question."""
url = "/api/video"
video_path = "../video/1.1.mp4"
if not os.path.exists(video_path):
print(f"Error: File {video_path} not found!")
return
with open(video_path, "rb") as file:
files = {
"file": ("1.1.mp4", file, "video/mp4"),
"question": (None, """Based on the given images and audio script, extract detailed
information about the products recommended in the video and format the output as JSON with
the following fields:
1. **Product Name**: The specific name of the product, if mentioned.
2. **Category**: The specific category of the product (e.g., electronics, skincare, casual wear, etc.).
3. **Styles or Variants**: Any styles, designs, or variants of the product described
(e.g., colors, patterns, sizes, or other distinguishing attributes).
4. **Highlights**: The unique selling points or notable features emphasized by the anchor
(e.g., benefits, quality, or standout features).
5. **Promotional Details**: Any additional promotional information mentioned, such as discounts,
offers, or key features that set the product apart."""
)
}
print("Starting video query...")
start_time = time.time()
response = self.client.post(url, files=files)
total_time = time.time() - start_time
if response.status_code == 200:
print(f"Success: {response.json()} (Time: {total_time:.2f}s)")
else:
print(f"Error: {response.status_code} - {response.text}")
print("Video query completed.")
# locust -f locustfile.py
# curl -X POST http://localhost:8002/api/image \
# -F "file=@../profile/1.png" \
# -F "question=What is this image about?"

91
main.py

@@ -1,18 +1,83 @@
-from fastapi import FastAPI
-from endpoints.text import text_query
-from endpoints.image import image_query
-from endpoints.video import video_query
# from fastapi import FastAPI, Form, UploadFile
# from fastapi.responses import JSONResponse
# import asyncio
# app = FastAPI()
# @app.post("/api/text")
# async def text_query_endpoint(question: str = Form(...)):
# """
# API endpoint to process text input with the user's query.
# """
# from endpoints.text import text_query
# return await text_query(question=question)
# @app.post("/api/image")
# async def image_query_endpoint(file: UploadFile, question: str = Form(...)):
# """
# API endpoint to process an image with the user's query.
# """
# from endpoints.image import image_query
# return await image_query(file=file, question=question)
# @app.post("/api/video")
# async def video_query_endpoint(file: UploadFile, question: str = Form(...)):
# """
# API endpoint to process a video file with the user's query.
# """
# from endpoints.video import video_query
# return await video_query(file=file, question=question)
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run("main:app", host="0.0.0.0", port=8002, reload=True, loop="uvloop")
from fastapi import FastAPI, Form, UploadFile
from fastapi.responses import JSONResponse
import shutil
import uuid
from tasks import text_query_task, image_query_task, video_query_task
app = FastAPI()
-# Register routes
-app.post("/api/text")(text_query)
-app.post("/api/image")(image_query)
-app.post("/api/video")(video_query)
-if __name__ == "__main__":
-import uvicorn
-uvicorn.run("main:app", host="0.0.0.0", port=8002, reload=True)
-# python main.py
-# uvicorn main:app --reload
# @app.post("/api/text")
# async def text_query_endpoint(question: str = Form(...)):
# task = text_query_task.apply_async(args=[question])
# return JSONResponse({"task_id": task.id})
@app.post("/api/text")
async def text_query_endpoint(question: str = Form(...)):
print(f"Received request: {question}")
task = text_query_task.apply_async(args=[question])
print(f"Task sent: {task.id}")
return JSONResponse({"task_id": task.id})
@app.post("/api/image")
async def image_query_endpoint(file: UploadFile, question: str = Form(...)):
file_path = f"/tmp/{uuid.uuid4()}_{file.filename}"
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
task = image_query_task.apply_async(args=[file_path, question])
return JSONResponse({"task_id": task.id})
@app.post("/api/video")
async def video_query_endpoint(file: UploadFile, question: str = Form(...)):
file_path = f"/tmp/{uuid.uuid4()}_{file.filename}"
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
task = video_query_task.apply_async(args=[file_path, question])
return JSONResponse({"task_id": task.id})
@app.get("/api/task/{task_id}")
async def get_task_result(task_id: str):
from celery.result import AsyncResult
result = AsyncResult(task_id)
if result.ready():
return JSONResponse({"status": "completed", "result": result.result})
return JSONResponse({"status": "pending"})

(concurrent client test script, filename not shown)

@@ -1,22 +1,63 @@
-import requests
-import threading
-def send_request(user_id):
-url = "http://localhost:8002"
-data = {"input": f"Test input from user {user_id}"}
-response = requests.post(url, json=data)
-print(f"User {user_id} response: {response.text}")
-def main(num_users):
-threads = []
-for i in range(num_users):
-thread = threading.Thread(target=send_request, args=(i,))
-threads.append(thread)
-thread.start()
import asyncio
import httpx
BASE_URL = "http://localhost:8002"
async def test_text_query(session_id: int):
"""Send a POST request to the /api/text endpoint."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
# Correctly send form data
response = await client.post(
"http://localhost:8002/api/text",
data={"question": f"Test question: what is AI from session {session_id}"}
)
print(f"Session {session_id} Status Code: {response.status_code}")
print(f"Session {session_id} Headers: {response.headers}")
if response.content:
print(f"Session {session_id} Response: {response.json()}")
else:
print(f"Session {session_id} Response is empty.")
except Exception as e:
print(f"Session {session_id} encountered an error: {e}")
async def test_image_query(session_id: int):
"""Send a POST request to the /api/image endpoint with a dummy image."""
async with httpx.AsyncClient(timeout=30.0) as client:
files = {'file': ('../profile/1.jpg', b'binary data of the file', 'image/jpeg')}
response = await client.post(
f"{BASE_URL}/api/image",
data={"question": f"Image query: what is in the picture from session {session_id}"},
files=files
)
print(f"Session {session_id} Raw Response: {response.text}")
print(f"Session {session_id} Response: {response.json()}")
async def test_video_query(session_id: int):
"""Send a POST request to the /api/video endpoint with a dummy video."""
async with httpx.AsyncClient(timeout=30.0) as client:
files = {'file': ('../video/1.mp4', b'binary data of the file', 'video/mp4')}
response = await client.post(
f"{BASE_URL}/api/video",
data={"question": f"Video query: what is in the video from session {session_id}"},
files=files
)
print(f"Session {session_id} Raw Response: {response.text}")
print(f"Session {session_id} Response: {response.json()}")
async def main():
tasks = []
for i in range(1, 3):
tasks.append(test_text_query(i))
# tasks.append(test_image_query(i))
# tasks.append(test_video_query(i))
# Run all tasks concurrently
await asyncio.gather(*tasks)
-for thread in threads:
-thread.join()
if __name__ == "__main__":
-num_users = 2
-main(num_users)
asyncio.run(main())
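Since the server now answers with a task id rather than the final text, this script only verifies that enqueueing works. A small polling helper like the sketch below, which is hypothetical and reuses the script's BASE_URL plus its existing asyncio/httpx imports, could wait for the Celery result:

async def wait_for_result(task_id: str, timeout: float = 60.0):
    """Poll /api/task/{task_id} until the task completes or the timeout expires."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        waited = 0.0
        while waited < timeout:
            resp = await client.get(f"{BASE_URL}/api/task/{task_id}")
            body = resp.json()
            if body.get("status") == "completed":
                return body.get("result")
            await asyncio.sleep(1.0)
            waited += 1.0
        return None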

pipeline_setup.py

@@ -4,6 +4,7 @@ from lmdeploy import pipeline, TurbomindEngineConfig, GenerationConfig
IMAGE_TOKEN = "[IMAGE_TOKEN]"
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"
# os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
# Model initialization
model = "OpenGVLab/InternVL2-26B-AWQ"
@@ -11,13 +12,13 @@ pipe = pipeline(
model,
backend_config=TurbomindEngineConfig(
model_format="awq",
tp=2,
# tp=4,
-session_len=2048, # 4096, 8192, 16384, 32768
session_len=16384, # 4096, 8192, 16384, 32768
max_batch_size=1,
-cache_max_entry_count=0.1, # 0.05
cache_max_entry_count=0.2, # 0.05
-cache_block_seq_len=4096, # 8192, 16384, 32768
cache_block_seq_len=16384, # 8192, 16384, 32768
-quant_policy=4,
# quant_policy=8,
# precision="fp16",
),
# log_level='DEBUG'

25
tasks.py Normal file

@@ -0,0 +1,25 @@
import asyncio
from celery_app import celery_app
@celery_app.task
def text_query_task(question: str):
from endpoints.text import text_query # Import inside the function to avoid circular import
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(text_query(question=question)) # ✅ Correct way to call async functions in Celery
@celery_app.task
def image_query_task(file_path: str, question: str):
from endpoints.image import image_query # Import inside the function
with open(file_path, "rb") as file:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(image_query(file=file, question=question)) # ✅ Use event loop
@celery_app.task
def video_query_task(file_path: str, question: str):
from endpoints.video import video_query # Import inside the function
with open(file_path, "rb") as file:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(video_query(file=file, question=question)) # ✅ Use event loop
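Each task above builds and installs a fresh event loop before calling the async endpoint function. An equivalent, shorter pattern (a sketch, not part of this commit) is to let asyncio.run create and close the loop. One hedged caveat: the endpoint functions return JSONResponse objects, which Celery's default JSON result serializer will likely refuse to store, so returning plain dicts from these tasks may be necessary.

import asyncio
from celery_app import celery_app

@celery_app.task
def text_query_task(question: str):
    # deferred import, as in the original, to avoid a circular import at module load
    from endpoints.text import text_query
    # asyncio.run() creates a new event loop, runs the coroutine, and closes the loop afterwards
    return asyncio.run(text_query(question=question))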

1
ui.py

@@ -92,7 +92,6 @@ def setup_ui():
outputs=[video_output]
)
return ui
if __name__ == "__main__":