From 83467b163d57f8588f2e46deac99ba05279741e2 Mon Sep 17 00:00:00 2001 From: Casey Tucker Date: Tue, 5 Mar 2024 00:29:57 -0800 Subject: [PATCH] narrow channel columns in bridge --- bridge/bridge.py | 167 ++++++++++++++++++++++++++--------------------- 1 file changed, 94 insertions(+), 73 deletions(-) diff --git a/bridge/bridge.py b/bridge/bridge.py index c317917..374efce 100755 --- a/bridge/bridge.py +++ b/bridge/bridge.py @@ -1,5 +1,6 @@ #!bridge/bin/python3 +import os import alsa_midi from bindings.capmix import capmix, EVENT import time @@ -22,23 +23,30 @@ 'Phatty', ' SM10 ', 'Circuit', - ' JUNO ', + ' JUNO ', ] control_map = { 1, 3, 5, 7, 9, 11, 13, 15, 257, 259, 261, 263, 265, 267, 269, 271, } +def log(*msg): + print(*msg) + def print_monitor_mutes(): - print("\0337\033[H\033[2K ", end='') + size = os.get_terminal_size() + print("\033[r",end='') + + #print("\0337",end='') + print("\033[H\033[2K ", end='') for ch in range(0, 16): s = '/' if stereo[ch+1] else ' ' - print("%3d %s" % (ch+1, s), end='') + print("%2d %s" % (ch+1, s), end='') print() print("\033[2K", end='') for ch in range(0, 16, 2): - st = "STEREO" if stereo[ch+1] else "" - print("\033[36m %10s\033[0m" % st, end='') + st = "LINK" if stereo[ch+1] else "" + print("\033[36m %6s\033[0m" % st, end='') print() for mon in monitors: print("\033[2K", end='') @@ -46,20 +54,22 @@ def print_monitor_mutes(): for ch in range(0, 16): value = mutes[ch+1][mon] if value == 0: - msg = "----" + msg = "---" color = "\033[1;33m" else: - msg = " M " - color = "\033[7;2;31m" - print("%s%5s\033[0m " % (color,msg), end='') + msg = "M " + color = "\033[4;7;2;31m" + print("%s%3s\033[0m " % (color,msg), end='') print() print("\033[2K") - print("\033[2K",end='') + print("\033[2K ",end='') for label in labels: - print(" %10s" % label, end='') + print(" %6s" % label, end='') print() print("\033[2K") - print("\0338", end='') + #print("\0338", end='') + + print("\033[10;%dr\033[%d;1f" % (size.lines, size.lines), end='') nrpn_msb = 0 nrpn_lsb = 0 @@ -71,7 +81,7 @@ def bcr_listener(event): return elif isinstance(event, SysExEvent): bcr_ok = True - print(repr(event)) + log(repr(event)) print_monitor_mutes() return if event.channel != 15: @@ -83,7 +93,7 @@ def bcr_listener(event): if event.param == 6: # data handle_nrpn(nrpn_msb, nrpn_lsb, event.value) - print(repr(event)) + log(repr(event)) print_monitor_mutes() def handle_nrpn(msb, lsb, value): @@ -98,24 +108,14 @@ def handle_nrpn(msb, lsb, value): addr = capmix.parse_addr(param) capmix.put(addr, value) -queue = [] -def main(): - # setup capmix - capmix.set_model(4) - # setup MIDI I/O - client = SequencerClient("midipi") - client.set_output_buffer_size(2048) - client.set_input_buffer_size(2048) - port = client.create_port("io") +queue = [] - def bcr_ping(client, port): - data = b'\xf0\x00\x20\x32\x7f\x15\x01\xf7' - event = SysExEvent(data) - client.event_output(event, port=port) - client.drain_output() +class BCR: + def connect(self, client, port): + self.client = client + self.port = port - def bcr_connect(client, port): client_ports = client.list_ports() # rtpmidi needed to access the BCR-2000 bcr_ports = list(filter(lambda x: x is not None, [ x if x.name == bcr_name else None for x in client_ports ])) @@ -124,14 +124,46 @@ def bcr_connect(client, port): port.connect_to(bcr_port) port.connect_from(bcr_port) else: - print("Unable to connect to BCR-2000") + log("Unable to connect to BCR-2000") return False - print("BCR connected") - bcr_ping(client, port) + log("BCR connected") + self.ping() return True - def listener(event): + def ping(self): + data = b'\xf0\x00\x20\x32\x7f\x15\x01\xf7' + event = SysExEvent(data) + self.client.event_output(event, port=self.port) + self.client.drain_output() + + def listen(self): + event = self.client.event_input(timeout=0.01) + if event: + bcr_listener(event) + + def send_nrpn(self, ch, msb, lsb, val): + event1 = ControlChangeEvent(channel=15, param=99, value=msb) + event2 = ControlChangeEvent(channel=15, param=98, value=lsb) + event3 = ControlChangeEvent(channel=15, param=6, value=val) + + self.client.event_output(event1, port=self.port) + self.client.event_output(event2, port=self.port) + self.client.event_output(event3, port=self.port) + + log(repr(event1), repr(event2), repr(event3)) + print_monitor_mutes() + self.client.drain_output() + + def sync(self): + global queue + for msg in queue: + self.send_nrpn(15, msg[0], msg[1], msg[2]) + queue = [] + +class Capture: + @classmethod + def listener(cls, event): global queue value = event.value() addr = capmix.format_addr(event.addr) @@ -147,47 +179,36 @@ def listener(event): ch = ((event.addr & 0x0f00) >> 8) + 1 stereo[ch] = value.unpacked.discrete - print("addr=%x=%s type=%s v=%s" % (event.addr, addr, event.type_name(), value)) + log("addr=%x=%s type=%s v=%s" % (event.addr, addr, event.type_name(), value)) print_monitor_mutes() - def bcr_listen(client): - event = client.event_input(timeout=0.01) - if event: - bcr_listener(event) - - def send_nrpn(client, port, ch, msb, lsb, val): - event1 = ControlChangeEvent(channel=15, param=99, value=msb) - event2 = ControlChangeEvent(channel=15, param=98, value=lsb) - event3 = ControlChangeEvent(channel=15, param=6, value=val) - - client.event_output(event1, port=port) - client.event_output(event2, port=port) - client.event_output(event3, port=port) - print(repr(event1), repr(event2), repr(event3)) - - client.drain_output() - - def bcr_sync(client): - global queue - for msg in queue: - send_nrpn(client, port, 15, msg[0], msg[1], msg[2]) - - queue = [] + def connect(self): + capmix_ok = capmix.connect(Capture.listener) + if not capmix_ok: + log("Unable to connect to STUDIO-CAPTURE") + return False + self.get_mixer_data() + return True - def get_mixer_data(): + def get_mixer_data(self): for ch in range(0,16,2): capmix.get(capmix.parse_addr("input_monitor.a.channel.{}.stereo".format(ch+1))) for ch in range(0,16): for mon in monitors: capmix.get(capmix.parse_addr("input_monitor.{}.channel.{}.mute".format(mon, ch+1))) - def capmix_connect(): - capmix_ok = capmix.connect(listener) - if not capmix_ok: - print("Unable to connect to STUDIO-CAPTURE") - return False - get_mixer_data() - return True +def main(): + capture = Capture() + bcr = BCR() + + # setup capmix + capmix.set_model(4) + + # setup MIDI I/O + client = SequencerClient("midipi") + client.set_output_buffer_size(2048) + client.set_input_buffer_size(2048) + port = client.create_port("io") def check_connections(client, port): client_ports = client.list_ports() @@ -199,11 +220,11 @@ def check_connections(client, port): if p.name == capmix_name: capmix_ok = True if not bcr_ok: - print("BCR disconnected") + log("BCR disconnected") if not capmix_ok: - print("STUDIO-CAPTURE disconnected") + log("STUDIO-CAPTURE disconnected") - bcr_ping(client, port) + bcr.ping() return (bcr_ok, capmix_ok) @@ -218,15 +239,15 @@ def check_connections(client, port): if not ( bcr_ok and capmix_ok ): if now - last_attempt > 5: if not bcr_ok: - bcr_ok = bcr_connect(client, port) + bcr_ok = bcr.connect(client, port) if not capmix_ok: - capmix_ok = capmix_connect() + capmix_ok = capture.connect() last_attempt = now if capmix_ok: x = capmix.listen() - bcr_sync(client) + bcr.sync() if bcr_ok: - bcr_listen(client) + bcr.listen() if capmix_ok or bcr_ok: if now - last_ok > 5: bcr_ok, capmix_ok = check_connections(client, port) @@ -235,7 +256,7 @@ def check_connections(client, port): except KeyboardInterrupt: pass capmix.disconnect() - print("Done.") + log("Done.") if __name__ == '__main__': main()