-
Notifications
You must be signed in to change notification settings - Fork 674
/
tagui.py
1892 lines (1496 loc) · 75.5 KB
/
tagui.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""INTEGRATION ENGINE OF RPA FOR PYTHON PACKAGE ~ TEBEL.ORG"""
# Apache License 2.0, Copyright 2019 Tebel.Automation Private Limited
# https://github.com/tebelorg/RPA-Python/blob/master/LICENSE.txt
__author__ = 'Ken Soh <opensource@tebel.org>'
__version__ = '1.50.0'
import subprocess
import os
import sys
import time
import platform
# required for python 2 usage of io.open
if sys.version_info[0] < 3: import io
# ---- module-level state shared by the functions in this file ----

# default timeout in seconds for UI element
_tagui_timeout = 10.0

# default delay in seconds in while loops
# (used as the sleep interval when polling for the tagui output file)
_tagui_delay = 0.1

# default debug flag to print debug output
_tagui_debug = False

# error flag to raise exception on error
# (when False, show_error() prints the message instead of raising)
_tagui_error = False

# flag to track if tagui session is started
_tagui_started = False

# flag to track visual automation connected
_tagui_visual = False

# flag to track chrome browser connected
_tagui_chrome = False

# id to track instruction count from rpa python to tagui
# (reset to 0 at the start of init() and incremented once live mode is ready)
_tagui_id = 0

# to track the original directory when init() was called
_tagui_init_directory = ''

# to track file download directory for web browser
_tagui_download_directory = ''

# to track location of TagUI (default user home folder)
# Windows uses the APPDATA folder, other platforms use the user home folder
if platform.system() == 'Windows':
    _tagui_location = os.environ['APPDATA']
else:
    _tagui_location = os.path.expanduser('~')

# delete tagui temp output text file to avoid reading old data
# NOTE - this runs at import time and acts on the current working directory
if os.path.isfile('rpa_python.txt'): os.remove('rpa_python.txt')
# define local custom javascript functions for use in tagui
# this text is written as-is to tagui_local.js by _tagui_local(),
# it defines the helper functions exist() and add_concat()
_tagui_local_js = \
"""// local custom helper function to check if UI element exists
// keep checking until timeout is reached before return result
// effect is interacting with element as soon as it appears
function exist(element_identifier) {
var exist_timeout = Date.now() + casper.options.waitTimeout;
while (Date.now() < exist_timeout) {
if (present(element_identifier))
return true;
else
sleep(100);
}
return false;
}
// function to replace add_concat() in tagui_header.js
// gain - echoing string with single and double quotes
// loss - no text-like variables usage since Python env
function add_concat(source_string) {
return source_string;
}
"""
def _python2_env():
    """return True when the interpreter major version is below 3, else False"""
    major_version = sys.version_info[0]
    return major_version < 3
def _python3_env():
    """return True when not running under python 2 (inverse of _python2_env)"""
    if _python2_env():
        return False
    return True
def _py23_decode(input_variable = None):
    """decode utf-8 bytes to str on python 3, pass through unchanged on python 2

    None is returned as None without any conversion
    """
    # only python 3 needs the bytes to str conversion
    if input_variable is not None and not _python2_env():
        return input_variable.decode('utf-8')
    return input_variable
def _py23_encode(input_variable = None):
    """encode str to utf-8 bytes on python 3, pass through unchanged on python 2

    None is returned as None without any conversion
    """
    # only python 3 needs the str to bytes conversion
    if input_variable is not None and not _python2_env():
        return input_variable.encode('utf-8')
    return input_variable
def _py23_open(target_filename, target_mode = 'r'):
    """open a file with utf-8 encoding on both python 2 and python 3

    python 2 goes through io.open since its builtin open has no encoding
    parameter, python 3 uses the builtin open directly
    """
    # io is only imported on python 2, so only reference it in that case
    file_opener = io.open if _python2_env() else open
    return file_opener(target_filename, target_mode, encoding = 'utf-8')
def _py23_read(input_text = None):
    """convert text read from a utf-8 file for use in the current python version

    python 2 gets the text encoded as utf-8, python 3 gets it unchanged,
    None is returned as None
    """
    if input_text is None or not _python2_env():
        return input_text
    return input_text.encode('utf-8')
def _py23_write(input_text = None):
    """convert text for writing to a utf-8 file in the current python version

    python 2 gets the text decoded from utf-8, python 3 gets it unchanged,
    None is returned as None
    """
    if input_text is None or not _python2_env():
        return input_text
    return input_text.decode('utf-8')
def _tagui_read():
    """read one line of output from the tagui process live mode interface"""
    # readline instead of read, not expecting user input to tagui
    output_line = _process.stdout.readline()
    return _py23_decode(output_line)
def _tagui_write(input_text = ''):
    """send text to the tagui process live mode interface through its stdin"""
    tagui_stdin = _process.stdin
    tagui_stdin.write(_py23_encode(input_text))
    # flush to ensure immediate delivery
    tagui_stdin.flush()
def _tagui_output():
    """function to wait for tagui output file to read and delete it

    polls for rpa_python.txt in the current directory, and also in the
    directory where init() was called, then returns the text of whichever
    file is found (current directory takes priority) and deletes that file
    """
    global _tagui_delay, _tagui_init_directory

    current_directory_output_file = 'rpa_python.txt'

    # to handle user changing current directory after init() is called
    init_directory_output_file = os.path.join(_tagui_init_directory, 'rpa_python.txt')

    # sleep to not splurge cpu cycles in while loop
    while not os.path.isfile(current_directory_output_file):
        if os.path.isfile(init_directory_output_file): break
        time.sleep(_tagui_delay)

    # current directory file is preferred to ensure backward compatibility
    if os.path.isfile(current_directory_output_file):
        output_file_path = current_directory_output_file
    else:
        output_file_path = init_directory_output_file

    # context manager closes the file even if reading raises an exception
    with _py23_open(output_file_path, 'r') as tagui_output_file:
        tagui_output_text = _py23_read(tagui_output_file.read())

    os.remove(output_file_path)
    return tagui_output_text
def _esq(input_text = ''):
    """swap every single quote ' in the text with the [BACKSLASH_QUOTE] marker"""
    # [BACKSLASH_QUOTE] marker to work together with send()
    text_segments = input_text.split("'")
    return '[BACKSLASH_QUOTE]'.join(text_segments)
