-
Notifications
You must be signed in to change notification settings - Fork 3
/
bambu_obs_overlay.py
148 lines (117 loc) · 5.32 KB
/
bambu_obs_overlay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from datetime import datetime, timedelta
from pathlib import Path
import paho.mqtt.client as mqtt
import json
import ssl
import webcolors
# --- User configuration: fill these in before running the script ---

# IP address and access code of the printer.
# Found on printer touch screen under Settings (hex icon) -> Network
BAMBU_IP_ADDRESS = '192.168.x.x'
ACCESS_CODE = ''
# Serial number of the printer; it is used to build the MQTT topic names.
# Found on printer touch screen under Settings (hex icon) -> General
SERIAL = ''
# Whatever directory you want the output files written to.
# An empty string makes Path(OUTPUT_PATH) resolve to the current working directory.
OUTPUT_PATH = ''
# Printer state accumulated across messages: on_message merges the 'print'
# section of every report into this dict, so keys that are absent from a
# later message keep the last value that was seen.
values = {}
def on_connect(client, userdata, flags, rc):
    """Callback for when the client receives a CONNACK response from the server.

    Prints the result code and subscribes to the printer's report topic.
    Subscribing from inside this callback means the subscription is renewed
    whenever the connection is lost and re-established. Bambu requires the
    client to subscribe promptly after connecting or it forces a disconnect.

    Only ``client`` and ``rc`` are used; ``userdata`` and ``flags`` are kept
    so the function matches the callback signature.
    """
    print("Connected with result code", rc)
    report_topic = f"device/{SERIAL}/report"
    client.subscribe(report_topic)
def split_string(string):
    """Split a hex color string into a ``(red, green, blue)`` tuple of ints.

    Only the first six characters are read, so a longer string (for example
    one with extra trailing digits) is accepted and the remainder is ignored.

    Args:
        string: text whose first six characters are hex digits (``RRGGBB``).
            ASCII ``bytes`` are accepted as well.

    Returns:
        A 3-tuple of ints, each in the range 0-255.

    Raises:
        ValueError: if the first six characters are not all hex digits.
    """
    if isinstance(string, (bytes, bytearray)):
        # UnicodeDecodeError is a ValueError subclass, so callers that catch
        # ValueError keep working for undecodable input.
        string = string.decode("ascii")
    hex_digits = string[:6]
    # int(x, 16) on its own tolerates signs, whitespace and underscores
    # ("-1" -> -1, " f" -> 15), which would silently produce bogus or negative
    # channel values, so the digits are validated explicitly first.
    if len(hex_digits) != 6 or any(
        ch not in "0123456789abcdefABCDEF" for ch in hex_digits
    ):
        raise ValueError(f"expected at least 6 hex digits, got {string!r}")
    return tuple(int(hex_digits[i:i + 2], 16) for i in range(0, 6, 2))
def rgb_to_color_name(rgb):
    """Return a color name for a hex color string.

    Expected input: 6 character hex string.

    The string is split into its red/green/blue components and looked up
    with ``webcolors.rgb_to_name``. If a ValueError is raised along the way
    (for instance because the RGB value doesn't match any known color), the
    input is returned with a ``0x`` prefix instead.
    """
    try:
        return webcolors.rgb_to_name(split_string(rgb))
    except ValueError:
        # No usable name: fall back to the hex code with a 0x prefix
        return f"0x{rgb}"
def _format_telemetry_text(state, now):
    """Build the full text of telemetry.txt from the merged printer state."""
    layer = state.get('layer_num', '?')
    speed = state.get('spd_lvl', 2)
    speed_map = {1: 'Silent', 2: 'Standard', 3: 'Sport', 4: 'Ludacris'}
    # mc_remaining_time is treated as a number of minutes until the print ends
    min_remain = state['mc_remaining_time']
    future_time = now + timedelta(minutes=min_remain)
    future_time_str = future_time.strftime("%Y-%m-%d %H:%M")
    return (f"Layer: {layer} ({state['mc_percent']} %)\n"
            f"Nozzle Temp: {state['nozzle_temper']}/{state['nozzle_target_temper']}\n"
            f"Bed Temp: {state['bed_temper']}/{state['bed_target_temper']}\n"
            f"Finish ETA: {future_time_str}\n"
            f"Speed: {speed_map[speed]}")


