Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
EOS-22209: Message Bus code (#542)
Browse files Browse the repository at this point in the history
* Message Bus code

Signed-off-by: Ajay Paratmandali <ajay.paratmandali@seagate.com>

* Add force test

Signed-off-by: Ajay Paratmandali <ajay.paratmandali@seagate.com>

* Implement message bus

Signed-off-by: Ajay Paratmandali <ajay.paratmandali@seagate.com>

* Address all comment

Signed-off-by: Ajay Paratmandali <ajay.paratmandali@seagate.com>

* Fix name

Signed-off-by: Ajay Paratmandali <ajay.paratmandali@seagate.com>
  • Loading branch information
ajay-paratmandali authored Aug 13, 2021
1 parent 0404aca commit 3d35726
Show file tree
Hide file tree
Showing 10 changed files with 620 additions and 247 deletions.
107 changes: 42 additions & 65 deletions ha/core/event_analyzer/watcher/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

import json
import traceback
from threading import Thread

from cortx.utils.message_bus import MessageConsumer
from cortx.utils.log import Log

from ha.util.message_bus import MessageBus
from ha.util.message_bus import CONSUMER_STATUS
from ha.core.event_analyzer.filter.filter import Filter
from ha.core.event_analyzer.parser.parser import Parser
from ha.core.event_analyzer.subscriber import Subscriber
Expand All @@ -30,23 +28,21 @@
from ha.core.event_analyzer.event_analyzer_exceptions import EventParserException
from ha.core.event_analyzer.event_analyzer_exceptions import SubscriberException

class Watcher(Thread):
class Watcher:
""" Watch message bus to check in coming event. """

def __init__(self, consumer_id: int, message_type: str, consumer_group: str,
event_filter: Filter, event_parser: Parser, subscriber: Subscriber):
"""
Initalize Watcher class to monitor message bus event.
Args:
id (int): Consumer ID for message bus.
consumer_id (int): Consumer ID for message bus.
message_type (str): Message type for getting event.
group (str): Consumer Group of message bus.
consumer_group (str): Consumer Group of message bus.
event_filter (Filter): Filter unused event.
parser (Parser): Parse event to HealthEvent
event_parser (Parser): Parse event to HealthEvent
subscriber (Subscriber): Pass event to Subscriber.
"""
super(Watcher, self).__init__(name=f"{message_type}-{str(consumer_id)}", daemon=True)
Log.info(f"Initalizing watcher {message_type}-{str(consumer_id)}")
self.consumer_id = consumer_id
self.message_type = message_type
Expand All @@ -55,77 +51,58 @@ def __init__(self, consumer_id: int, message_type: str, consumer_group: str,
self.parser = event_parser
self.subscriber = subscriber
self._validate()
self.consumer = self._get_connection()
self.consumer = MessageBus.get_consumer(consumer_id=str(self.consumer_id),
consumer_group=self.consumer_group,
message_type=self.message_type,
callback=self.process_message)

def _validate(self) -> None:
"""
Validate watcher and raise exception.
Raises:
EventAnalyzer: event analyzer exception.
"""
if not isinstance(self.subscriber, Subscriber):
raise InvalidSubscriber(f"Invalid subscriber {self.subscriber}")

def _get_connection(self) -> MessageConsumer:
"""
Get message consumer connection.
Returns:
MessageConsumer: Return instance of MessageConsumer.
"""
return MessageConsumer(consumer_id=str(self.consumer_id),
consumer_group=self.consumer_group,
message_types=[self.message_type],
auto_ack=False, offset='earliest')

def _get_message(self):
def process_message(self, message: str) -> None:
"""
Receive message from message bus.
Returns:
str: JSON object of message
Callback function to get message.
Args:
message (str): Message received from message bus.
"""
try:
message = self.consumer.receive(timeout=0)
message = json.loads(message.decode('utf-8'))
except Exception as e:
Log.error(f"Failed to receive message, error: {e}. Retrying to receive.")
return None
Log.error(f"Invalid message {message}, sollow exception and ack it. Error: {e}")
return CONSUMER_STATUS.SUCCESS
try:
return json.loads(message.decode('utf-8'))
Log.debug(f"Captured message: {message}")
if self.filter.filter_event(json.dumps(message)):
Log.info(f"Filtered Event detected: {message}")
event = self.parser.parse_event(json.dumps(message))
try:
Log.info(f"Processing event {event} to subscriber...")
self.subscriber.process_event(event)
except Exception as e:
raise SubscriberException(f"Failed to process event {message}. Error: {e}")
return CONSUMER_STATUS.SUCCESS
except EventFilterException as e:
Log.error(f"Filter exception {e} {traceback.format_exc()} for {message}. Ack Message.")
return CONSUMER_STATUS.SUCCESS
except EventParserException as e:
Log.error(f"Parser exception {e} {traceback.format_exc()} for {message}. Ack Message.")
return CONSUMER_STATUS.SUCCESS
except SubscriberException as e:
Log.error(f"Subscriber exception {e} {traceback.format_exc()} for {message}, retry without ack.")
return CONSUMER_STATUS.FAILED
except Exception as e:
Log.error(f"Invalid format of message failed due to {e}. Message : {str(message)}")
self.consumer.ack()
return None
Log.error(f"Unknown Exception caught {e} {traceback.format_exc()}")
Log.error(f"Forcefully ack failed msg: {message}")
return CONSUMER_STATUS.FAILED

def run(self):
def start(self):
"""
Overloaded of Thread.
Start watcher to listen messages.
"""
while True:
message = self._get_message()
if message is None:
continue
try:
Log.debug(f"Captured message: {message}")
if self.filter.filter_event(json.dumps(message)):
Log.info(f"Filtered Event detected: {message}")
event = self.parser.parse_event(json.dumps(message))
try:
Log.info(f"Processing event {event} to subscriber...")
self.subscriber.process_event(event)
except Exception as e:
raise SubscriberException(f"Failed to process event {message}. Error: {e}")
self.consumer.ack()
except EventFilterException as e:
Log.error(f"Filter exception {e} {traceback.format_exc()} for {message}")
self.consumer.ack()
except EventParserException as e:
Log.error(f"Parser exception {e} {traceback.format_exc()} for {message}")
self.consumer.ack()
except SubscriberException as e:
Log.error(f"Subscriber exception {e} {traceback.format_exc()} for {message}, retry without ack.")
except Exception as e:
Log.error(f"Unknown Exception caught {e} {traceback.format_exc()}")
Log.error(f"Forcefully ack failed msg: {message}")
self.consumer.ack()
self.consumer.start()
124 changes: 124 additions & 0 deletions ha/test/test_message_bus_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env python3

# Copyright (c) 2021 Seagate Technology LLC and/or its Affiliates
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <https://www.gnu.org/licenses/>. For any questions
# about this software or licensing, please email opensource@seagate.com or
# cortx-questions@seagate.com.

import os
import sys
import pathlib
import unittest
import time
import json
sys.path.append(os.path.join(os.path.dirname(pathlib.Path(__file__)), '..', '..', '..'))
from ha.util.message_bus import MessageBus
from ha.util.message_bus import CONSUMER_STATUS

class TestMessageBus(unittest.TestCase):
"""
Integration test TestMessageBus
"""

def setUp(self):
"""
Setup.
"""
self.message_type = "test_ha"
MessageBus.deregister(self.message_type)
# Consumer
self.consumer_id = 1
self.consumer_group = "test_ha_group"
# Producer
producer_id="ha_producer"
self.producer = MessageBus.get_producer(producer_id=producer_id, message_type=self.message_type)
self.status = None
self.stop_thread = False
self.count = 1
self.consumer = MessageBus.get_consumer(consumer_id=self.consumer_id,
consumer_group=self.consumer_group,
message_type=self.message_type,
callback=self._callback)

def _callback(self, message: str):
"""
Callback
"""
payload = json.loads(message)
if self.status == CONSUMER_STATUS.SUCCESS:
print(f"Caught message {payload}")
if payload.get("status") == "stop":
self.stop_thread = True
return CONSUMER_STATUS.SUCCESS_STOP
return CONSUMER_STATUS.SUCCESS
elif self.status == CONSUMER_STATUS.FAILED:
# Receive same message 3 time and then return fail stop
if self.count <= 3 and payload.get("status") == "failed":
print(f"Received same {payload} with count {self.count}")
self.count += 1
return CONSUMER_STATUS.FAILED
elif payload.get("status") == "failed":
self.stop_thread = True
print(f"Received same {payload} with count {self.count} stopping forcefully.")
return CONSUMER_STATUS.FAILED_STOP
else:
print(f"Received same {payload} as no responce")
self.stop_thread = True
return CONSUMER_STATUS.SUCCESS_STOP

def test_message_bus(self):
"""
Test safe close thread by return true
"""
self._check_success_case()
self._check_failed_case()

def _check_success_case(self):
"""
Return SUCCESS and SUCCESS_STOP
"""
# send message
self.producer.publish({"status":"start"})
self.producer.publish({"status":"working"})
self.producer.publish({"status":"stop"})

# Wait for message
self.status = CONSUMER_STATUS.SUCCESS
self.consumer.start()
while not self.stop_thread:
time.sleep(2)
self.consumer.stop()
self.stop_thread = False
print("Successfully tested SUCCESS and SUCCESS_STOP")

def _check_failed_case(self):
self.producer.publish({"status":"failed"})
self.status = CONSUMER_STATUS.FAILED

# Wait for message
self.consumer.start()
while not self.stop_thread:
time.sleep(5)
print("5 sec sleep completed produce one message again")
self.producer.publish({"status":"no_responce_seen"})
self.consumer.stop()
print("Successfully tested SUCCESS and SUCCESS_STOP")

def tearDown(self):
"""
destroy.
"""
MessageBus.deregister(self.message_type)

if __name__ == "__main__":
unittest.main()
Loading

0 comments on commit 3d35726

Please sign in to comment.