This repository has been archived by the owner on Jan 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
MessageInterface.py
201 lines (171 loc) · 9.05 KB
/
MessageInterface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# Interface for Rabbit MQ to intercommunicate with ARGO jobs
import sys,os,ssl
import time
#import logging
from pUtil import tolog
# pika is imported on a best-effort basis so that this module can still be
# loaded where the library is missing; any later use of the ARGO interface
# will then fail when `pika` is referenced. Catch Exception rather than using
# a bare except so KeyboardInterrupt/SystemExit are not swallowed here.
try:
    import pika
except Exception:
    tolog("pika module does not exist - ARGO interface will fail")
#logger = logging.getLogger(__name__)
#logging.getLogger('pika').setLevel(logging.WARNING)
#logging.getLogger('select_connection').setLevel(logging.DEBUG)
class MessageInterface:
    """Wrapper around a pika (RabbitMQ) connection used to talk to ARGO jobs.

    Constructing the object only stores its configuration; nothing is opened
    until open_blocking_connection() or open_select_connection() is called.
    Connection parameters are always built with SSL enabled and external
    (client certificate) credentials.
    """

    def __init__(self,
                 username = '',
                 password = '',
                 host = '',
                 port = -1,
                 virtual_host = '/',
                 socket_timeout = 120,
                 exchange_name = '',
                 exchange_type = 'topic',
                 exchange_durable = True,
                 exchange_auto_delete = False,
                 ssl_cert = '',
                 ssl_key = '',
                 ssl_ca_certs = '',
                 queue_is_durable = True,
                 queue_is_exclusive = False,
                 queue_is_auto_delete = True,
                ):
        """Store the server, exchange, SSL and queue settings.

        username / password are kept on the instance but are not used when
        the connection parameters are built (external credentials are used
        instead).  ssl_cert, ssl_key and ssl_ca_certs are passed to pika as
        the "certfile", "keyfile" and "ca_certs" SSL options.  The queue_is_*
        flags are applied to every queue declared through create_queue().
        """
        # server / login settings
        self.username = username
        self.password = password
        self.host = host
        self.port = port
        self.virtual_host = virtual_host
        self.socket_timeout = socket_timeout

        # exchange settings
        self.exchange_name = exchange_name
        self.exchange_type = exchange_type
        self.exchange_durable = exchange_durable
        self.exchange_auto_delete = exchange_auto_delete

        # queue settings
        self.queue_is_durable = queue_is_durable
        self.queue_is_exclusive = queue_is_exclusive
        self.queue_is_auto_delete = queue_is_auto_delete

        # SSL settings
        self.ssl_cert = ssl_cert
        self.ssl_key = ssl_key
        self.ssl_ca_certs = ssl_ca_certs

        # populated by create_connection_parameters() / open_*_connection()
        self.credentials = None
        self.parameters = None
        self.connection = None
        self.channel = None

    def open_blocking_connection(self):
        """Open a blocking connection and channel, then declare the exchange.

        Sets self.connection and self.channel.  Any exception raised while
        opening the connection or the channel is logged and re-raised.
        """
        tolog("MQ ARGO: open blocking connection")
        self.create_connection_parameters()

        # open the connection and grab the channel
        try:
            self.connection = pika.BlockingConnection(self.parameters)
        except Exception:
            tolog('MQ ARGO: Exception received while trying to open blocking connection to message server: ' + str(sys.exc_info()))
            raise

        try:
            self.channel = self.connection.channel()
        except Exception:
            tolog('MQ ARGO: Exception received while trying to open a channel to the message server: ' + str(sys.exc_info()))
            raise

        tolog("MQ ARGO: create exchange, name = " + self.exchange_name)

        # make sure exchange exists (doesn't do anything if already created)
        self.channel.exchange_declare(
            exchange = self.exchange_name,
            exchange_type = self.exchange_type,
            durable = self.exchange_durable,
            auto_delete = self.exchange_auto_delete,
        )

    def open_select_connection(self,
                               on_open_callback = None,
                               on_open_error_callback = None,
                               on_close_callback = None,
                               stop_ioloop_on_close = True,
                              ):
        """Create a pika SelectConnection using the given callbacks.

        The connection parameters are always (re)built.  The connection
        itself is only created when on_open_callback is given; otherwise
        self.connection is left untouched and a message is logged.  Any
        exception raised while creating the connection is logged and
        re-raised.
        """
        tolog("MQ ARGO: create select connection")
        self.create_connection_parameters()

        if on_open_callback is None:
            # previously a silent no-op; make it visible in the log
            tolog("MQ ARGO: no on_open_callback given - select connection not created")
            return

        # open the connection
        try:
            self.connection = pika.SelectConnection(self.parameters,
                                                    on_open_callback,
                                                    on_open_error_callback,
                                                    on_close_callback,
                                                    stop_ioloop_on_close,
                                                   )
        except Exception:
            tolog('MQ ARGO: Exception received while trying to open select connection to message server: ' + str(sys.exc_info()))
            raise

    def create_connection_parameters(self):
        """Build self.credentials and self.parameters from the stored settings."""
        tolog("MQ ARGO: create connection parameters, server = " + str(self.host) + " port = " + str(self.port))

        # Login uses external credentials together with the SSL client
        # certificate configured below; self.username / self.password are
        # therefore not used here.
        self.credentials = pika.credentials.ExternalCredentials()

        ssl_options_dict = {
            "certfile":  self.ssl_cert,
            "keyfile":   self.ssl_key,
            "ca_certs":  self.ssl_ca_certs,
            "cert_reqs": ssl.CERT_REQUIRED,
        }

        # setup our connection parameters
        self.parameters = pika.ConnectionParameters(
            host = self.host,
            port = self.port,
            virtual_host = self.virtual_host,
            credentials = self.credentials,
            socket_timeout = self.socket_timeout,
            ssl = True,
            ssl_options = ssl_options_dict,
        )

    def create_queue(self, name, routing_key):
        """Declare queue `name` and bind it to the exchange with `routing_key`.

        Both arguments are converted with str() before being passed to pika.
        Requires an open channel.
        """
        queue_name = str(name)

        # declare the queue which this job will use to receive messages
        #   durable     = survive reboots of the broker
        #   exclusive   = only current connection can access this queue
        #   auto_delete = queue will be deleted after connection is closed
        self.channel.queue_declare(
            queue = queue_name,
            durable = self.queue_is_durable,
            exclusive = self.queue_is_exclusive,
            auto_delete = self.queue_is_auto_delete,
        )

        # now bind this queue to the exchange, using a routing key
        # any message submitted to the exchange with the
        # routing key will appear on this queue
        self.channel.queue_bind(
            exchange = self.exchange_name,
            queue = queue_name,
            routing_key = str(routing_key),
        )

    def close(self):
        """Close the channel and then the connection.

        Does nothing for a channel/connection that was never opened.  The
        connection is closed even if closing the channel raises; that
        exception still propagates to the caller.
        """
        try:
            if self.channel is not None:
                self.channel.close()
        finally:
            if self.connection is not None:
                self.connection.close()

    def send_msg(self,
                 message_body,
                 routing_key,
                 exchange_name = None,
                 message_headers = None,
                 priority = 0,       # default
                 delivery_mode = 2,  # make message persistent
                ):
        """Publish `message_body` to an exchange with the given routing key.

        exchange_name defaults to the exchange configured on this object.
        message_headers defaults to a fresh empty dict for each call.
        Requires an open channel.
        """
        if exchange_name is None:
            exchange_name = self.exchange_name
        if message_headers is None:
            # a new dict per call; a `{}` default would be shared between calls
            message_headers = {}

        # the AMQP timestamp property is an integer number of seconds
        timestamp = int(time.time())

        # create the message properties
        properties = pika.BasicProperties(
            delivery_mode = delivery_mode,
            priority = priority,
            timestamp = timestamp,
            headers = message_headers,
        )

        tolog("MQ ARGO: sending message body:\n" + str(message_body))
        # log the exchange actually published to, not just the configured one
        tolog('MQ ARGO: sending message to exchange: ' + str(exchange_name))
        tolog('MQ ARGO: sending message with routing key: ' + str(routing_key))

        self.channel.basic_publish(
            exchange = exchange_name,
            routing_key = routing_key,
            body = message_body,
            properties = properties,
        )

    def receive_msg(self, queue_name, no_ack = False):
        """Retrieve one message from `queue_name` via channel.basic_get.

        Returns the (method, properties, body) tuple exactly as produced by
        basic_get.  Requires an open channel.
        """
        method, properties, body = self.channel.basic_get(queue=queue_name, no_ack=no_ack)
        return method, properties, body

    def purge_queue(self, queue_name):
        """Purge queue `queue_name` via channel.queue_purge."""
        self.channel.queue_purge(queue = queue_name)