def _sdq(input_text = ''):
    """swap every single quote ' in the text with a double quote, eg for xpath in tagui live mode"""
    # change identifier single quote ' to double quote "
    text_segments = input_text.split("'")
    return '"'.join(text_segments)
def _started():
    """return the flag tracking if tagui session is started"""
    return _tagui_started
def _visual():
    """return the flag tracking if visual automation is connected"""
    return _tagui_visual
def _chrome():
    """return the flag tracking if chrome browser is connected"""
    return _tagui_chrome
def _python_flow():
    """function to create entry tagui flow without visual automation

    writes the flow file rpa_python to the current directory
    """
    flow_text = '// NORMAL ENTRY FLOW FOR RPA FOR PYTHON ~ TEBEL.ORG\r\n\r\nlive'
    # context manager closes the file even if writing raises an exception
    with _py23_open('rpa_python', 'w') as flow_file:
        flow_file.write(_py23_write(flow_text))
def _visual_flow():
    """function to create entry tagui flow with visual automation

    writes the flow file rpa_python to the current directory, the flow has
    a commented mouse_xy() line as dummy trigger for SikuliX integration
    """
    flow_text = '// VISUAL ENTRY FLOW FOR RPA FOR PYTHON ~ TEBEL.ORG\r\n' + \
                '// mouse_xy() - dummy trigger for SikuliX integration\r\n\r\nlive'
    # context manager closes the file even if writing raises an exception
    with _py23_open('rpa_python', 'w') as flow_file:
        flow_file.write(_py23_write(flow_text))
def _tagui_local():
    """function to create tagui_local.js for custom local functions

    writes the javascript text in _tagui_local_js to the current directory
    """
    # context manager closes the file even if writing raises an exception
    with _py23_open('tagui_local.js', 'w') as javascript_file:
        javascript_file.write(_py23_write(_tagui_local_js))
def _tagui_delta(base_directory = None):
    """function to download stable delta files from tagui cutting edge version

    base_directory - tagui installation folder, files are saved to its src folder

    returns False if base_directory is missing or any download fails,
    returns True if files are synced now or were already synced before
    (tracked by marker file rpa_python_<version> in base_directory)
    """
    if base_directory is None or base_directory == '': return False

    # marker file is specific to the current release of this package
    delta_done_filename = base_directory + '/' + 'rpa_python_' + __version__

    # skip downloading if it is already done before for current release
    if os.path.isfile(delta_done_filename): return True

    # define list of key tagui files to be downloaded and synced locally
    delta_list = ['tagui', 'tagui.cmd', 'end_processes', 'end_processes.cmd',
                  'tagui_header.js', 'tagui_parse.php', 'tagui.sikuli/tagui.py']

    for delta_file in delta_list:
        tagui_delta_url = 'https://raw.githubusercontent.com/tebelorg/Tump/master/TagUI-Python/' + delta_file
        tagui_delta_file = base_directory + '/' + 'src' + '/' + delta_file
        if not download(tagui_delta_url, tagui_delta_file): return False

    # make sure execute permission is there for .tagui/src/tagui and end_processes
    if platform.system() in ['Linux', 'Darwin']:
        os.system('chmod -R 755 "' + base_directory + '/' + 'src' + '/' + 'tagui" > /dev/null 2>&1')
        os.system('chmod -R 755 "' + base_directory + '/' + 'src' + '/' + 'end_processes" > /dev/null 2>&1')

    # create marker file to skip syncing delta files next time for current release
    # context manager closes the file even if writing raises an exception
    with _py23_open(delta_done_filename, 'w') as delta_done_file:
        delta_done_file.write(_py23_write('TagUI installation files used by RPA for Python'))
    return True
def _patch_macos_pjs():
    """patch PhantomJS to latest v2.1.1 that plays well with new macOS versions

    only acts on macOS and only when the phantomjs_old folder is not present,
    the existing phantomjs folder is renamed to phantomjs_old and replaced by
    the downloaded version

    returns True if patched or no patching is needed, False if download fails
    """
    # nothing to do on other operating systems
    if platform.system() != 'Darwin':
        return True

    # phantomjs_old folder means patching was already done before
    if os.path.isdir(tagui_location() + '/.tagui/src/phantomjs_old'):
        return True

    original_directory = os.getcwd()
    os.chdir(tagui_location() + '/.tagui/src')

    # try-finally restores the working directory even if a step below raises
    try:
        print('[RPA][INFO] - downloading latest PhantomJS to fix OpenSSL issue')
        download('https://github.com/tebelorg/Tump/releases/download/v1.0.0/phantomjs-2.1.1-macosx.zip', 'phantomjs.zip')
        if not os.path.isfile('phantomjs.zip'):
            show_error('[RPA][ERROR] - unable to download latest PhantomJS v2.1.1')
            return False

        unzip('phantomjs.zip')
        os.rename('phantomjs', 'phantomjs_old')
        os.rename('phantomjs-2.1.1-macosx', 'phantomjs')
        if os.path.isfile('phantomjs.zip'): os.remove('phantomjs.zip')

        # zipfile extractall does not preserve execute permissions
        os.system('chmod -R 755 phantomjs > /dev/null 2>&1')
        return True

    finally:
        os.chdir(original_directory)
