Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNMY] Reworking #19

Open
wants to merge 25 commits into
base: release/IPM-2.6.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
65859c2
Reworking (in progress)
ericclappier-eaton Jul 6, 2022
fe62dfc
Add wrapper for promise/future
ericclappier-eaton Jul 13, 2022
deac201
Fix compilation error
ericclappier-eaton Jul 13, 2022
3aee400
Clean code
ericclappier-eaton Jul 13, 2022
15da1f3
Clean code (suite)
ericclappier-eaton Jul 13, 2022
54f7f45
Use common Promise for request
ericclappier-eaton Jul 13, 2022
4d64221
Fix compilation issue
ericclappier-eaton Jul 13, 2022
754a53f
Take into account PR remarks
ericclappier-eaton Jul 18, 2022
60c56dd
Secure message bus client pointer
ericclappier-eaton Jul 18, 2022
f9b6173
Activate tests in jenkins
ericclappier-eaton Jul 19, 2022
f2d28df
Secure promise already satisfied
ericclappier-eaton Jul 19, 2022
7fbbcdd
Secure promise already satisfied (suite)
ericclappier-eaton Jul 19, 2022
91e19b2
Workaround for passing tests (TBD)
ericclappier-eaton Jul 20, 2022
f0ff641
amqp: Reduce number of threads for test in multi synch/asynch
ericclappier-eaton Jul 21, 2022
ba5ae6c
Add protected for member in Promise
ericclappier-eaton Jul 21, 2022
c883f78
Update changelog and readme
ericclappier-eaton Jul 25, 2022
72f3b7f
Fix issue with close receiver
ericclappier-eaton Jul 28, 2022
3eb279e
Update filter
ericclappier-eaton Aug 2, 2022
3ac87ba
Change filter format
ericclappier-eaton Aug 3, 2022
e93d443
Change pipe for filter
ericclappier-eaton Aug 3, 2022
4aec842
Fix connection error
ericclappier-eaton Aug 16, 2022
ddcfe7c
Clean code
ericclappier-eaton Aug 16, 2022
df9651f
Update comment
ericclappier-eaton Aug 16, 2022
8028c5f
Merge pull request #1 from eric3873/filter_change
perrettecl Aug 16, 2022
21ac0f1
Move address filter function in utilities
ericclappier-eaton Sep 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,26 @@ All notable changes to 'fty-messagebus2' project will be documented in this file
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

[1.0.0] - 2021-mm-dd
[1.0.X] - 2022-mm-dd [IN PROGRESS]
### Added
-
- Add secured class for promise/future management.
### Changed
-
- Reworking Amqp code.
### Removed
-
- Remove the intermediate class MsgBusAmqp.
### Fixed

[1.0.1] - 2022-07-13
### Added
- Add callback for definitive communication lost treatment.
### Changed
- Reworking Amqp code.
- Optimise the number of simultaneous connection on the Amqp library.
- Modify name space of the project.
### Fixed
- Close missing open handles.
- Modify default configuration to avoid loss of connection.

[1.0.0] - 2022-01-20
### Added
- First version
10 changes: 6 additions & 4 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

@Library('etn-ipm2-jenkins') _

import params.CmakePipelineParams
CmakePipelineParams parameters = new CmakePipelineParams()
parameters.enableDebugBuild = false
//import params.CmakePipelineParams
//CmakePipelineParams parameters = new CmakePipelineParams()
//parameters.enableDebugBuild = false
//parameters.debugBuildRunCoverage = true
//etn_ipm2_build_and_tests_pipeline_cmake(parameters)

etn_ipm2_build_and_tests_pipeline_cmake(parameters)
// run with default parameters
etn_ipm2_build_and_tests_pipeline_cmake()
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
## Description

This project aims to provide somme common methods to address communication over several message bus.
It provide an high level interface to handle comunication of Message. The format of the Message also defined in this project.
It provide an high level interface to handle communication of Message. The format of the Message also defined in this project.

It comes today with 2 implementations:

* MQTT
* AMQP

Those 2 implementations are implementing the fty-commom-messagebus2 interface and are carring Message.
Those 2 implementations are implementing the fty-messagebus2 interface and are carrying Message.

## Interface & Message

Expand Down
37 changes: 19 additions & 18 deletions amqp/public_include/fty/messagebus2/amqp/MessageBusAmqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,41 @@ namespace fty::messagebus2::amqp {

// Default amqp end point
static auto constexpr DEFAULT_ENDPOINT{"amqp://127.0.0.1:5672"};

static auto constexpr BUS_IDENTITY{"AMQP"};
static const std::string TOPIC_PREFIX = "topic://";
static const std::string QUEUE_PREFIX = "queue://";

class MsgBusAmqp;
class AmqpClient;

class MessageBusAmqp final : public MessageBus
{
public:
MessageBusAmqp(const ClientName& clientName = utils::getClientId("MessageBusAmqp"), const Endpoint& endpoint = DEFAULT_ENDPOINT);

MessageBusAmqp(const std::string& clientName = utils::getClientId("MessageBusAmqp"), const Endpoint& endpoint = DEFAULT_ENDPOINT);
~MessageBusAmqp() = default;

MessageBusAmqp(MessageBusAmqp&&) = delete;
MessageBusAmqp& operator=(MessageBusAmqp&&) = delete;
MessageBusAmqp(const MessageBusAmqp&) = delete;
MessageBusAmqp& operator=(const MessageBusAmqp&) = delete;
MessageBusAmqp& operator = (MessageBusAmqp&&) = delete;
MessageBusAmqp(const MessageBusAmqp&) = delete;
MessageBusAmqp& operator = (const MessageBusAmqp&) = delete;

[[nodiscard]] fty::Expected<void, ComState> connect() noexcept override;
[[nodiscard]] fty::Expected<void, DeliveryState> send(const Message& msg) noexcept override;
[[nodiscard]] fty::Expected<void, DeliveryState> receive(
const Address& address, MessageListener&& func, const std::string& filter = {}) noexcept override;
[[nodiscard]] fty::Expected<void, DeliveryState> unreceive(const Address& address) noexcept override;
[[nodiscard]] fty::Expected<Message, DeliveryState> request(const Message& msg, int timeOut) noexcept override;

void setConnectionErrorListener(ConnectionErrorListener errorListener);

const Address& address, MessageListener&& messageListener, const std::string& filter = {}) noexcept override;
[[nodiscard]] fty::Expected<void, DeliveryState> unreceive(const Address& address, const std::string& filter = {}) noexcept override;
// Sync request with timeout
[[nodiscard]] fty::Expected<Message, DeliveryState> request(const Message& message, int timeoutInSeconds) noexcept override;
[[nodiscard]] const ClientName& clientName() const noexcept override;
[[nodiscard]] const Identity& identity() const noexcept override;
[[nodiscard]] const Identity& identity() const noexcept override;

using MessageBus::receive;
private:
std::shared_ptr<MsgBusAmqp> m_busAmqp;
// Test if the service is available or not
bool isServiceAvailable();

// Client name
std::string m_clientName{};
// Amqp endpoint
Endpoint m_endpoint{};
// AmqpClient instance
std::shared_ptr<AmqpClient> m_clientPtr;
};

} // namespace fty::messagebus2::amqp
Loading