"""Celery smoke-test / load-test driver.

Pings the Celery app, prints its broker/backend configuration, then (below)
submits batched video-query chords and polls their results.

NOTE(review): stray VCS timestamp lines (e.g. "2025-03-22 20:54:10 +08:00")
were interleaved in this file and broke the syntax; they have been removed.
"""
import os
import random
import time
from concurrent.futures import ThreadPoolExecutor

from celery import chain
from celery.result import AsyncResult

# Duplicate/overlapping `from tasks import ...` lines merged into one.
from tasks import (
    app,
    image_query_task,
    inference_video,
    preprocess_video,
    text_query_task,
    video_query_task,
)

# Basic connectivity check: ping() returns one reply dict per live worker
# (an empty list means no worker is reachable).
print(app.control.ping())
print(app.conf.result_backend)
print(app.conf.broker_url)

# task = celery.send_task("tasks.text_query_task", args=["What is string?"])
# from celery.result import AsyncResult
# result = AsyncResult(task.id)
# print(result.get(timeout=1000))  # Should return "Celery is working!"
# print(celery.tasks)
# print(result.state)  # Possible states: PENDING, STARTED, SUCCESS, FAILURE
# print(result.get())  # Get result if completed
# # Check status
# result = AsyncResult(task.id)
# print(result.status)
# print(result.result)  # If it failed, this will contain an error message

# Labeled repeat of the connectivity / configuration report.
print("Celery Ping Response:", app.control.ping())
print("Backend:", app.conf.result_backend)
print("Broker:", app.conf.broker_url)
# Define the number of concurrent tasks to submit.
NUM_TASKS = 1

# Extra scheduling delay (seconds) applied per task; 0 = submit immediately.
delay_seconds = 0

# Every submitted task queries the same sample video file.
file_paths = ["../video/film4.mp4" for _ in range(NUM_TASKS)]

# video_folder = "../video"
# video_files = [f for f in os.listdir(video_folder) if f.endswith(('.mp4', '.avi', '.mov', '.mkv'))]
# video_files = ['1.2.mp4', '1.mp4', '3.mp4', '4.mp4', '5.mp4']
# print(video_files)
# file_paths = [os.path.join(video_folder, random.choice(video_files)) for _ in range(NUM_TASKS)]
# print(file_paths)
# profile_folder = "../profile"
# image_files = [f for f in os.listdir(profile_folder) if f.endswith(('.png', '.jpg', '.jpeg'))]
# file_paths = [os.path.join(profile_folder, random.choice(image_files)) for _ in range(NUM_TASKS)]
# questions = [f"What is AI? {i}" for i in range(NUM_TASKS)]
# questions = [
#     f"Extract the following information from this image and return the result in JSON format:\n"
#     f"- Name: <name>\n"
#     f"- ID: <id>\n"
#     f"- Profile Picture: <url>\n"
#     f"- Follower Count: <count>\n"
#     f"- Likes Count: <count>\n"
#     f"- Bio: <bio>\n"
#     f"- Following Count: <count>\n"
#     f"- External Links: <links>\n"
#     f"Do not include any disclaimers or comments like 'I'm sorry' or 'I can't assist.' Task {i}"
#     for i in range(NUM_TASKS)
# ]
# questions = [
#     f"Based on the given images and audio script, extract detailed information about the products recommended in the video and format the output as JSON with the following fields:\n"
#     f"- Product Name: <name>\n"
#     f"- Category: <category>\n"
#     f"- Styles or Variants: <styles/variants>\n"
#     f"- Highlights: <highlights>\n"
#     f"- Promotional Details: <promotional_details>\n"
#     f"Do not include any disclaimers or comments like 'I'm sorry' or 'I can't assist.' Task {i}"
#     for i in range(NUM_TASKS)
# ]

