8000 Add run_async operator · yan-code1/ffmpeg-python@462e34b · GitHub
[go: up one dir, main page]

Skip to content

Commit 462e34b

Browse files
committed
Add run_async operator
1 parent 4276899 commit 462e34b

File tree

4 files changed

+164
-25
lines changed

4 files changed

+164
-25
lines changed

examples/README.md

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,44 +132,48 @@ out.run()
132132
- Encode output video with ffmpeg
133133

134134
```python
135-
args1 = (
135+
process1 = (
136136
ffmpeg
137137
.input(in_filename)
138138
.output('pipe:', format='rawvideo', pix_fmt='rgb24', vframes=8)
139-
.compile()
139+
.run_async(pipe_stdout=True)
140140
)
141-
process1 = subprocess.Popen(args1, stdout=subprocess.PIPE)
142141

143-
args2 = (
142+
process2 = (
144143
ffmpeg
145144
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
146145
.output(out_filename, pix_fmt='yuv420p')
147146
.overwrite_output()
148-
.compile()
147+
.run_async(pipe_stdin=True()
149148
)
150-
process2 = subprocess.Popen(args2, stdin=subprocess.PIPE)
151149

152150
while True:
153151
in_bytes = process1.stdout.read(width * height * 3)
154-
in_frame (
152+
if not in_bytes:
153+
break
154+
in_frame = (
155155
np
156156
.frombuffer(in_bytes, np.uint8)
157157
.reshape([height, width, 3])
158158
)
159159

160160
# See examples/tensorflow_stream.py:
161-
frame = deep_dream.process_frame(frame)
161+
out_frame = deep_dream.process_frame(in_frame)
162162

163163
process2.stdin.write(
164-
frame
164+
in_frame
165165
.astype(np.uint8)
166166
.tobytes()
167167
)
168+
169+
process2.stdin.close()
170+
process1.wait()
171+
process2.wait()
168172
```
169173

170174
<img src="https://raw.githubusercontent.com/kkroening/ffmpeg-python/master/examples/graphs/dream.png" alt="deep dream streaming" width="40%" />
171175

172-
## [FaceTime webcam input](https://github.com/kkroening/ffmpeg-python/blob/master/examples/facetime.py)
176+
## [FaceTime webcam input (OS X)](https://github.com/kkroening/ffmpeg-python/blob/master/examples/facetime.py)
173177

174178
```python
175179
(
@@ -179,3 +183,25 @@ while True:
179183
.run()
180184
)
181185
```
186+
187+
## Stream from RTSP server to TCP socket
188+
189+
```python
190+
packet_size = 4096
191+
192+
process = (
193+
ffmpeg
194+
.input('rtsp://%s:8554/default')
195+
.output('-', format='h264')
196+
.run_async(pipe_stdout=True)
197+
)
198+
199+
while process.poll() is None:
200+
packet = process.stdout.read(packet_size)
201+
try:
202+
tcp_socket.send(packet)
203+
except socket.error:
204+
process.stdout.close()
205+
process.wait()
206+
break
207+
```

examples/tensorflow_stream.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def start_ffmpeg_process1(in_filename):
5858
args = (
5959
ffmpeg
6060
.input(in_filename)
61-
.output('pipe:', format='rawvideo', pix_fmt='rgb24', vframes=8)
61+
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
6262
.compile()
6363
)
6464
return subprocess.Popen(args, stdout=subprocess.PIPE)
@@ -113,14 +113,14 @@ def run(in_filename, out_filename, process_frame):
113113
process1 = start_ffmpeg_process1(in_filename)
114114
process2 = start_ffmpeg_process2(out_filename, width, height)
115115
while True:
116-
frame = read_frame(process1, width, height)
117-
if frame is None:
116+
in_frame = read_frame(process1, width, height)
117+
if in_frame is None:
118118
logger.info('End of input stream')
119119
break
120120

121121
logger.debug('Processing frame')
122-
frame = process_frame(frame)
123-
write_frame(process2, frame)
122+
out_frame = process_frame(in_frame)
123+
write_frame(process2, out_frame)
124124

125125
logger.info('Waiting for ffmpeg process1')
126126
process1.wait()

ffmpeg/_run.py

Lines changed: 102 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,100 @@ def compile(stream_spec, cmd='ffmpeg', overwrite_output=False):
183183
return cmd + get_args(stream_spec, overwrite_output=overwrite_output)
184184

185185

