# Tiktok-Talent-Info/utils/video_processing.py

import cv2
import os
import subprocess
import numpy as np
from PIL import Image
from pydub import AudioSegment
from decord import VideoReader, cpu
from concurrent.futures import ThreadPoolExecutor


def split_video_into_segments(video_path, segment_duration=30):
    """
    Split a video into fixed-length segments using FFmpeg stream copy.

    Returns a list of paths to the segment files.
    """
    output_dir = "/tmp/video_segments"
    os.makedirs(output_dir, exist_ok=True)

    # Determine the total duration of the video from its frame count and FPS.
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()
    if fps <= 0:
        raise ValueError(f"Could not determine FPS for {video_path}")
    total_duration = total_frames / fps

    segments = []
    for start_time in range(0, int(total_duration), segment_duration):
        segment_file = os.path.join(output_dir, f"segment_{start_time}.mp4")
        # Note: with "-c copy" FFmpeg snaps cut points to keyframes, so
        # segment boundaries are approximate.
        command = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-ss", str(start_time),
            "-t", str(segment_duration),
            "-c", "copy", segment_file,
        ]
        subprocess.run(command, check=True)
        segments.append(segment_file)
    print(f"segments: {segments}")
    return segments
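

# A minimal alternative sketch (not part of the original module): FFmpeg's
# segment muxer produces all chunks in a single pass, avoiding one full read
# of the input per segment. The function name and output pattern below are
# illustrative assumptions.
def split_video_with_segment_muxer(video_path, segment_duration=30,
                                   output_dir="/tmp/video_segments"):
    os.makedirs(output_dir, exist_ok=True)
    pattern = os.path.join(output_dir, "segment_%03d.mp4")
    command = [
        "ffmpeg", "-y", "-i", video_path,
        "-c", "copy", "-f", "segment",
        "-segment_time", str(segment_duration),
        "-reset_timestamps", "1",
        pattern,
    ]
    subprocess.run(command, check=True)
    return sorted(
        os.path.join(output_dir, f)
        for f in os.listdir(output_dir)
        if f.startswith("segment_") and f.endswith(".mp4")
    )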


def extract_motion_key_frames(video_path, max_frames=20, sigma_multiplier=2, frame_interval=1):
    """
    Extract key frames from a video based on motion intensity.

    Motion is the squared magnitude of dense optical flow between consecutive
    sampled frames; frames whose motion exceeds
    mean + sigma_multiplier * std are kept, up to max_frames.
    """
    def calculate_motion(gray_pair):
        """Calculate motion between two consecutive grayscale frames using Farneback optical flow."""
        prev_gray, current_gray = gray_pair
        flow = cv2.calcOpticalFlowFarneback(prev_gray, current_gray, None,
                                            0.5, 3, 15, 3, 5, 1.2, 0)
        return np.sum(flow ** 2)

    # Load video frames using Decord (frames come back in RGB channel order).
    video = VideoReader(video_path, ctx=cpu(0))
    frames_batch = video.get_batch(list(range(0, len(video), frame_interval))).asnumpy()

    # Downscale frames by half for faster optical-flow computation.
    frames = [cv2.resize(frame, (frame.shape[1] // 2, frame.shape[0] // 2))
              for frame in frames_batch]

    # Convert every frame to grayscale once, then pair consecutive frames so
    # each motion value compares frame i-1 with frame i (the original paired
    # every frame against frame 0).
    grays = [cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) for frame in frames]
    gray_pairs = list(zip(grays[:-1], grays[1:]))

    # Compute motion values in parallel.
    with ThreadPoolExecutor() as executor:
        motion_values = list(executor.map(calculate_motion, gray_pairs))

    # Derive the threshold statistically from the motion distribution.
    motion_mean = np.mean(motion_values)
    motion_std = np.std(motion_values)
    threshold = motion_mean + sigma_multiplier * motion_std

    # Keep frames whose motion exceeds the threshold, up to max_frames.
    key_frames = []
    for motion, frame in zip(motion_values, frames[1:]):
        if motion > threshold and len(key_frames) < max_frames:
            key_frames.append(Image.fromarray(frame))  # frames are already RGB
    return key_frames
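

# A lighter-weight variant (a sketch, not part of the original module): mean
# absolute frame difference is far cheaper than dense optical flow and often
# ranks high-motion frames similarly, at the cost of more sensitivity to
# noise and lighting changes.
def extract_motion_key_frames_fast(video_path, max_frames=20, sigma_multiplier=2):
    video = VideoReader(video_path, ctx=cpu(0))
    frames = video.get_batch(list(range(len(video)))).asnumpy()  # RGB frames
    grays = [cv2.cvtColor(f, cv2.COLOR_RGB2GRAY) for f in frames]
    # Mean absolute pixel difference between consecutive frames.
    motions = [np.mean(cv2.absdiff(a, b)) for a, b in zip(grays[:-1], grays[1:])]
    threshold = np.mean(motions) + sigma_multiplier * np.std(motions)
    key_frames = [Image.fromarray(f)
                  for m, f in zip(motions, frames[1:]) if m > threshold]
    return key_frames[:max_frames]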


def extract_audio_from_video(video_path):
    """
    Extract the audio track from a video with pydub and save it as a
    temporary WAV file. Returns the path to the audio file.
    """
    print("Audio extraction started...")
    audio = AudioSegment.from_file(video_path)
    audio_path = "/tmp/temp_audio.wav"
    audio.export(audio_path, format="wav")
    print(f"Audio extracted and saved to: {audio_path}")
    return audio_path
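

# Example usage (a sketch; "sample.mp4" is a hypothetical input path):
if __name__ == "__main__":
    segments = split_video_into_segments("sample.mp4", segment_duration=30)
    key_frames = extract_motion_key_frames(segments[0], max_frames=10)
    for i, img in enumerate(key_frames):
        img.save(f"/tmp/key_frame_{i}.jpg")
    audio_path = extract_audio_from_video("sample.mp4")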