def _format_ams_text(state):
    """Build the full text of ams.txt: one line per AMS tray plus the external spool."""
    active_ams = state['ams']['tray_now']
    lines = []
    # Only the first AMS unit is listed
    for tray in state['ams']['ams'][0]['tray']:
        color_name = rgb_to_color_name(tray['cols'][0])
        active = " - In Use" if active_ams == tray['id'] else ""
        # A remain value of -1 is not shown
        tray_remain = f"({tray['remain']}% remain)" if tray['remain'] != -1 else ''
        if tray['tray_sub_brands'] == '':
            tray_type = tray['tray_type']
        else:
            tray_type = tray['tray_sub_brands']
        lines.append(f"Tray {tray['id']}: {tray_type} {color_name} {tray_remain} {active}\n")
    # tray_now "254" marks the external spool (vt_tray) as the one in use
    active = " - In Use" if active_ams == "254" else ""
    color_name = rgb_to_color_name(state['vt_tray']['cols'][0])
    lines.append(f"Ext. : {state['vt_tray']['tray_type']} {color_name} {active}")
    return "".join(lines)


def _write_overlay(path, render, *args):
    """Render overlay text and write it to *path*.

    The text is built completely before the file is opened, so missing data
    never leaves a truncated or half-written overlay file behind. Returns
    False (and leaves the file untouched) when the data needed is missing or
    has an unexpected structure.
    """
    try:
        text = render(*args)
    except (KeyError, IndexError, TypeError):
        return False
    with path.open("w") as fp:
        fp.write(text)
    return True


# The callback for when a PUBLISH message is received from the server.
# Only the msg parameter is really used here, but the first 2 args are kept
# to match the callback signature.
def on_message(client, userdata, msg):
    """Update the OBS overlay text files from one printer report.

    The JSON payload is dumped to ``telemetry.json`` (debug convenience), its
    ``'print'`` section is merged into the module-level ``values`` state, and
    ``telemetry.txt`` and ``ams.txt`` are regenerated from that state. The two
    text files are separate so they can be modeled as separate text boxes in
    OBS; each one is only rewritten when all the data it needs is present.

    Sometimes an empty or differently structured document is returned. That
    is not fatal: the offending document is written to ``telemetry.err.txt``
    and the overlays are simply retried on the next message.

    Args:
        client: unused, required by the callback signature.
        userdata: unused, required by the callback signature.
        msg: received message; ``msg.payload`` is parsed as JSON.
    """
    global values

    # Current date and time, used as the base for the finish ETA
    now = datetime.now()
    try:
        doc = json.loads(msg.payload)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors;
        # one bad payload should not escape the callback.
        print("Ignoring payload that is not valid JSON")
        return
    # Have something to look at on screen so you know it's spinning
    print(doc)

    output_dir = Path(OUTPUT_PATH)
    # JSON blobs written just for debug convenience
    telem_json_path = output_dir / "telemetry.json"
    telem_json_err_path = output_dir / "telemetry.err.txt"
    telem_text_path = output_dir / "telemetry.txt"
    ams_text_path = output_dir / "ams.txt"

    # Write the raw JSON first so it is on disk even if the fields accessed
    # below turn out to be missing
    with telem_json_path.open("w") as fp:
        fp.write(json.dumps(doc))
    if not doc:
        return

    try:
        # Merge rather than replace, so keys absent from this message keep
        # the last value that was seen
        values = {**values, **doc['print']}
    except (KeyError, TypeError):
        complete = False
    else:
        print(values)
        telemetry_ok = _write_overlay(telem_text_path, _format_telemetry_text, values, now)
        ams_ok = _write_overlay(ams_text_path, _format_ams_text, values)
        complete = telemetry_ok and ams_ok

    if not complete:
        print("Logging error json")
        with telem_json_err_path.open("w") as fp:
            fp.write(json.dumps(doc))
# --- Script body: configure the MQTT client, connect to the printer and
# --- process messages until the process is stopped.
client = mqtt.Client()
# NOTE(review): this sets a plain attribute on the client object; nothing in
# this file reads it. Presumably intended to relax hostname checking, which
# tls_insecure_set(True) below is there for - TODO confirm whether this line
# has any effect.
client.check_hostname = False
# set username and password
# Username isn't something you can change, so hardcoded here
client.username_pw_set('bblp', ACCESS_CODE)
# These 2 lines are required to bypass self signed certificate errors, at least on my machine
# these things can be finicky depending on your system setup
# NOTE: with cert_reqs=ssl.CERT_NONE the printer's certificate is not validated.
client.tls_set(tls_version=ssl.PROTOCOL_TLS, cert_reqs=ssl.CERT_NONE)
client.tls_insecure_set(True)
# Register the callbacks before connecting so no event is missed
client.on_connect = on_connect
client.on_message = on_message
# Port 8883 with a keepalive of 60 seconds
client.connect(BAMBU_IP_ADDRESS, 8883, 60)
# Send a "pushing"/"start" command on the printer's request topic.
# NOTE(review): this is issued before loop_forever() starts processing
# network traffic - presumably it is queued and sent once the loop runs;
# verify against the paho documentation.
client.publish(f"device/{SERIAL}/request", '{"pushing": {"command": "start", "sequence_id": 0}}')
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()