# One screenplay-generation prompt per task; the trailing "Task {i}" makes
# each prompt unique so results can be distinguished. Only the last segment
# needs an f-prefix — the fixed segments are plain string literals joined by
# implicit concatenation.
questions = [
    "Generate a screenplay based on the given video content and format the output as JSON with the following structured fields:\n"
    "- Scene Descriptions: <visual setting including background, lighting, atmosphere>\n"
    "- Character Introductions: <key characters with appearance and notable traits>\n"
    "- Dialogue: <transcribed spoken lines in screenplay format>\n"
    "- Actions & Expressions: <non-verbal cues and interactions>\n"
    "- Product Integrations: <product names, categories, features if applicable>\n"
    "- Narrative Flow: <scene transitions and pacing notes>\n"
    f"Follow standard screenplay formatting for headers, character names, dialogue, and actions. Do not include disclaimers or comments like 'I can't assist.' Task {i}"
    for i in range(NUM_TASKS)
]
# def submit_task(question, index): # sends tasks to Celery asynchronously, queues the tasks in Celery broker. If multiple Celery workers, they process tasks in parallel.
# """ Submits a Celery task with increasing delay """
# countdown_time = index * delay_seconds # Dynamic delay
# task = text_query_task.apply_async(args=[question], countdown=countdown_time)
# print("Running celery_debug...")
# # task = text_query_task.delay(question)
# print(f"Task {index} scheduled with {countdown_time}s delay.")
# return task.id
# def submit_task(file_path, question, index):
# """ Submits a Celery task with increasing delay """
# countdown_time = index * delay_seconds # Dynamic delay
# task = image_query_task.apply_async(args=[file_path, question], countdown=countdown_time)
# print(f"Task {index} scheduled with {countdown_time}s delay.")
# return task.id
# def submit_task(file_path, question, index):
# """ Submits a video query task with increasing delay """
# countdown_time = index * delay_seconds # Dynamic delay
# task = video_query_task.apply_async(args=[file_path, question], countdown=countdown_time)
# print(f"Task {index} scheduled with {countdown_time}s delay for file {file_path}.")
# return task.id
# def submit_task(file_path, question, index):
# # countdown_time = index * delay_seconds
# countdown_time = delay_seconds
# # Chain preprocessing and inference tasks, inference only after preprocess done
# task_chain = chain(
# preprocess_video.s(file_path, question),
# inference_video.s()
# ).apply_async(countdown=countdown_time)
# print(f"Task {index} scheduled with {countdown_time}s delay for file {file_path}.")
# return task_chain.id
from celery import chord, group
from tasks import preprocess_video, inference_video


def submit_task(file_paths, questions, batch_size=4):
    """Submit video-query work to Celery in batches of chords.

    Each batch runs up to ``batch_size`` ``preprocess_video`` tasks in
    parallel; once all of them complete, their collected results are passed
    to a single ``inference_video`` callback (Celery chord semantics).

    Args:
        file_paths: one video file path per task.
        questions: one prompt per task, paired with ``file_paths`` by index.
        batch_size: number of preprocessing tasks grouped into each chord.

    Returns:
        List of chord callback task IDs, one per submitted batch.
    """
    task_ids = []
    for i in range(0, len(file_paths), batch_size):
        # Slice out the current batch of (file, question) pairs.
        batch_file_paths = file_paths[i:i + batch_size]
        batch_questions = questions[i:i + batch_size]
        print(f"batch file paths: {batch_file_paths}")

        # One preprocessing signature per pair.
        preprocessing_tasks = [
            preprocess_video.s(file_path, question)
            for file_path, question in zip(batch_file_paths, batch_questions)
        ]
        # chord: run the header group, then invoke inference_video with the
        # list of all preprocessing results.
        chord_task = chord(preprocessing_tasks)(inference_video.s())
        task_ids.append(chord_task.id)
        print(f"Batch {i // batch_size + 1} submitted with task ID: {chord_task.id}")
    return task_ids
# def submit_task(file_path, question, index):
# preprocess_task = preprocess_video.apply_async(
# args=[file_path, question],
# queue="preprocess_queue"
# )
# print(f"Task {index} preprocessing scheduled for file {file_path}.")
# # Add a callback to submit inference task after preprocessing is done
# preprocess_task.then(
# inference_video.s().set(queue="inference_queue")
# )
# print(f"Task {index} inference will be scheduled after preprocessing.")
# return preprocess_task.id
# --- Submit all tasks and poll until every chord reaches a terminal state ---
start_time = time.time()
print(f"\nSubmitting {NUM_TASKS} tasks concurrently...")
task_ids = []

# from tasks import add
# result = add.delay(questions)
# print(result)
# print(f"Task ID: {result.id}")
# try:
#     task_result = result.get(timeout=5)
#     print(f"Task Result: {task_result}")
# except Exception as e:
#     print(f"Task not ready or failed: {e}")
# task_ids.append(result.id)
# with ThreadPoolExecutor(max_workers=10) as executor:
#     # resultID = executor.map(submit_task, questions, range(NUM_TASKS))
#     resultID = executor.map(submit_task, file_paths, questions, range(NUM_TASKS))
#     task_ids.extend(resultID)

task_ids = submit_task(file_paths, questions)
print("\nAll tasks submitted!")
print(task_ids)

# Monitor task statuses.
print("\nChecking Task Results...")
for task_id in task_ids:
    async_result = AsyncResult(task_id, app=app)
    # AsyncResult.status re-queries the result backend on every access, so
    # this loop observes live state changes. NOTE(review): a task stuck in
    # RETRY/PENDING forever would loop indefinitely — no timeout is applied.
    while async_result.status not in ["SUCCESS", "FAILURE"]:
        print(f"Task {task_id} status: {async_result.status}")
        time.sleep(1)
    # Fetch the final result only on success; get() would re-raise on FAILURE.
    print(f"Task {task_id} final status: {async_result.status}")
    if async_result.status == "SUCCESS":
        print(f"Result: {async_result.get()}")

print("\nAll tasks completed.")
end_time = time.time()
print(f"Total time taken: {end_time - start_time:.2f} seconds.")