186+
@output_operator()
187+
def run_async(
188+
stream_spec, cmd='ffmpeg', pipe_stdin=False, pipe_stdout=False, pipe_stderr=False,
189+
quiet=False, overwrite_output=False):
190+
"""Asynchronously invoke ffmpeg for the supplied node graph.
191+
192+
Args:
193+
pipe_stdin: if True, connect pipe to subprocess stdin (to be
194+
used with ``pipe:`` ffmpeg inputs).
195+
pipe_stdout: if True, connect pipe to subprocess stdout (to be
196+
used with ``pipe:`` ffmpeg outputs).
197+
pipe_stderr: if True, connect pipe to subprocess stderr.
198+
quiet: shorthand for setting ``capture_stdout`` and
199+
``capture_stderr``.
200+
**kwargs: keyword-arguments passed to ``get_args()`` (e.g.
201+
``overwrite 10000 _output=True``).
202+
203+
Returns:
204+
A `subprocess Popen`_ object representing the child process.
205+
206+
Examples:
207+
Run and stream input::
208+
209+
process = (
210+
ffmpeg
211+
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
212+
.output(out_filename, pix_fmt='yuv420p')
213+
.overwrite_output()
214+
.run_async(pipe_stdin=True)
215+
)
216+
process.communicate(input=input_data)
217+
218+
Run and capture output::
219+
220+
process = (
221+
ffmpeg
222+
.input(in_filename)
223+
.output('pipe':, format='rawvideo', pix_fmt='rgb24')
224+
.run_async(pipe_stdout=True, pipe_stderr=True)
225+
)
226+
out, err = process.communicate()
227+
228+
Process video frame-by-frame using numpy::
229+
230+
process1 = (
231+
ffmpeg
232+
.input(in_filename)
233+
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
234+
.run_async(pipe_stdout=True)
235+
)
236+
237+
process2 = (
238+
ffmpeg
239+
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
240+
.output(out_filename, pix_fmt='yuv420p')
241+
.overwrite_output()
242+
.run_async(pipe_stdin=True)
243+
)
244+
245+
while True:
246+
in_bytes = process1.stdout.read(width * height * 3)
247+
if not in_bytes:
248+
break
249+
in_frame = (
250+
np
251+
.frombuffer(in_bytes, np.uint8)
252+
.reshape([height, width, 3])
253+
)
254+
out_frame = in_frame * 0.3
255+
process2.stdin.write(
256+
frame
257+
.astype(np.uint8)
258+
.tobytes()
259+
)
260+
261+
process2.stdin.close()
262+
process1.wait()
263+
process2.wait()
264+
265+
.. _subprocess Popen: https://docs.python.org/3/library/subprocess.html#popen-objects
266+
"""
267+
args = compile(stream_spec, cmd, overwrite_output=overwrite_output)
268+
stdin_stream = subprocess.PIPE if pipe_stdin else None
269+
stdout_stream = subprocess.PIPE if pipe_stdout or quiet else None
270+
stderr_stream = subprocess.PIPE if pipe_stderr or quiet else None
271+
return subprocess.Popen(
272+
args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream)
273+
274+
186275
@output_operator()
187276
def run(
188277
stream_spec, cmd='ffmpeg', capture_stdout=False, capture_stderr=False, input=None,
189278
quiet=False, overwrite_output=False):
190-
"""Ivoke ffmpeg for the supplied node graph.
279+
"""Invoke ffmpeg for the supplied node graph.
191280
192281
Args:
193282
capture_stdout: if True, capture stdout (to be used with
@@ -201,13 +290,17 @@ def run(
201290
202291
Returns: (out, err) tuple containing captured stdout and stderr data.
203292
"""
204-
args = compile(stream_spec, cmd, overwrite_output=overwrite_output)
205-
stdin_stream = subprocess.PIPE if input else None
206-
stdout_stream = subprocess.PIPE if capture_stdout or quiet else None
207-
stderr_stream = subprocess.PIPE if capture_stderr or quiet else None
208-
p = subprocess.Popen(args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream)
209-
out, err = p.communicate(input)
210-
retcode = p.poll()
293+
process = run_async(
294+
stream_spec,
295+
cmd,
296+
pipe_stdin=input is not None,
297+
pipe_stdout=capture_stdout,
298+
pipe_stderr=capture_stderr,
299+
quiet=quiet,
300+
overwrite_output=overwrite_output,
301+
)
302+
out, err = process.communicate(input)
303+
retcode = process.poll()
211304
if retcode:
212305
raise Error('ffmpeg', out, err)
213306
return out, err
@@ -218,4 +311,5 @@ def run(
218311
'Error',
219312
'get_args',
220313
'run',
314+
'run_async',
221315
]

ffmpeg/tests/test_ffmpeg.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
from __future__ import unicode_literals
2-
3-
from builtins import str
42
from builtins import bytes
53
from builtins import range
4+
from builtins import str
65
import ffmpeg
6+
import mock
77
import os
88
import pytest
99
import random
@@ -414,6 +414,25 @@ def test__compile():
414414
assert out_file.compile(cmd='ffmpeg.old') == ['ffmpeg.old', '-i', 'dummy.mp4', 'dummy2.mp4']
415415

416416

417+
@pytest.mark.parametrize('pipe_stdin', [True, False])
418+
@pytest.mark.parametrize('pipe_stdout', [True, False])
419+
@pytest.mark.parametrize('pipe_stderr', [True, False])
420+
def test__run_async(mocker, pipe_stdin, pipe_stdout, pipe_stderr):
421+
process__mock = mock.Mock()
422+
popen__mock = mocker.patch.object(subprocess, 'Popen', return_value=process__mock)
423+
stream = _get_simple_example()
424+
process = ffmpeg.run_async(
425+
stream, pipe_stdin=pipe_stdin, pipe_stdout=pipe_stdout, pipe_stderr=pipe_stderr)
426+
assert process is process__mock
427+
428+
expected_stdin = subprocess.PIPE if pipe_stdin else None
429+
expected_stdout = subprocess.PIPE if pipe_stdout else None
430+
expected_stderr = subprocess.PIPE if pipe_stderr else None
431+
(args,), kwargs = popen__mock.call_args
432+
assert args == ffmpeg.compile(stream)
433+
assert kwargs == dict(stdin=expected_stdin, stdout=expected_stdout, stderr=expected_stderr)
434+
435+
417436
def test__run():
418437
stream = _get_complex_filter_example()
419438
out, err = ffmpeg.run(stream)

0 commit comments

Comments
 (0)
0