def _patch_macos_py3():
    """point python shebang lines to python3 on macOS without a python command

    only acts on macOS, when the py3_patched marker file is not present, the
    python command does not work and the python3 command works; always
    returns True
    """
    if platform.system() != 'Darwin':
        return True

    # marker file means patching was already done before
    if os.path.isfile(tagui_location() + '/.tagui/src/py3_patched'):
        return True

    # python command is working, no patching is needed
    if os.system('python --version > /dev/null 2>&1') == 0:
        return True

    # python3 command is not working, patching will not help
    if os.system('python3 --version > /dev/null 2>&1') != 0:
        return True

    list_of_patch_files = [tagui_location() + '/.tagui/src/casperjs/bin/casperjs',
                           tagui_location() + '/.tagui/src/casperjs/tests/clitests/runtests.py',
                           tagui_location() + '/.tagui/src/slimerjs/slimerjs.py']
    for patch_file in list_of_patch_files:
        patched_text = load(patch_file).replace('#!/usr/bin/env python', '#!/usr/bin/env python3')
        dump(patched_text, patch_file)

    # create marker file to skip patching next time
    dump('python updated to python 3', tagui_location() + '/.tagui/src/py3_patched')
    return True
def coord(x_coordinate = 0, y_coordinate = 0):
    """build the coordinate string (x,y) from the given x and y values"""
    coordinate_parts = ['(', str(x_coordinate), ',', str(y_coordinate), ')']
    return ''.join(coordinate_parts)
def debug(on_off = None):
    """get or set debug mode, or send a text note to tagui

    on_off - None returns the current debug flag without any change, an int
    (including True / False) is stored as the new debug flag, any other
    value is sent to tagui as a // comment line using send()

    returns the debug flag
    """
    global _tagui_debug

    if on_off is None:
        return _tagui_debug

    if isinstance(on_off, int):
        _tagui_debug = on_off
    else:
        send('// ' + on_off)

    return _tagui_debug
def error(on_off = None):
    """get or set the mode to raise exception on error

    on_off - None returns the current error flag without any change,
    any other value is stored as the new error flag

    returns the error flag
    """
    global _tagui_error

    if on_off is None:
        return _tagui_error

    _tagui_error = on_off
    return _tagui_error
def show_error(error_message = None):
    """report an error by raising an exception or printing the message

    error_message - text to report, a generic unknown error text is used if None

    raises Exception with the message when error() mode is on,
    otherwise prints the message and returns False
    """
    if error_message is None:
        error_message = '[RPA][ERROR] - unknown error encountered'

    if error():
        raise Exception(error_message)

    print(error_message)
    return False
def tagui_location(location = None):
    """get or set the location of TagUI installation

    location - None returns the current location without any change,
    any other value is stored as the new location

    returns the location
    """
    global _tagui_location

    if location is None:
        return _tagui_location

    _tagui_location = location
    return _tagui_location
def unzip(file_to_unzip = None, unzip_location = None):
    """function to unzip zip file to specified location

    file_to_unzip - filename of the zip file to extract
    unzip_location - folder to extract to, current directory if None or ''

    returns True after extracting, False (through show_error(), which may
    raise an exception instead) if the filename is not given or not a file
    """
    import zipfile

    if file_to_unzip is None or file_to_unzip == '':
        show_error('[RPA][ERROR] - filename missing for unzip()')
        return False
    elif not os.path.isfile(file_to_unzip):
        show_error('[RPA][ERROR] - file specified missing for unzip()')
        return False

    # context manager closes the zip file even if extracting raises an exception
    with zipfile.ZipFile(file_to_unzip, 'r') as zip_file:
        if unzip_location is None or unzip_location == '':
            zip_file.extractall()
        else:
            zip_file.extractall(unzip_location)
    return True
