From 814e5589591b82a2cec257851279d072988f7fc1 Mon Sep 17 00:00:00 2001
From: Zixiao Wang
Date: Fri, 7 Feb 2025 19:18:35 +0800
Subject: [PATCH] updated celery redis

---
 __init__.py                                |   0
 __pycache__/celery_app.cpython-312.pyc     | Bin 0 -> 411 bytes
 __pycache__/locustfile.cpython-312.pyc     | Bin 770 -> 3469 bytes
 __pycache__/main.cpython-312.pyc           | Bin 769 -> 2973 bytes
 __pycache__/pipeline_setup.cpython-312.pyc | Bin 682 -> 653 bytes
 __pycache__/tasks.cpython-312.pyc          | Bin 0 -> 1783 bytes
 celery_app.py                              |  25 +++++
 endpoints/__init__.py                      |   0
 endpoints/image.py                         |   6 +-
 endpoints/text.py                          |   3 +-
 endpoints/video.py                         |   2 +-
 locustfile.py                              | 125 ++++++++++++++++++---
 main.py                                    |  91 ++++++++++++---
 multi-user.py                              |  75 ++++++++++---
 pipeline_setup.py                          |  11 +-
 tasks.py                                   |  25 +++++
 ui.py                                      |   1 -
 17 files changed, 309 insertions(+), 55 deletions(-)
 create mode 100644 __init__.py
 create mode 100644 __pycache__/celery_app.cpython-312.pyc
 create mode 100644 __pycache__/tasks.cpython-312.pyc
 create mode 100644 celery_app.py
 create mode 100644 endpoints/__init__.py
 create mode 100644 tasks.py

diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/__pycache__/celery_app.cpython-312.pyc b/__pycache__/celery_app.cpython-312.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..46894cf12e29c93f4438e239db036659b361c5c9
GIT binary patch
[binary payload omitted]

diff --git a/__pycache__/locustfile.cpython-312.pyc b/__pycache__/locustfile.cpython-312.pyc
index 151daadf6ba1cfd25dc6d1eac916b5964c95a7e2..37e21135dce408d7557864d424c84d8df514a3b4 100644
GIT binary patch
[binary payload omitted]

diff --git a/__pycache__/main.cpython-312.pyc b/__pycache__/main.cpython-312.pyc
index c2dbc978a7f30dd86fa455f7b14b6391ecb31217..67850100bac471c046e3ec7a17622377af2c9a42 100644
GIT binary patch
[binary payload omitted]

diff --git a/__pycache__/pipeline_setup.cpython-312.pyc b/__pycache__/pipeline_setup.cpython-312.pyc
GIT binary patch
[binary payload omitted]

diff --git a/__pycache__/tasks.cpython-312.pyc b/__pycache__/tasks.cpython-312.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..6bab612d675203fdcede639c95cd51992ca591dc
GIT binary patch
[binary payload omitted]
diff --git a/celery_app.py b/celery_app.py
new file mode 100644
index 0000000..20bff9f
--- /dev/null
+++ b/celery_app.py
@@ -0,0 +1,25 @@
+# from celery import Celery
+
+# celery_app = Celery(
+#     "tasks",
+#     broker="redis://localhost:6379/0",   # Redis as broker
+#     backend="redis://localhost:6379/0",  # Redis for storing results
+# )
+
+# celery_app.conf.task_routes = {
+#     "tasks.*": {"queue": "default"},
+# }
+
+from celery import Celery
+
+celery_app = Celery(
+    "tasks",
+    broker="redis://localhost:6379/0",
+    backend="redis://localhost:6379/0",
+    include=["tasks"]  # ✅ Prevents import issues
+)
+
+celery_app.conf.task_routes = {
+    "tasks.*": {"queue": "default"},
+}
+
diff --git a/endpoints/__init__.py b/endpoints/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/endpoints/image.py b/endpoints/image.py
index 466a467..c5a6da3 100644
--- a/endpoints/image.py
+++ b/endpoints/image.py
@@ -1,11 +1,10 @@
 from fastapi import UploadFile, Form
 from fastapi.responses import JSONResponse
 import io
-import base64
 import asyncio
 import numpy as np
 from PIL import Image
-from pipeline_setup import pipe, IMAGE_TOKEN
+from pipeline_setup import pipe, IMAGE_TOKEN
 from utils.image_processing import encode_image_base64
 
 async def image_query(file: UploadFile, question: str = Form(...)):
@@ -21,13 +20,12 @@ async def image_query(file: UploadFile, question: str = Form(...)):
         encoded_image_base64 = encode_image_base64(image)
         question_with_image_token = f"{question}\n{IMAGE_TOKEN}"
 
-        response = await asyncio.to_thread(pipe, (question, image))
+        response = await asyncio.to_thread(pipe, (question, image))
 
         return JSONResponse({"query": question, "response": response.text})
     except Exception as e:
         return JSONResponse({"query": question, "error": str(e)})
 
-# import mimetypes
 # async def image_query(image: np.ndarray, question: str):
 #     """
 #     API endpoint to process an image (as numpy array) with the user's query.
diff --git a/endpoints/text.py b/endpoints/text.py
index 9cf2ee1..5100f03 100644
--- a/endpoints/text.py
+++ b/endpoints/text.py
@@ -9,11 +9,12 @@ async def text_query(question: str = Form(...)):
     """
     API endpoint to process text input with the user's query.
     """
     try:
-        response = await asyncio.to_thread(pipe, question)
+        response = await asyncio.to_thread(pipe, question)
         return JSONResponse({"query": question, "response": response.text})
     except Exception as e:
         return JSONResponse({"query": question, "error": str(e)})
+
 # async def text_query(question: str = Form(...)):
 #     """
 #     API endpoint to process text input with the user's query.
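Note on celery_app.py above: all tasks in the "tasks" module are routed to the "default" queue, so a worker has to consume that queue (for example: celery -A celery_app worker --loglevel=info -Q default, with Redis assumed reachable at redis://localhost:6379/0). A minimal, hypothetical sketch of submitting a task directly through this app; the script name and question text are illustrative only and are not part of this patch:

# submit_example.py (hypothetical, not part of this patch)
from tasks import text_query_task  # tasks.py is added later in this patch

# Send a task to the "default" queue and print its id and current state.
result = text_query_task.apply_async(args=["What is FastAPI?"])
print("task id:", result.id)
print("state:", result.state)  # PENDING until a worker picks it up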
diff --git a/endpoints/video.py b/endpoints/video.py
index 98dee51..59ae9d8 100644
--- a/endpoints/video.py
+++ b/endpoints/video.py
@@ -185,4 +185,4 @@ async def video_query(file: UploadFile, question: str = Form(...)):
 #             "responses": aggregated_responses,
 #         }
 #     except Exception as e:
-#         return {"query": question, "error": str(e)}
+#         return {"query": question, "error": str(e)}
\ No newline at end of file
diff --git a/locustfile.py b/locustfile.py
index 75ef53a..c67683e 100644
--- a/locustfile.py
+++ b/locustfile.py
@@ -1,18 +1,117 @@
-from locust import HttpUser, task, between
+import os
+import time
+import asyncio
+from locust import FastHttpUser, HttpUser, task, between, constant
 
-class MultiModalUser(HttpUser):
-    wait_time = between(1, 3)
+import torch
+print(torch.cuda.is_available())
+
+class FastAPIUser(FastHttpUser):
+    # wait_time = between(1, 3)  # Wait time between requests
+    wait_time = constant(0)  # send requests simultaneously
+
+    def ensure_event_loop(self):
+        """Ensures each Locust task runs in a fresh asyncio event loop."""
+        try:
+            asyncio.get_running_loop()
+        except RuntimeError:
+            asyncio.set_event_loop(asyncio.new_event_loop())
+
+    # @task
+    # def text_query(self):
+    #     """Simulate text query request."""
+    #     url = "/api/text"
+
+    #     # start_event.wait()  # Synchronize all users
+
+    #     start_time = time.time()
+    #     response = self.client.post(url, data={"question": "What is FastAPI?"})
+    #     total_time = time.time() - start_time
+
+    #     if response.status_code == 200:
+    #         print(f"Success: {response.json()} (Time: {total_time:.2f}s)")
+    #     else:
+    #         print(f"Error: {response.status_code} - {response.text}")
+
+    # @task
+    # def upload_image(self):
+    #     """Sends an image with the correct query format."""
+    #     url = "/api/image"
+    #     image_path = "../profile/1.png"
+
+    #     if not os.path.exists(image_path):
+    #         print(f"Error: File {image_path} not found!")
+    #         return
+
+    #     query_text = (
+    #         "Extract the following information from this image and return the result in JSON format:\n"
+    #         "- Name: \n"
+    #         "- ID: \n"
+    #         "- Profile Picture: \n"
+    #         "- Follower Count: \n"
+    #         "- Likes Count: \n"
+    #         "- Bio: \n"
+    #         "- Following Count: \n"
+    #         "- External Links: \n"
+    #         "Do not include any disclaimers or comments like 'I'm sorry' or 'I can't assist'."
+    #     )
+
+    #     with open(image_path, "rb") as image_file:
+    #         files = {
+    #             "file": ("1.png", image_file, "image/png"),
+    #             "question": (None, query_text)
+    #         }
+
+    #         start_time = time.time()
+    #         response = self.client.post(url, files=files)
+    #         total_time = time.time() - start_time
+
+    #         if response.status_code == 200:
+    #             print(f"Success: {response.json()} (Time: {total_time:.2f}s)")
+    #         else:
+    #             print(f"Error: {response.status_code} - {response.text}")
 
     @task
-    def text_query(self):
-        self.client.post("/api/predict", json={"input": "What is the capital of France?"})
+    def video_query(self):
+        """Uploads a video and asks a detailed question."""
+        url = "/api/video"
+        video_path = "../video/1.1.mp4"
 
-    # @task
-    # def image_query(self):
-    #     with open("test_image.jpg", "rb") as f:
-    #         self.client.post("/api/image_predict", files={"file": f}, data={"question": "What is in this image?"})
+        if not os.path.exists(video_path):
+            print(f"Error: File {video_path} not found!")
+            return
 
-    # @task
-    # def video_query(self):
-    #     with open("test_video.mp4", "rb") as f:
-    #         self.client.post("/api/video_predict", files={"file": f}, data={"question": "What is happening in this video?"})
\ No newline at end of file
+        with open(video_path, "rb") as file:
+            files = {
+                "file": ("1.1.mp4", file, "video/mp4"),
+                "question": (None, """Based on the given images and audio script, extract detailed
+                information about the products recommended in the video and format the output as JSON with
+                the following fields:
+                1. **Product Name**: The specific name of the product, if mentioned.
+                2. **Category**: The specific category of the product (e.g., electronics, skincare, casual wear, etc.).
+                3. **Styles or Variants**: Any styles, designs, or variants of the product described
+                (e.g., colors, patterns, sizes, or other distinguishing attributes).
+                4. **Highlights**: The unique selling points or notable features emphasized by the anchor
+                (e.g., benefits, quality, or standout features).
+                5. **Promotional Details**: Any additional promotional information mentioned, such as discounts,
+                offers, or key features that set the product apart."""
+                )
+            }
+
+            print("Starting video query...")
+            start_time = time.time()
+            response = self.client.post(url, files=files)
+            total_time = time.time() - start_time
+
+            if response.status_code == 200:
+                print(f"Success: {response.json()} (Time: {total_time:.2f}s)")
+            else:
+                print(f"Error: {response.status_code} - {response.text}")
+
+            print("Video query completed.")
+
+# locust -f locustfile.py
+
+# curl -X POST http://localhost:8002/api/image \
+#      -F "file=@../profile/1.png" \
+#      -F "question=What is this image about?"
diff --git a/main.py b/main.py
index 20c96e8..832bf12 100644
--- a/main.py
+++ b/main.py
@@ -1,18 +1,83 @@
-from fastapi import FastAPI
-from endpoints.text import text_query
-from endpoints.image import image_query
-from endpoints.video import video_query
+# from fastapi import FastAPI, Form, UploadFile
+# from fastapi.responses import JSONResponse
+# import asyncio
+
+# app = FastAPI()
+
+# @app.post("/api/text")
+# async def text_query_endpoint(question: str = Form(...)):
+#     """
+#     API endpoint to process text input with the user's query.
+#     """
+#     from endpoints.text import text_query
+#     return await text_query(question=question)
+
+
+# @app.post("/api/image")
+# async def image_query_endpoint(file: UploadFile, question: str = Form(...)):
+#     """
+#     API endpoint to process an image with the user's query.
+#     """
+#     from endpoints.image import image_query
+#     return await image_query(file=file, question=question)
+
+
+# @app.post("/api/video")
+# async def video_query_endpoint(file: UploadFile, question: str = Form(...)):
+#     """
+#     API endpoint to process a video file with the user's query.
+#     """
+#     from endpoints.video import video_query
+#     return await video_query(file=file, question=question)
+
+
+# if __name__ == "__main__":
+#     import uvicorn
+#     uvicorn.run("main:app", host="0.0.0.0", port=8002, reload=True, loop="uvloop")
+
+
+from fastapi import FastAPI, Form, UploadFile
+from fastapi.responses import JSONResponse
+import shutil
+import uuid
+from tasks import text_query_task, image_query_task, video_query_task
 
 app = FastAPI()
 
-# Register routes
-app.post("/api/text")(text_query)
-app.post("/api/image")(image_query)
-app.post("/api/video")(video_query)
+# @app.post("/api/text")
+# async def text_query_endpoint(question: str = Form(...)):
+#     task = text_query_task.apply_async(args=[question])
+#     return JSONResponse({"task_id": task.id})
 
-if __name__ == "__main__":
-    import uvicorn
-    uvicorn.run("main:app", host="0.0.0.0", port=8002, reload=True)
+@app.post("/api/text")
+async def text_query_endpoint(question: str = Form(...)):
+    print(f"Received request: {question}")
+    task = text_query_task.apply_async(args=[question])
+    print(f"Task sent: {task.id}")
+    return JSONResponse({"task_id": task.id})
 
-# python main.py
-# uvicorn main:app --reload
\ No newline at end of file
+@app.post("/api/image")
+async def image_query_endpoint(file: UploadFile, question: str = Form(...)):
+    file_path = f"/tmp/{uuid.uuid4()}_{file.filename}"
+    with open(file_path, "wb") as buffer:
+        shutil.copyfileobj(file.file, buffer)
+
+    task = image_query_task.apply_async(args=[file_path, question])
+    return JSONResponse({"task_id": task.id})
+
+@app.post("/api/video")
+async def video_query_endpoint(file: UploadFile, question: str = Form(...)):
+    file_path = f"/tmp/{uuid.uuid4()}_{file.filename}"
+    with open(file_path, "wb") as buffer:
+        shutil.copyfileobj(file.file, buffer)
+
+    task = video_query_task.apply_async(args=[file_path, question])
+    return JSONResponse({"task_id": task.id})
+
+@app.get("/api/task/{task_id}")
+async def get_task_result(task_id: str):
+    from celery.result import AsyncResult
+    result = AsyncResult(task_id)
+    if result.ready():
+        return JSONResponse({"status": "completed", "result": result.result})
+    return JSONResponse({"status": "pending"})
diff --git a/multi-user.py b/multi-user.py
index aa9f2cf..ebf5999 100644
--- a/multi-user.py
+++ b/multi-user.py
@@ -1,22 +1,63 @@
-import requests
-import threading
+import asyncio
+import httpx
 
-def send_request(user_id):
-    url = "http://localhost:8002"
-    data = {"input": f"Test input from user {user_id}"}
-    response = requests.post(url, json=data)
-    print(f"User {user_id} response: {response.text}")
+BASE_URL = "http://localhost:8002"
 
-def main(num_users):
-    threads = []
-    for i in range(num_users):
-        thread = threading.Thread(target=send_request, args=(i,))
-        threads.append(thread)
-        thread.start()
+async def test_text_query(session_id: int):
+    """Send a POST request to the /api/text endpoint."""
+    try:
+        async with httpx.AsyncClient(timeout=30.0) as client:
+            # Correctly send form data
+            response = await client.post(
+                "http://localhost:8002/api/text",
+                data={"question": f"Test question: what is AI from session {session_id}"}
+            )
+            print(f"Session {session_id} Status Code: {response.status_code}")
+            print(f"Session {session_id} Headers: {response.headers}")
+            if response.content:
+                print(f"Session {session_id} Response: {response.json()}")
+            else:
+                print(f"Session {session_id} Response is empty.")
+    except Exception as e:
+        print(f"Session {session_id} encountered an error: {e}")
+
+
+async def test_image_query(session_id: int):
+    """Send a POST request to the /api/image endpoint with a dummy image."""
+    async with httpx.AsyncClient(timeout=30.0) as client:
+        files = {'file': ('../profile/1.jpg', b'binary data of the file', 'image/jpeg')}
+        response = await client.post(
+            f"{BASE_URL}/api/image",
+            data={"question": f"Image query: what is in the picture from session {session_id}"},
+            files=files
+        )
+        print(f"Session {session_id} Raw Response: {response.text}")
+        print(f"Session {session_id} Response: {response.json()}")
+
+
+async def test_video_query(session_id: int):
+    """Send a POST request to the /api/video endpoint with a dummy video."""
+    async with httpx.AsyncClient(timeout=30.0) as client:
+        files = {'file': ('../video/1.mp4', b'binary data of the file', 'video/mp4')}
+        response = await client.post(
+            f"{BASE_URL}/api/video",
+            data={"question": f"Video query: what is in the video from session {session_id}"},
+            files=files
+        )
+        print(f"Session {session_id} Raw Response: {response.text}")
+        print(f"Session {session_id} Response: {response.json()}")
+
+
+async def main():
+    tasks = []
+    for i in range(1, 3):
+        tasks.append(test_text_query(i))
+        # tasks.append(test_image_query(i))
+        # tasks.append(test_video_query(i))
+
+    # Run all tasks concurrently
+    await asyncio.gather(*tasks)
 
-    for thread in threads:
-        thread.join()
 
 if __name__ == "__main__":
-    num_users = 2
-    main(num_users)
\ No newline at end of file
+    asyncio.run(main())
diff --git a/pipeline_setup.py b/pipeline_setup.py
index 392a9bf..5fc6f85 100644
--- a/pipeline_setup.py
+++ b/pipeline_setup.py
@@ -4,6 +4,7 @@ from lmdeploy import pipeline, TurbomindEngineConfig, GenerationConfig
 IMAGE_TOKEN = "[IMAGE_TOKEN]"
 
 os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"
+# os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
 
 # Model initialization
 model = "OpenGVLab/InternVL2-26B-AWQ"
@@ -11,13 +12,13 @@ pipe = pipeline(
     model,
     backend_config=TurbomindEngineConfig(
         model_format="awq",
-        tp=2,
+        tp=2,
         # tp=4,
-        session_len=2048,  # 4096, 8192, 16384, 32768
+        session_len=16384,  # 4096, 8192, 16384, 32768
         max_batch_size=1,
-        cache_max_entry_count=0.1,  # 0.05
-        cache_block_seq_len=4096,  # 8192, 16384, 32768
-        quant_policy=4,
+        cache_max_entry_count=0.2,  # 0.05
+        cache_block_seq_len=16384,  # 8192, 16384, 32768
+        # quant_policy=8,
         # precision="fp16",
     ),
     # log_level='DEBUG'
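The task module below bridges Celery's synchronous workers and the async endpoint functions by creating a fresh event loop per call. An equivalent, slightly more compact variant (a sketch only, not part of this patch) is asyncio.run, which creates and closes the loop automatically:

# Hypothetical variant of the wrappers defined in tasks.py below (not part of this patch).
import asyncio
from celery_app import celery_app

@celery_app.task
def text_query_task_alt(question: str):
    from endpoints.text import text_query  # deferred import avoids a circular import
    # asyncio.run creates a new event loop, runs the coroutine, and closes the loop.
    return asyncio.run(text_query(question=question))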
diff --git a/tasks.py b/tasks.py
new file mode 100644
index 0000000..f9c756a
--- /dev/null
+++ b/tasks.py
@@ -0,0 +1,25 @@
+import asyncio
+from celery_app import celery_app
+
+@celery_app.task
+def text_query_task(question: str):
+    from endpoints.text import text_query  # Import inside the function to avoid circular import
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    return loop.run_until_complete(text_query(question=question))  # ✅ Correct way to call async functions in Celery
+
+@celery_app.task
+def image_query_task(file_path: str, question: str):
+    from endpoints.image import image_query  # Import inside the function
+    with open(file_path, "rb") as file:
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        return loop.run_until_complete(image_query(file=file, question=question))  # ✅ Use event loop
+
+@celery_app.task
+def video_query_task(file_path: str, question: str):
+    from endpoints.video import video_query  # Import inside the function
+    with open(file_path, "rb") as file:
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        return loop.run_until_complete(video_query(file=file, question=question))  # ✅ Use event loop
diff --git a/ui.py b/ui.py
index 42fd2b5..e0d536a 100644
--- a/ui.py
+++ b/ui.py
@@ -92,7 +92,6 @@ def setup_ui():
         outputs=[video_output]
     )
 
-
     return ui
 
 if __name__ == "__main__":
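A hypothetical end-to-end run sequence for the pieces in this patch, in the same command-comment style used in locustfile.py and main.py above (hosts, ports, and queue name are assumptions based on the defaults used in this patch):

# redis-server                                             # broker/result backend on localhost:6379
# celery -A celery_app worker --loglevel=info -Q default   # worker consuming the "default" queue
# uvicorn main:app --host 0.0.0.0 --port 8002              # FastAPI app
# locust -f locustfile.py --host http://localhost:8002     # load test against the running app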