-
Notifications
You must be signed in to change notification settings - Fork 0
/
tempo_edit.py
43 lines (40 loc) · 1.25 KB
/
tempo_edit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
"""Edit, switching from multiple input streams on the beat"""
from typing import Dict
from logger import Logger
from stream_input import InputStream, InputQueue
from stream_output import OutputStream
from stream_chaser import StreamChaser
from audio_peak import AudioPeak
# Settings for this edit run. Each key is consumed further down the script:
#   audio_track, audio_debounce, audio_peak -> passed to AudioPeak
#   input_pattern                           -> passed to InputQueue
#   input_skip                              -> passed to StreamChaser.skip_frames
# The whole mapping is also handed to OutputStream, which presumably reads
# output_template and output_seconds (confirm in stream_output).
config: Dict = dict(
    audio_track="raw-video/side2-track2.wav",
    audio_debounce=2,
    audio_peak=0.6,
    input_pattern="raw-video/edit-test*",
    input_skip=50,
    output_template="video/edit.mp4",
    output_seconds=5 * 60,  # five minutes, going by the key name
)
# Logger whose callbacks are handed to the input and output streams.
logger = Logger()

# Queue built from the configured input pattern; it reports how many
# streams should be opened.
input_queue = InputQueue(config["input_pattern"])

# One InputStream per stream reported by the queue, all sharing that queue.
streams = [
    InputStream(input_queue, logger.input_finished)
    for _ in range(input_queue.number_of_streams)
]
stream_chaser = StreamChaser(streams)

# Attributes are taken from the first stream before any frames are skipped.
attributes = stream_chaser.first_stream().attributes
stream_chaser.skip_frames(config["input_skip"])

# Audio peak reader, configured with the streams' frame rate.
audio_peak = AudioPeak(
    config["audio_track"],
    attributes["frame_rate"],
    config["audio_debounce"],
    config["audio_peak"],
)

# Output receives the full config plus the attributes of the first input.
output_stream = OutputStream(config, attributes, logger.output_finished)
# Main edit loop. While the audio peak reader keeps returning a truthy value,
# ask the chaser for the stream selected by the current peak value, read a
# chunk from it, and append that chunk to the output. An empty read ends the
# edit early.
#
# The loop is wrapped in try/finally so the output stream is closed even if
# a read or write raises part-way through; otherwise the output would be
# left unclosed on error.
try:
    while audio_peak.read():
        raw_bytes = stream_chaser.stream(audio_peak.peak()).read()
        if len(raw_bytes) == 0:
            # Selected input has nothing more to give: stop writing.
            break
        output_stream.write(raw_bytes)
finally:
    output_stream.close()