def setup():
    """function to setup TagUI to user home folder on Linux / macOS / Windows

    installs from rpa_python.zip in the current directory if that file is
    present (generated from pack()), otherwise downloads the TagUI zip file
    for the operating system; the installation folder is under
    tagui_location(), named tagui on Windows and .tagui on Linux / macOS

    returns True when TagUI is ready for use, False if any step fails or a
    dependency is still missing (show_error() may raise an exception instead)
    """
    # get user home folder location to setup tagui
    home_directory = tagui_location()

    print('[RPA][INFO] - setting up TagUI for use in your Python environment')

    # special check for macOS - download() will fail due to no SSL certs for Python 3
    # try each Python version folder in turn and stop at first successful run
    # (backslash is escaped as \\ to avoid invalid escape sequence warning)
    if platform.system() == 'Darwin' and _python3_env():
        for python_version in ['3.9', '3.8', '3.7', '3.6']:
            install_certificates_cmd = '/Applications/Python\\ ' + python_version + \
                                       '/Install\\ Certificates.command > /dev/null 2>&1'
            if os.system(install_certificates_cmd) == 0:
                break

    # set tagui zip filename for respective operating systems
    if platform.system() == 'Linux':
        tagui_zip_file = 'TagUI_Linux.zip'
    elif platform.system() == 'Darwin':
        tagui_zip_file = 'TagUI_macOS.zip'
    elif platform.system() == 'Windows':
        tagui_zip_file = 'TagUI_Windows.zip'
    else:
        show_error('[RPA][ERROR] - unknown ' + platform.system() + ' operating system to setup TagUI')
        return False

    if not os.path.isfile('rpa_python.zip'):
        # primary installation pathway by downloading from internet, requiring internet access
        print('[RPA][INFO] - downloading TagUI (~200MB) and unzipping to below folder...')
        print('[RPA][INFO] - ' + home_directory)

        # set tagui zip download url and download zip for respective operating systems
        tagui_zip_url = 'https://github.com/tebelorg/Tump/releases/download/v1.0.0/' + tagui_zip_file
        if not download(tagui_zip_url, home_directory + '/' + tagui_zip_file):
            # error message is shown by download(), no need for message here
            return False

        # unzip downloaded zip file to user home folder
        unzip(home_directory + '/' + tagui_zip_file, home_directory)
        if not os.path.isfile(home_directory + '/' + 'tagui' + '/' + 'src' + '/' + 'tagui'):
            show_error('[RPA][ERROR] - unable to unzip TagUI to ' + home_directory)
            return False

    else:
        # secondary installation pathway by using the rpa_python.zip generated from pack()
        print('[RPA][INFO] - unzipping TagUI (~200MB) from rpa_python.zip to below folder...')
        print('[RPA][INFO] - ' + home_directory)

        import shutil
        shutil.move('rpa_python.zip', home_directory + '/' + tagui_zip_file)

        if not os.path.isdir(home_directory + '/tagui'): os.mkdir(home_directory + '/tagui')
        unzip(home_directory + '/' + tagui_zip_file, home_directory + '/tagui')
        if not os.path.isfile(home_directory + '/' + 'tagui' + '/' + 'src' + '/' + 'tagui'):
            show_error('[RPA][ERROR] - unable to unzip TagUI to ' + home_directory)
            return False

    # set correct tagui folder for different operating systems
    if platform.system() == 'Windows':
        tagui_directory = home_directory + '/' + 'tagui'
    else:
        tagui_directory = home_directory + '/' + '.tagui'

        # overwrite tagui to .tagui folder for Linux / macOS

        # first rename existing .tagui folder to .tagui_previous
        if os.path.isdir(tagui_directory):
            os.rename(tagui_directory, tagui_directory + '_previous')

        # next rename extracted tagui folder (verified earlier) to .tagui
        os.rename(home_directory + '/' + 'tagui', tagui_directory)

        # finally remove .tagui_previous folder if it exists
        if os.path.isdir(tagui_directory + '_previous'):
            import shutil
            shutil.rmtree(tagui_directory + '_previous')

    # after unzip, remove downloaded zip file to save disk space
    if os.path.isfile(home_directory + '/' + tagui_zip_file):
        os.remove(home_directory + '/' + tagui_zip_file)

    # download stable delta files from tagui cutting edge version
    print('[RPA][INFO] - done. syncing TagUI with stable cutting edge version')
    if not _tagui_delta(tagui_directory): return False

    # perform Linux specific setup actions
    if platform.system() == 'Linux':
        # zipfile extractall does not preserve execute permissions
        # invoking chmod to set all files with execute permissions
        # and update delta tagui/src/tagui with execute permission
        if os.system('chmod -R 755 "' + tagui_directory + '" > /dev/null 2>&1') != 0:
            show_error('[RPA][ERROR] - unable to set permissions for .tagui folder')
            return False

        # check that php, a dependency for tagui, is installed and working
        if os.system('php --version > /dev/null 2>&1') != 0:
            print('[RPA][INFO] - PHP is not installed by default on your Linux distribution')
            print('[RPA][INFO] - google how to install PHP (eg for Ubuntu, apt-get install php)')
            print('[RPA][INFO] - after that, TagUI ready for use in your Python environment')
            print('[RPA][INFO] - visual automation (optional) requires special setup on Linux,')
            print('[RPA][INFO] - see the link below to install OpenCV and Tesseract libraries')
            print('[RPA][INFO] - https://sikulix-2014.readthedocs.io/en/latest/newslinux.html')
            return False

        else:
            print('[RPA][INFO] - TagUI now ready for use in your Python environment')
            print('[RPA][INFO] - visual automation (optional) requires special setup on Linux,')
            print('[RPA][INFO] - see the link below to install OpenCV and Tesseract libraries')
            print('[RPA][INFO] - https://sikulix-2014.readthedocs.io/en/latest/newslinux.html')

    # perform macOS specific setup actions
    if platform.system() == 'Darwin':
        # zipfile extractall does not preserve execute permissions
        # invoking chmod to set all files with execute permissions
        # and update delta tagui/src/tagui with execute permission
        if os.system('chmod -R 755 "' + tagui_directory + '" > /dev/null 2>&1') != 0:
            show_error('[RPA][ERROR] - unable to set permissions for .tagui folder')
            return False

        # patch PhantomJS to solve OpenSSL issue
        if not _patch_macos_pjs(): return False
        # patch files to solve no python issue
        if not _patch_macos_py3(): return False
        print('[RPA][INFO] - TagUI now ready for use in your Python environment')

    # perform Windows specific setup actions
    if platform.system() == 'Windows':
        # check that tagui packaged php is working, it has dependency on MSVCR110.dll
        if os.system('"' + tagui_directory + '/' + 'src' + '/' + 'php/php.exe" -v > nul 2>&1') != 0:
            print('[RPA][INFO] - now installing missing Visual C++ Redistributable dependency')

            # download from hosted setup file, if not already present when deployed using pack()
            if not os.path.isfile(tagui_directory + '/vcredist_x86.exe'):
                vcredist_x86_url = 'https://raw.githubusercontent.com/tebelorg/Tump/master/vcredist_x86.exe'
                if not download(vcredist_x86_url, tagui_directory + '/vcredist_x86.exe'):
                    return False

            # run setup to install the MSVCR110.dll dependency (user action required)
            os.system('"' + tagui_directory + '/vcredist_x86.exe"')

            # check again if tagui packaged php is working, after installing vcredist_x86.exe
            if os.system('"' + tagui_directory + '/' + 'src' + '/' + 'php/php.exe" -v > nul 2>&1') != 0:
                print('[RPA][INFO] - MSVCR110.dll is still missing, install vcredist_x86.exe from')
                print('[RPA][INFO] - the vcredist_x86.exe file in ' + home_directory + '\\tagui or from')
                print('[RPA][INFO] - https://www.microsoft.com/en-us/download/details.aspx?id=30679')
                print('[RPA][INFO] - after that, TagUI ready for use in your Python environment')
                return False
            else:
                print('[RPA][INFO] - TagUI now ready for use in your Python environment')

        else:
            print('[RPA][INFO] - TagUI now ready for use in your Python environment')

    return True
