# Tiktok-Talent-Info/celery_debug.py
# Last modified: 2025-05-12 11:22:46 +08:00
# (exported viewer metadata: 213 lines, 8.2 KiB, Python)
import os
import random
import time

from celery import chain
from celery.result import AsyncResult
from concurrent.futures import ThreadPoolExecutor

# Consolidated from two overlapping `from tasks import ...` lines; `import time`
# was also imported twice in the original.
from tasks import (
    app,
    image_query_task,
    inference_video,
    preprocess_video,
    text_query_task,
    video_query_task,
)

# Smoke-test the Celery deployment: a ping proves at least one worker is
# reachable, and the backend/broker URLs confirm which deployment this
# client is actually talking to.
print(app.control.ping())
print(app.conf.result_backend)
print(app.conf.broker_url)

# Earlier single-task experiment, kept for reference:
# task = celery.send_task("tasks.text_query_task", args=["What is string?"])
# result = AsyncResult(task.id)
# print(result.get(timeout=1000))  # Should return "Celery is working!"
# print(celery.tasks)
# print(result.state)  # Possible states: PENDING, STARTED, SUCCESS, FAILURE
# print(result.get())  # Get result if completed
# # Check status
# result = AsyncResult(task.id)
# print(result.status)
# print(result.result)  # If it failed, this will contain an error message
# Sanity-check the Celery deployment before submitting any work: ping the
# workers and report which result backend / broker this client is wired to.
print("Celery Ping Response:", app.control.ping())
for label, value in (("Backend", app.conf.result_backend),
                     ("Broker", app.conf.broker_url)):
    print(f"{label}:", value)
# --- Run configuration -------------------------------------------------------
# Number of tasks to submit in one run, and the per-task scheduling delay (s).
NUM_TASKS = 1
delay_seconds = 0
# Every task re-uses the same sample clip.  (Plain string literal: the original
# used an f-string with no placeholders, which is a no-op.)
file_paths = ["../video/3.mp4" for _ in range(NUM_TASKS)]
# video_folder = "../video"
# video_files = [f for f in os.listdir(video_folder) if f.endswith(('.mp4', '.avi', '.mov', '.mkv'))]
# video_files = ['1.2.mp4', '1.mp4', '3.mp4', '4.mp4', '5.mp4']
# print(video_files)
# file_paths = [os.path.join(video_folder, random.choice(video_files)) for _ in range(NUM_TASKS)]
# print(file_paths)
# profile_folder = "../profile"
# image_files = [f for f in os.listdir(profile_folder) if f.endswith(('.png', '.jpg', '.jpeg'))]
# file_paths = [os.path.join(profile_folder, random.choice(image_files)) for _ in range(NUM_TASKS)]
# questions = [f"What is AI? {i}" for i in range(NUM_TASKS)]
# questions = [
# f"Extract the following information from this image and return the result in JSON format:\n"
# f"- Name: <name>\n"
# f"- ID: <id>\n"
# f"- Profile Picture: <url>\n"
# f"- Follower Count: <count>\n"
# f"- Likes Count: <count>\n"
# f"- Bio: <bio>\n"
# f"- Following Count: <count>\n"
# f"- External Links: <links>\n"
# f"Do not include any disclaimers or comments like 'I'm sorry' or 'I can't assist.' Task {i}"
# for i in range(NUM_TASKS)
# ]
# One product-extraction prompt per task; segments are joined by implicit
# string concatenation.  Only the final segment interpolates anything, so it
# alone keeps the f-string prefix.
questions = [
    "Based on the given images and audio script, extract detailed information about the products recommended in the video and format the output as JSON with the following fields:\n"
    "- Product Name: <name>\n"
    "- Category: <category>\n"
    "- Styles or Variants: <styles/variants>\n"
    "- Highlights: <highlights>\n"
    "- Promotional Details: <promotional_details>\n"
    f"Do not include any disclaimers or comments like 'I'm sorry' or 'I can't assist.' Task {i}"
    for i in range(NUM_TASKS)
]
# questions = [
# f"Generate a screenplay based on the given video content and format the output as JSON with the following structured fields:\n"
# f"- Scene Descriptions: <visual setting including background, lighting, atmosphere>\n"
# f"- Character Introductions: <key characters with appearance and notable traits>\n"
# f"- Dialogue: <transcribed spoken lines in screenplay format>\n"
# f"- Actions & Expressions: <non-verbal cues and interactions>\n"
# f"- Product Integrations: <product names, categories, features if applicable>\n"
# f"- Narrative Flow: <scene transitions and pacing notes>\n"
# f"Follow standard screenplay formatting for headers, character names, dialogue, and actions. Do not include disclaimers or comments like 'I can't assist.' Task {i}"
# for i in range(NUM_TASKS)
# ]
# def submit_task(question, index): # sends tasks to Celery asynchronously, queues the tasks in Celery broker. If multiple Celery workers, they process tasks in parallel.
# """ Submits a Celery task with increasing delay """
# countdown_time = index * delay_seconds # Dynamic delay
# task = text_query_task.apply_async(args=[question], countdown=countdown_time)
# print("Running celery_debug...")
# # task = text_query_task.delay(question)
# print(f"Task {index} scheduled with {countdown_time}s delay.")
# return task.id
# def submit_task(file_path, question, index):
# """ Submits a Celery task with increasing delay """
# countdown_time = index * delay_seconds # Dynamic delay
# task = image_query_task.apply_async(args=[file_path, question], countdown=countdown_time)
# print(f"Task {index} scheduled with {countdown_time}s delay.")
# return task.id
# def submit_task(file_path, question, index):
# """ Submits a video query task with increasing delay """
# countdown_time = index * delay_seconds # Dynamic delay
# task = video_query_task.apply_async(args=[file_path, question], countdown=countdown_time)
# print(f"Task {index} scheduled with {countdown_time}s delay for file {file_path}.")
# return task.id
# def submit_task(file_path, question, index):
# # countdown_time = index * delay_seconds
# countdown_time = delay_seconds
# # Chain preprocessing and inference tasks, inference only after preprocess done
# task_chain = chain(
# preprocess_video.s(file_path, question),
# inference_video.s()
# ).apply_async(countdown=countdown_time)
# print(f"Task {index} scheduled with {countdown_time}s delay for file {file_path}.")
# return task_chain.id
from celery import chord, group
from tasks import preprocess_video, inference_video
def submit_task(file_paths, questions, batch_size=4):
    """Submit video-query work to Celery in batches of chords.

    Each batch fans out one ``preprocess_video`` task per (file, question)
    pair, then funnels all of their results into a single ``inference_video``
    callback via a chord.  Pairs are matched positionally; if the two lists
    differ in length the extras are ignored (``zip`` semantics).

    Returns the list of chord-callback task ids, one per submitted batch.
    """
    submitted_ids = []
    batch_starts = range(0, len(file_paths), batch_size)
    for batch_num, start in enumerate(batch_starts, start=1):
        paths_chunk = file_paths[start:start + batch_size]
        questions_chunk = questions[start:start + batch_size]
        print(f"batch file paths: {paths_chunk}")
        # Chord header: one preprocessing signature per (path, question) pair.
        header = [
            preprocess_video.s(path, question)
            for path, question in zip(paths_chunk, questions_chunk)
        ]
        # The chord body fires only after every header task has finished.
        callback = chord(header)(inference_video.s())
        submitted_ids.append(callback.id)
        print(f"Batch {batch_num} submitted with task ID: {callback.id}")
    return submitted_ids
