-
Notifications
You must be signed in to change notification settings - Fork 923
Description
Friends, I hope to load the mp4 file, separate the video stream and the audio stream, perform stt/tts processing on the audio stream, and then merge and push the rtmp server. But the actual situation is that there are multiple pipes: The channel cannot play anything. output_process.stdin.write(video_frame) alone
Or output_process.stdin.write(audio_data) is normal. Thank you for your help
`import numpy as np
import ffmpeg
import torch
out_url = 'rtmp://localhost:1935/live/stream'
def main():
try:
input_stream = ffmpeg.input(
'./data/test.mp4', re=None)
video_input = (
input_stream.video
.output('pipe:', format='rawvideo', pix_fmt='yuv420p', s='854x480', r=30, vsync='cfr')
.run_async(pipe_stdout=True)
)
audio_input = (
input_stream.audio
.output('pipe:', format='s16le', acodec='pcm_s16le', ac=1, ar=16000)
.run_async(pipe_stdout=True)
)
output_process = (
ffmpeg
.output(
ffmpeg.input('pipe:', format='rawvideo',pix_fmt='yuv420p', s='854x480'),
# ffmpeg.input('pipe:', format='s16le', ac=1, ar=16000),
'rtmp://localhost:1935/live/stream',
vcodec='libx264',
tune='zerolatency',
acodec='aac',
preset='ultrafast',
f='flv',
)
.overwrite_output()
.run_async(pipe_stdin=True)
)
chunk_size_ms = 4000
buffer_size = int(16000 * 2 * chunk_size_ms / 1000)
frame_size = 854 * 480 * 3 // 2
while True:
try:
video_frame = video_input.stdout.read(frame_size)
# audio_data = audio_input.stdout.read(buffer_size)
except Exception as e:
print(f"读取数据时出错: {e}")
break
if video_frame:
try:
output_process.stdin.write(video_frame)
except BrokenPipeError as e:
print(f"视频写入失败: {e}")
break
# if audio_data:
# try:
# output_process.stdin.write(audio_data)
# except BrokenPipeError as e:
# print(f"音频写入失败: {e}")
# break
# 终止条件检查
# if not video_frame and not audio_data:
# break
except Exception as e:
print(f"发生异常: {e}")
finally:
# 确保所有进程正确关闭
if 'audio_input' in locals():
audio_input.terminate()
audio_input.wait()
if 'video_input' in locals():
video_input.terminate()
video_input.wait()
if 'output_process' in locals():
try:
output_process.stdin.close()
except Exception as e:
print(f"关闭输出进程输入管道时出错: {e}")
output_process.terminate()
output_process.wait()
if name == 'main':
main()`