def init(visual_automation = False, chrome_browser = True, headless_mode = False, turbo_mode = False):
    """start and connect to tagui process by checking tagui live mode readiness

    visual_automation - True to check for 64-bit java and use the visual entry flow
    chrome_browser - True to pass chrome as the browser option to tagui
    headless_mode - True to pass headless as the browser option (set after, so it overrides chrome)
    turbo_mode - True to patch tagui scan period, sleep and typing settings for faster running,
                 False to patch them back to the normal settings

    returns True once tagui live mode is ready and listening for inputs,
    False on any failure (show_error() may raise an exception instead)
    """
    global _process, _tagui_started, _tagui_id, _tagui_visual, _tagui_chrome, _tagui_init_directory, _tagui_download_directory

    # only one tagui session can be active at a time
    if _tagui_started:
        show_error('[RPA][ERROR] - use close() before using init() again')
        return False

    # reset id to track instruction count from rpa python to tagui
    _tagui_id = 0

    # reset variable to track original directory when init() was called
    _tagui_init_directory = ''

    # get user home folder location to locate tagui executable
    if platform.system() == 'Windows':
        tagui_directory = tagui_location() + '/' + 'tagui'
    else:
        tagui_directory = tagui_location() + '/' + '.tagui'

    tagui_executable = tagui_directory + '/' + 'src' + '/' + 'tagui'
    end_processes_executable = tagui_directory + '/' + 'src' + '/' + 'end_processes'

    # if tagui executable is not found, initiate setup() to install tagui
    if not os.path.isfile(tagui_executable):
        if not setup():
            # error message is shown by setup(), no need for message here
            return False

    # sync tagui delta files for current release if needed
    if not _tagui_delta(tagui_directory): return False

    # on macOS, patch PhantomJS to latest v2.1.1 to solve OpenSSL issue
    if platform.system() == 'Darwin' and not _patch_macos_pjs(): return False

    # newer macOS has no python command, patch some files header to python3
    if platform.system() == 'Darwin' and not _patch_macos_py3(): return False

    # create entry flow to launch SikuliX accordingly
    if visual_automation:
        # check for working java jdk for visual automation mode
        # shell_silencer discards command output using the syntax of the operating system
        if platform.system() == 'Windows':
            shell_silencer = '> nul 2>&1'
        else:
            shell_silencer = '> /dev/null 2>&1'

        # check whether java is installed on the computer
        if os.system('java -version ' + shell_silencer) != 0:
            print('[RPA][INFO] - to use visual automation mode, OpenJDK v8 (64-bit) or later is required')
            print('[RPA][INFO] - download from Amazon Corretto\'s website - https://aws.amazon.com/corretto')
            print('[RPA][INFO] - OpenJDK is preferred over Java JDK which is free for non-commercial use only')
            return False

        else:
            # then check whether it is 64-bit required by sikulix
            # version info is captured through a temp file in the current directory
            os.system('java -version > java_version.txt 2>&1')
            java_version_info = load('java_version.txt').lower()
            os.remove('java_version.txt')
            if '64 bit' not in java_version_info and '64-bit' not in java_version_info:
                print('[RPA][INFO] - to use visual automation mode, OpenJDK v8 (64-bit) or later is required')
                print('[RPA][INFO] - download from Amazon Corretto\'s website - https://aws.amazon.com/corretto')
                print('[RPA][INFO] - OpenJDK is preferred over Java JDK which is free for non-commercial use only')
                return False
            else:
                # start a dummy first run if never run before, to let sikulix integrate jython
                # (the run is triggered when the jython standalone jar file is present)
                sikulix_folder = tagui_directory + '/' + 'src' + '/' + 'sikulix'
                if os.path.isfile(sikulix_folder + '/' + 'jython-standalone-2.7.1.jar'):
                    os.system('java -jar "' + sikulix_folder + '/' + 'sikulix.jar" -h ' + shell_silencer)
                _visual_flow()
    else:
        _python_flow()

    # create tagui_local.js for custom functions
    _tagui_local()

    # invoke web browser accordingly with tagui option
    # headless_mode is checked last, so it replaces the chrome option when both are True
    browser_option = ''
    if chrome_browser:
        browser_option = 'chrome'
    if headless_mode:
        browser_option = 'headless'

    # special handling for turbo mode to run 10X faster
    # done by text replacement in tagui source files, each mode reverses the other
    tagui_chrome_php = tagui_directory + '/' + 'src' + '/' + 'tagui_chrome.php'
    tagui_header_js = tagui_directory + '/' + 'src' + '/' + 'tagui_header.js'
    tagui_sikuli_py = tagui_directory + '/' + 'src' + '/' + 'tagui.sikuli/tagui.py'
    if not turbo_mode:
        dump(load(tagui_chrome_php).replace('$scan_period = 10000;', '$scan_period = 100000;'), tagui_chrome_php)
        dump(load(tagui_header_js).replace('function sleep(ms) {ms *= 0.1; //', 'function sleep(ms) { //').replace("chrome_step('Input.insertText',{text: value});};", "for (var character = 0, length = value.length; character < length; character++) {\nchrome_step('Input.dispatchKeyEvent',{type: 'char', text: value[character]});}};"), tagui_header_js)
        dump(load(tagui_sikuli_py).replace('scan_period = 0.05\n\n# teleport mouse instead of moving to target\nSettings.MoveMouseDelay = 0', 'scan_period = 0.5'), tagui_sikuli_py)
    else:
        dump(load(tagui_chrome_php).replace('$scan_period = 100000;', '$scan_period = 10000;'), tagui_chrome_php)
        dump(load(tagui_header_js).replace('function sleep(ms) { //', 'function sleep(ms) {ms *= 0.1; //').replace("for (var character = 0, length = value.length; character < length; character++) {\nchrome_step('Input.dispatchKeyEvent',{type: 'char', text: value[character]});}};", "chrome_step('Input.insertText',{text: value});};"), tagui_header_js)
        dump(load(tagui_sikuli_py).replace('scan_period = 0.5', 'scan_period = 0.05\n\n# teleport mouse instead of moving to target\nSettings.MoveMouseDelay = 0'), tagui_sikuli_py)

    # entry shell command to invoke tagui process
    tagui_cmd = '"' + tagui_executable + '"' + ' rpa_python ' + browser_option

    # run tagui end processes script to flush dead processes
    # for eg execution ended with ctrl+c or forget to close()
    os.system('"' + end_processes_executable + '"')

    try:
        # launch tagui using subprocess
        _process = subprocess.Popen(
            tagui_cmd, shell=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        # loop until tagui live mode is ready or tagui process has ended
        while True:

            # failsafe exit if tagui process gets killed for whatever reason
            if _process.poll() is not None:
                print('[RPA][ERROR] - following happens when starting TagUI...')
                print('')
                print('The following command is executed to start TagUI -')
                print(tagui_cmd)
                print('')
                print('It leads to following output when starting TagUI -')
                # run the same command again, this time with output shown on screen
                os.system(tagui_cmd)
                print('')
                _tagui_visual = False
                _tagui_chrome = False
                _tagui_started = False
                show_error()
                return False

            # read next line of output from tagui process live mode interface
            tagui_out = _tagui_read()

            # check that tagui live mode is ready then start listening for inputs
            if 'LIVE MODE - type done to quit' in tagui_out:
                # dummy + start line to clear live mode backspace char before listening
                _tagui_write('echo "[RPA][STARTED]"\n')
                _tagui_write('echo "[RPA][' + str(_tagui_id) + '] - listening for inputs"\n')
                _tagui_visual = visual_automation
                _tagui_chrome = chrome_browser
                _tagui_started = True

                # loop until tagui live mode is ready and listening for inputs
                # also check _tagui_started to handle unexpected termination
                while _tagui_started and not _ready(): pass
                if not _tagui_started:
                    show_error('[RPA][ERROR] - TagUI process ended unexpectedly')
                    return False

                # remove generated tagui flow, js code and custom functions files
                if os.path.isfile('rpa_python'): os.remove('rpa_python')
                if os.path.isfile('rpa_python.js'): os.remove('rpa_python.js')
                if os.path.isfile('rpa_python.raw'): os.remove('rpa_python.raw')
                if os.path.isfile('tagui_local.js'): os.remove('tagui_local.js')

                # increment id and prepare for next instruction
                _tagui_id = _tagui_id + 1

                # set variable to track original directory when init() was called
                _tagui_init_directory = os.getcwd()

                # set variable to track file download directory for web browser
                _tagui_download_directory = os.getcwd()

                return True

    except Exception as e:
        # reset session flags before reporting the error
        _tagui_visual = False
        _tagui_chrome = False
        _tagui_started = False
        show_error('[RPA][ERROR] - ' + str(e))
        return False
def pack():
    """function to pack TagUI files for installation on an air-gapped computer without internet

    creates rpa_python.zip (the zipped TagUI installation folder) and rpa.py
    (a copy of tagui.py from the folder of this module) in the current directory

    returns True when both files are created, False if closing or starting
    the TagUI session fails or a required file cannot be downloaded
    """
    print('[RPA][INFO] - pack() is to deploy RPA for Python to a computer without internet')
    print('[RPA][INFO] - update() is to update an existing installation deployed from pack()')
    print('[RPA][INFO] - detecting and zipping your TagUI installation to rpa_python.zip ...')

    # first make sure TagUI files have been downloaded and synced to latest stable delta files
    # done by closing any running session, then starting and closing a fresh session
    global _tagui_started
    if _tagui_started:
        if not close():
            return False
    if not init(False, False):
        return False
    if not close():
        return False

    # next download jython to tagui/src/sikulix folder (after init() it can be moved away)
    if platform.system() == 'Windows':
        tagui_directory = tagui_location() + '/' + 'tagui'
        # pack in Visual C++ MSVCR110.dll dependency from PHP for offline installation
        vcredist_x86_url = 'https://raw.githubusercontent.com/tebelorg/Tump/master/vcredist_x86.exe'
        if not download(vcredist_x86_url, tagui_directory + '/vcredist_x86.exe'):
            return False
    else:
        tagui_directory = tagui_location() + '/' + '.tagui'
    sikulix_directory = tagui_directory + '/' + 'src' + '/' + 'sikulix'
    sikulix_jython_url = 'https://github.com/tebelorg/Tump/releases/download/v1.0.0/jython-standalone-2.7.1.jar'
    if not download(sikulix_jython_url, sikulix_directory + '/' + 'jython-standalone-2.7.1.jar'):
        return False

    # finally zip entire TagUI installation and save a copy of tagui.py to current folder
    import shutil
    shutil.make_archive('rpa_python', 'zip', tagui_directory)

    # use absolute path of this module, because when __file__ has no folder part
    # os.path.dirname() returns '' and appending '/tagui.py' would point to root folder
    package_directory = os.path.dirname(os.path.abspath(__file__))
    shutil.copyfile(os.path.join(package_directory, 'tagui.py'), 'rpa.py')

    print('[RPA][INFO] - done. copy rpa_python.zip and rpa.py to your target computer.')
    print('[RPA][INFO] - then install and use with import rpa as r followed by r.init()')
    return True
def update():
    """function to update package and TagUI files on an air-gapped computer without internet

    Downloads the latest rpa.py and a fixed list of TagUI delta files into a
    temporary rpa_update folder, zips that folder, then generates update.py in
    the current directory with the zip embedded as base64 text. The generated
    update.py is meant to be copied to the target computer and run there.

    Returns True when update.py has been generated. Returns False when any
    download fails, when the version number cannot be located in the downloaded
    rpa.py, or when an exception is raised while generating update.py.
    """
    import base64
    import shutil

    print('[RPA][INFO] - pack() is to deploy RPA for Python to a computer without internet')
    print('[RPA][INFO] - update() is to update an existing installation deployed from pack()')
    print('[RPA][INFO] - downloading latest RPA for Python and TagUI files...')

    # first download updated files to rpa_update folder and zip them to rpa_update.zip
    if not os.path.isdir('rpa_update'): os.mkdir('rpa_update')
    if not os.path.isdir('rpa_update/tagui.sikuli'): os.mkdir('rpa_update/tagui.sikuli')

    rpa_python_url = 'https://raw.githubusercontent.com/tebelorg/RPA-Python/master/tagui.py'
    if not download(rpa_python_url, 'rpa_update' + '/' + 'rpa.py'): return False

    # get version number of latest release for the package to use in generated update.py
    # partition() reports whether each marker was found, unlike find() which returns -1
    # and would silently slice out unrelated text to be used as the version number
    rpa_python_source = load('rpa_update' + '/' + 'rpa.py')
    v_front_marker = "__version__ = '"; v_back_marker = "'"
    _, front_found, version_tail = rpa_python_source.partition(v_front_marker)
    rpa_python_version, back_found, _ = version_tail.partition(v_back_marker)
    if not front_found or not back_found:
        show_error('[RPA][ERROR] - unable to get version number from downloaded rpa.py')
        return False

    delta_list = ['tagui', 'tagui.cmd', 'end_processes', 'end_processes.cmd',
                    'tagui_header.js', 'tagui_parse.php', 'tagui.sikuli/tagui.py']

    for delta_file in delta_list:
        tagui_delta_url = 'https://raw.githubusercontent.com/tebelorg/Tump/master/TagUI-Python/' + delta_file
        tagui_delta_file = 'rpa_update' + '/' + delta_file
        if not download(tagui_delta_url, tagui_delta_file): return False

    shutil.make_archive('rpa_update', 'zip', 'rpa_update')

    # next define string variables for update.py header and footer to be used in next section
    # the string contents start at column 0 on purpose, else unwanted indentation added to file
    # only the bodies of if / else statements in the generated script are indented
    update_py_header = \
"""import rpa as r
import platform
import base64
import shutil
import os
rpa_update_zip = \\
"""

    update_py_footer = \
"""
# create update.zip from base64 data embedded in update.py
update_zip_file = open('update.zip','wb')
update_zip_file.write(base64.b64decode(rpa_update_zip))
update_zip_file.close()
# unzip update.zip to tagui folder in user home directory
if platform.system() == 'Windows':
    base_directory = os.environ['APPDATA'] + '/tagui'
else:
    base_directory = os.path.expanduser('~') + '/.tagui'
# uncomment below to define and use custom TagUI folder
#base_directory = 'your_full_path'
r.unzip('update.zip', base_directory + '/src')
if os.path.isfile('update.zip'): os.remove('update.zip')
# make sure execute permission is there for Linux / macOS
if platform.system() in ['Linux', 'Darwin']:
    os.system('chmod -R 755 "' + base_directory + '/src/tagui" > /dev/null 2>&1')
    os.system('chmod -R 755 "' + base_directory + '/src/end_processes" > /dev/null 2>&1')
# create marker file to skip syncing for current release
delta_done_file = r._py23_open(base_directory + '/' + 'rpa_python_' + __version__, 'w')
delta_done_file.write(r._py23_write('TagUI installation files used by RPA for Python'))
delta_done_file.close()
# move updated package file rpa.py to package folder
shutil.move(base_directory + '/src/rpa.py', os.path.dirname(r.__file__) + '/rpa.py')
print('[RPA][INFO] - done. RPA for Python updated to version ' + __version__)
"""

    # finally create update.py containing python code and zipped data of update in base64
    try:
        dump("__version__ = '" + rpa_python_version + "'\n\n", 'update.py')
        write(update_py_header, 'update.py')

        # context manager closes the zip file even if reading or encoding raises
        with open('rpa_update.zip', 'rb') as update_zip_file:
            zip_base64_data = (base64.b64encode(update_zip_file.read())).decode('utf-8')

        write('"""' + zip_base64_data + '"""', 'update.py')
        write(update_py_footer, 'update.py')

        # remove temporary folder and downloaded files, show result and usage message
        if os.path.isdir('rpa_update'): shutil.rmtree('rpa_update')
        if os.path.isfile('rpa_update.zip'): os.remove('rpa_update.zip')
        print('[RPA][INFO] - done. copy or email update.py to your target computer and run')
        print('[RPA][INFO] - python update.py to update RPA for Python to version ' + rpa_python_version)
        print('[RPA][INFO] - to use custom TagUI folder, set base_directory in update.py')
        return True

    except Exception as e:
        show_error('[RPA][ERROR] - ' + str(e))
        return False
def _ready():
    """internal function to check if tagui is ready to receive instructions after init() is called

    Reads one line of output from the tagui live mode interface and returns
    True only when that line is the 'listening for inputs' marker. Returns
    False when tagui is not started, when the process has ended, or on error.
    """
    global _process, _tagui_started, _tagui_id, _tagui_visual, _tagui_chrome

    # no error printed here, the calling parent function reports it instead
    if not _tagui_started:
        return False

    try:
        # failsafe when the tagui process is gone, reset state flags and
        # leave the error message to the calling parent function
        if _process.poll() is not None:
            _tagui_visual = _tagui_chrome = _tagui_started = False
            return False

        # next line of output from the live mode interface
        live_output = _tagui_read()

        # in debug mode, mirror the output to screen
        if debug():
            sys.stdout.write(live_output)
            sys.stdout.flush()

        # ready only when the line is the marker text echoed after each instruction
        marker_text = live_output.strip()
        return marker_text.startswith('[RPA][') and marker_text.endswith('] - listening for inputs')

    except Exception as e:
        show_error('[RPA][ERROR] - ' + str(e))
        return False
def send(tagui_instruction = None):
    """send next live mode instruction to tagui for processing if tagui is ready

    Returns True when the instruction is empty or None (nothing to send), or
    after tagui has echoed the 'listening for inputs' marker following the
    instruction. Returns False when init() has not been used, when the tagui
    process is no longer running, or when an exception is raised.
    """
    global _process, _tagui_started, _tagui_id, _tagui_visual, _tagui_chrome

    if not _tagui_started:
        show_error('[RPA][ERROR] - use init() before using send()')
        return False

    # an empty instruction is accepted without contacting tagui
    if tagui_instruction is None or tagui_instruction == '':
        return True

    try:
        # failsafe exit if tagui process gets killed for whatever reason
        if _process.poll() is not None:
            _tagui_visual = _tagui_chrome = _tagui_started = False
            show_error('[RPA][ERROR] - no active TagUI process to send()')
            return False

        # escape special characters for them to reach tagui correctly
        # backslash has to come first so that the escapes added after it are not doubled
        # last entry is special handling for single quote to work with _esq() for tagui
        escape_table = (
            ('\\', '\\\\'),
            ('\n', '\\n'),
            ('\r', '\\r'),
            ('\t', '\\t'),
            ('\a', '\\a'),
            ('\b', '\\b'),
            ('\f', '\\f'),
            ('[BACKSLASH_QUOTE]', '\\\''),
        )
        for raw_text, escaped_text in escape_table:
            tagui_instruction = tagui_instruction.replace(raw_text, escaped_text)

        # echo-safe copy: backslashes escaped again to display the source string correctly,
        # double quotes escaped because the echo step below wraps the text in double quotes
        echo_safe_instruction = tagui_instruction.replace('\\', '\\\\').replace('"', '\\"')

        echo_prefix = 'echo "[RPA][' + str(_tagui_id) + '] - '

        # echo the instruction, send it for execution, then echo the marker text
        # that _ready() looks for to know tagui is waiting for the next instruction
        _tagui_write(echo_prefix + echo_safe_instruction + '"\n')
        _tagui_write(tagui_instruction + '\n')
        _tagui_write(echo_prefix + 'listening for inputs"\n')

        # loop until tagui live mode is ready and listening for inputs
        # _tagui_started is also checked to handle unexpected termination
        while _tagui_started and not _ready():
            pass

        if not _tagui_started:
            show_error('[RPA][ERROR] - TagUI process ended unexpectedly')
            return False

        # increment id and prepare for next instruction
        _tagui_id += 1
        return True

    except Exception as e:
        show_error('[RPA][ERROR] - ' + str(e))
        return False
def close():
    """disconnect from tagui process by sending 'done' trigger instruction

    Waits for the tagui process to end, then removes the generated flow files
    from the current directory and from the directory recorded at init().
    Log and data files are removed as well unless debug mode is on.
    Returns True on success, False when init() has not been used, when the
    tagui process is no longer running, or when an exception is raised.
    """
    global _process, _tagui_started, _tagui_id, _tagui_visual, _tagui_chrome, _tagui_init_directory

    if not _tagui_started:
        show_error('[RPA][ERROR] - use init() before using close()')
        return False

    def _remove_if_file(file_path):
        # delete file_path only when it is an existing file
        if os.path.isfile(file_path):
            os.remove(file_path)

    try:
        # failsafe exit if tagui process gets killed for whatever reason
        if _process.poll() is not None:
            _tagui_visual = _tagui_chrome = _tagui_started = False
            show_error('[RPA][ERROR] - no active TagUI process to close()')
            return False

        # send 'done' instruction to terminate live mode and exit tagui
        _tagui_write('echo "[RPA][FINISHED]"\n')
        _tagui_write('done\n')

        # hand back control only after the tagui process has closed
        while _process.poll() is None:
            pass

        # generated tagui flow, js code and custom functions files
        flow_files = ['rpa_python', 'rpa_python.js', 'rpa_python.raw', 'tagui_local.js']
        for flow_file in flow_files:
            _remove_if_file(flow_file)
        # same files in the init() directory, in case user changed current directory
        for flow_file in flow_files:
            _remove_if_file(os.path.join(_tagui_init_directory, flow_file))

        # generated tagui log and data files are kept when in debug mode
        if not debug():
            output_files = ['rpa_python.log', 'rpa_python.txt']
            for output_file in output_files:
                _remove_if_file(output_file)
            # same files in the init() directory, in case user changed current directory
            for output_file in output_files:
                _remove_if_file(os.path.join(_tagui_init_directory, output_file))

        _tagui_visual = _tagui_chrome = _tagui_started = False
        return True

    except Exception as e:
        # state flags are reset even when closing fails
        _tagui_visual = _tagui_chrome = _tagui_started = False
        show_error('[RPA][ERROR] - ' + str(e))
        return False
def exist(element_identifier = None):
    """check if the given element identifier exists, returning True or False

    The page.png / page.bmp keywords and (x,y) style coordinates return True
    without asking tagui, provided visual automation is on. Image identifiers
    ending with .png or .bmp need visual automation, else an error is shown
    and False is returned. Other identifiers are checked by sending an
    exist() instruction to tagui and reading back its dumped result.
    """
    if not _started():
        show_error('[RPA][ERROR] - use init() before using exist()')
        return False

    if element_identifier is None or element_identifier == '':
        return False

    identifier_lowercase = element_identifier.lower()

    # return True for keywords as the computer screen always exists
    if identifier_lowercase in ['page.png', 'page.bmp']:
        if not _visual():
            show_error('[RPA][ERROR] - page.png / page.bmp requires init(visual_automation = True)')
            return False
        return True

    # pre-emptive checks if image files are specified for visual automation
    if identifier_lowercase.endswith(('.png', '.bmp')) and not _visual():
        show_error('[RPA][ERROR] - ' + element_identifier + ' identifier requires init(visual_automation = True)')
        return False

    # (x,y) coordinates: wrapped in brackets, 2 or 3 comma-separated parts, no letters
    # these are assumed to always exist for visual automation
    is_coordinates = (
        element_identifier.startswith('(')
        and element_identifier.endswith(')')
        and len(element_identifier.split(',')) in [2, 3]
        and not any(c.isalpha() for c in element_identifier)
    )
    if is_coordinates:
        if not _visual():
            show_error('[RPA][ERROR] - x, y coordinates require init(visual_automation = True)')
            return False
        return True

    # let tagui evaluate exist() and dump the result as text to be read back
    send('exist_result = exist(\'' + _sdq(element_identifier) + '\').toString()')
    send('dump exist_result to rpa_python.txt')
    return _tagui_output() == 'true'
def url(webpage_url = None):
if not _started():
show_error('[RPA][ERROR] - use init() before using url()')
return False
if not _chrome():
show_error('[RPA][ERROR] - url() requires init(chrome_browser = True)')
return False
if webpage_url is not None and webpage_url != '':
if webpage_url.lower().startswith('www.'): webpage_url = 'https://' + webpage_url
if webpage_url.startswith('http://') or webpage_url.startswith('https://'):
if not send(_esq(webpage_url)):
return False
else:
return True