# def submit_task(file_path, question, index):
# preprocess_task = preprocess_video.apply_async(
# args=[file_path, question],
# queue="preprocess_queue"
# )
# print(f"Task {index} preprocessing scheduled for file {file_path}.")
# # Add a callback to submit inference task after preprocessing is done
# preprocess_task.then(
# inference_video.s().set(queue="inference_queue")
# )
# print(f"Task {index} inference will be scheduled after preprocessing.")
# return preprocess_task.id
# --- Submit the workload ------------------------------------------------------
start_time = time.time()
print(f"\nSubmitting {NUM_TASKS} tasks concurrently...")
# Earlier experiments, kept for reference:
# from tasks import add
# result = add.delay(questions)
# print(result)
# print(f"Task ID: {result.id}")
# try:
#     task_result = result.get(timeout=5)
#     print(f"Task Result: {task_result}")
# except Exception as e:
#     print(f"Task not ready or failed: {e}")
# task_ids.append(result.id)
# with ThreadPoolExecutor(max_workers=10) as executor:
#     # resultID = executor.map(submit_task, questions, range(NUM_TASKS))
#     resultID = executor.map(submit_task, file_paths, questions, range(NUM_TASKS))
#     task_ids.extend(resultID)
# NOTE: submit_task handles its own batching, so the ThreadPoolExecutor
# fan-out above is no longer needed, and the dead `task_ids = []`
# initialisation was dropped: the return value below replaces it outright.
task_ids = submit_task(file_paths, questions)
print("\nAll tasks submitted!")
print(task_ids)
# --- Monitor task statuses ----------------------------------------------------
print("\nChecking Task Results...")
for task_id in task_ids:
    async_result = AsyncResult(task_id, app=app)
    # Poll until the task reaches *any* terminal ("ready") state.  The
    # original tested only for SUCCESS/FAILURE, so a REVOKED task would
    # spin this loop forever; AsyncResult.ready() covers all ready states.
    while not async_result.ready():
        print(f"Task {task_id} status: {async_result.status}")
        time.sleep(1)
    # Fetch final result; .get() is only safe to call on success, since on
    # failure it re-raises the task's exception.
    print(f"Task {task_id} final status: {async_result.status}")
    if async_result.status == "SUCCESS":
        print(f"Result: {async_result.get()}")
print("\nAll tasks completed.")
end_time = time.time()
print(f"Total time taken: {end_time - start_time:.2f} seconds.")