# Tiktok-Talent-Info/utils/video_processing.py
# (web-export header retained as a comment: 94 lines, 3.3 KiB, Python,
#  Raw Normal View History, 2025-01-23 21:50:55 +08:00 — the bare text
#  was not valid Python and broke the module.)
import os
import subprocess
import tempfile
from concurrent.futures import ThreadPoolExecutor

import cv2
import numpy as np
from decord import VideoReader, cpu
from PIL import Image
from pydub import AudioSegment
def split_video_into_segments(video_path, segment_duration=30):
    """
    Split a video into fixed-length segments using FFmpeg stream copy.

    Parameters:
        video_path: path to the input video file.
        segment_duration: length of each segment in seconds (default 30).

    Returns:
        List of paths to the segment files written under /tmp/video_segments.

    Raises:
        ValueError: if the video cannot be opened or reports a zero FPS.
        subprocess.CalledProcessError: if an ffmpeg invocation fails.
    """
    output_dir = "/tmp/video_segments"
    os.makedirs(output_dir, exist_ok=True)

    # Probe total duration via OpenCV metadata.
    cap = cv2.VideoCapture(video_path)
    # Keep fps as float: int() truncation (e.g. 29.97 -> 29) skews the
    # computed duration and can add or drop a trailing segment.
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    cap.release()
    if fps <= 0:
        # Unreadable file or missing metadata would otherwise divide by zero.
        raise ValueError(f"Cannot determine FPS for video: {video_path}")
    total_duration = total_frames / fps

    segments = []
    for start_time in range(0, int(total_duration), segment_duration):
        segment_file = os.path.join(output_dir, f"segment_{start_time}.mp4")
        command = [
            # -y: overwrite leftovers from earlier runs; without it ffmpeg
            # prompts on stdin and the subprocess hangs.
            "ffmpeg", "-y", "-i", video_path,
            "-ss", str(start_time),
            "-t", str(segment_duration),
            # Stream copy: no re-encode, so cuts land on the nearest keyframe.
            "-c", "copy", segment_file,
        ]
        subprocess.run(command, check=True)
        segments.append(segment_file)
    return segments
def extract_motion_key_frames(video_path, max_frames=20, sigma_multiplier=2, frame_interval=1):
    """
    Extract key frames from a video based on motion intensity.

    Motion is measured with Farneback dense optical flow between consecutive
    sampled frames; frames whose motion exceeds mean + sigma_multiplier * std
    are selected (up to max_frames).

    Parameters:
        video_path: path to the input video file.
        max_frames: maximum number of key frames to return.
        sigma_multiplier: how many standard deviations above the mean motion
            a frame must be to count as a key frame.
        frame_interval: sample every Nth frame from the video.

    Returns:
        List of PIL.Image key frames (possibly empty).
    """
    def calculate_motion(gray_pair):
        """Return total squared optical-flow magnitude between two gray frames."""
        prev_gray, current_gray = gray_pair
        flow = cv2.calcOpticalFlowFarneback(prev_gray, current_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        return np.sum(flow ** 2)

    # Load sampled frames using Decord.
    video = VideoReader(video_path, ctx=cpu(0))
    frames_batch = video.get_batch(range(0, len(video), frame_interval)).asnumpy()

    # Downscale by 2x in each dimension for faster flow computation.
    frames = [cv2.resize(frame, (frame.shape[1] // 2, frame.shape[0] // 2)) for frame in frames_batch]
    if len(frames) < 2:
        # Need at least one consecutive pair to measure motion.
        return []

    # Pre-convert every frame to grayscale once, then pair CONSECUTIVE frames.
    # (The previous version paired every frame with frame 0, so "motion" was
    # measured against the first frame instead of between neighbours.)
    # NOTE(review): decord returns RGB arrays; cv2.COLOR_BGR2GRAY on RGB just
    # swaps channel weights — kept as-is to match original behavior, but
    # COLOR_RGB2GRAY may be the intended constant. TODO confirm.
    grays = [cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) for frame in frames]
    gray_pairs = list(zip(grays[:-1], grays[1:]))

    # Compute per-pair motion in parallel (OpenCV releases the GIL).
    with ThreadPoolExecutor() as executor:
        motion_values = list(executor.map(calculate_motion, gray_pairs))

    # Statistical threshold: mean + k * std of the observed motion values.
    motion_mean = np.mean(motion_values)
    motion_std = np.std(motion_values)
    threshold = motion_mean + sigma_multiplier * motion_std

    # Select frames above the threshold, stopping once max_frames is reached.
    key_frames = []
    for motion, frame in zip(motion_values, frames[1:]):
        if motion > threshold:
            key_frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
            if len(key_frames) >= max_frames:
                break
    return key_frames
def extract_audio_from_video(video_path):
    """
    Extract the audio track from a video and save it as a WAV file.

    Parameters:
        video_path: path to the input video file.

    Returns:
        Path to the exported WAV file. The path is a unique temporary file,
        so concurrent calls no longer overwrite each other's output (the
        previous fixed "/tmp/temp_audio.wav" collided across runs and does
        not exist on Windows).
    """
    print("Audio extraction started...")
    audio = AudioSegment.from_file(video_path)
    # mkstemp yields a unique, portable path; close the fd immediately since
    # pydub reopens the file by name during export.
    fd, audio_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    audio.export(audio_path, format="wav")
    # Report completion only after the export has actually finished.
    print("Audio extraction completed.")
    print(f"Audio extracted and saved to: {audio_path}")
    return audio_path