forked from nikhilkumarrathi/cryptotrader-telegram
-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor.py
67 lines (50 loc) · 2.08 KB
/
processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import schedule
from dotmap import DotMap
import commands as cm
from access import accessControl
import log
# Worker pool on which process_message() runs command handlers.
# Remember to only use single threaded, as we are using global variable for telegram chat_id
# NOTE(review): the pool below is created with max_workers=3, which appears to
# contradict the single-threaded warning above -- confirm whether handlers that
# touch the global chat_id can safely run concurrently.
executor = ThreadPoolExecutor(max_workers=3,thread_name_prefix="task")
def run_scheduler():
    """Run pending ``schedule`` jobs forever, polling once per second.

    Renames the current thread to "Scheduler" and then loops without
    returning, so it must be run on a thread dedicated to it.

    An exception raised while running pending jobs is logged and the loop
    keeps going; otherwise a single failing job would end the loop and stop
    all further scheduling without any trace.
    """
    # Thread.setName() is deprecated since Python 3.10; assign ``name`` instead.
    threading.current_thread().name = "Scheduler"
    while True:
        try:
            schedule.run_pending()
        except Exception as error:
            log.error('scheduler job failed: {}'.format(error))
        time.sleep(1)
# Single-worker pool used only to host the endless run_scheduler() loop.
schedulerExecutor = ThreadPoolExecutor(max_workers=1,thread_name_prefix="schedule")
# Submitted at import time as a module-level side effect. The returned Future
# is discarded, so an exception escaping run_scheduler() is not observed here.
schedulerExecutor.submit(run_scheduler)
def done(fn):
    """Done-callback for task futures: log a cancellation or a failure.

    Args:
        fn: the ``concurrent.futures.Future`` this callback was attached to.

    Nothing is logged when the future completed without an exception.
    """
    if fn.cancelled():
        # A Future has no ``arg`` attribute, so reading ``fn.arg`` directly
        # raised AttributeError inside the callback. Use it only when a caller
        # attached one; otherwise fall back to the future itself.
        log.warn('{}: cancelled'.format(getattr(fn, 'arg', fn)))
    elif fn.done():
        error = fn.exception()
        if error:
            log.error('error returned: {}'.format(error))
def process_message(message: DotMap, use_executor=True) -> None:
    """Parse a ``/<command> [params...]`` message and dispatch its handler.

    Args:
        message: incoming message; ``message.text`` and
            ``message.chat.username`` are read, and ``message.toDict()`` is
            used for debug logging.
        use_executor: when true, the handler is submitted to the module-level
            ``executor`` with ``done`` attached as its completion callback;
            when false, the handler is called synchronously.

    Whitespace runs in the text are collapsed to single spaces, the first
    character is dropped, and the remainder is split into a lower-cased
    command name and its parameters. Commands not present in ``cm.commands``
    are logged and ignored.

    If the author is neither in the command's access list nor equal to
    ``accessControl.adminUserId``, the ``cm.accessdenied`` handler is run
    instead: ``message.text`` is prefixed with that command and the original
    command name is prepended to ``task.params``.
    """
    text, author = message.text, message.chat.username
    log.debug("To Processor: message: %s ", message.toDict())

    # A message without text cannot carry a command; previously this crashed
    # inside re.sub().
    if not isinstance(text, str):
        log.error('message has no text, ignoring')
        return

    text = re.sub(r'\s+', ' ', text).strip()
    # command: /<command> [params...]
    # NOTE(review): the first character is dropped without checking that it
    # is '/'; confirm callers only pass slash-prefixed text.
    command_n_args = text[1:].split(" ")
    curr_command = command_n_args[0].lower()
    params = [arg.strip() for arg in command_n_args[1:]]
    task = DotMap({'message': message, 'params': params, 'command': curr_command})

    if curr_command not in cm.commands:
        log.error(f'command "{curr_command}" not found')
        return

    access_list = cm.accessManagement[curr_command] if curr_command in cm.accessManagement else []
    # A chat may have no username; treat that as an unknown author and never
    # let an empty name match an empty/unset admin id.
    author_id = author.lower() if isinstance(author, str) else ''
    access_granted = bool(author_id) and (
        author_id in access_list or author_id == accessControl.adminUserId
    )
    if not access_granted:
        message.text = '/' + cm.accessdenied + ' ' + message.text
        task.params = [curr_command] + params
        curr_command = cm.accessdenied

    command_fn = cm.commands[curr_command]
    log.info('submitting task: %s %s', task.command, task.params)
    if use_executor:
        future = executor.submit(command_fn, task)
        future.add_done_callback(done)
    else:
        command_fn(task)