Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracking ACKs for published events; Futures API #1144

Merged
merged 22 commits into from
Nov 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7b9fba8
Wiring library: Vector class, JSON tools; unit test helper tools
sergeuz Sep 20, 2016
e992827
Make compile-dependency on strtol() and strtod() optional
sergeuz Sep 29, 2016
b1d7d74
Minor fix
sergeuz Oct 16, 2016
70034b7
System error codes and C++ wrapper class
sergeuz Oct 16, 2016
809aed5
Generic completion callback and C++ wrapper classes
sergeuz Oct 16, 2016
08674af
Lock-free future implementation; basic unit test
sergeuz Oct 16, 2016
176c26a
Particle.publish() now returns Future object
sergeuz Oct 16, 2016
35b944a
Unit test build fixes
sergeuz Oct 16, 2016
cc9c72e
Added system function that invokes a callback in the application context
sergeuz Oct 16, 2016
1301d35
Moved common functionality to base class
sergeuz Oct 21, 2016
d26ab60
Add REQUIRE_ACK flag for Particle.publish()
sergeuz Oct 21, 2016
e357f68
More error codes
sergeuz Oct 21, 2016
1d7838f
Completion handling for published events (Particle protocol)
sergeuz Oct 21, 2016
47a480b
Minor fixes and renamings
sergeuz Oct 21, 2016
33d3266
Few more test cases
sergeuz Oct 21, 2016
554c5ef
NONE is too generic name to define it globally
sergeuz Oct 22, 2016
2d20905
It's an error if a completion handler wasn't invoked during its lifet…
sergeuz Oct 22, 2016
d659801
Increased ACK timeout; minor fixes
sergeuz Oct 23, 2016
b3daf7a
Invoke callbacks after settings completion flag
sergeuz Oct 30, 2016
890999b
More unit tests
sergeuz Nov 6, 2016
8893ddd
Merge branch 'feature/usb_logging_1_of_2' into feature/publish_ack
technobly Nov 22, 2016
d8a4054
resolves dynalib index conflict
technobly Nov 22, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion communication/makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ TARGET_TYPE = a

BUILD_PATH_EXT=$(COMMUNICATION_BUILD_PATH_EXT)

DEPENDENCIES = hal dynalib services
DEPENDENCIES = hal dynalib services wiring

include ../build/arm-tlm.mk
2 changes: 2 additions & 0 deletions communication/src/build.mk
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ CPPSRC += $(TARGET_SRC_PATH)/protocol.cpp
CPPSRC += $(TARGET_SRC_PATH)/messages.cpp
CPPSRC += $(TARGET_SRC_PATH)/chunked_transfer.cpp
CPPSRC += $(TARGET_SRC_PATH)/coap_channel.cpp
CPPSRC += $(TARGET_SRC_PATH)/publisher.cpp
CPPSRC += $(TARGET_SRC_PATH)/protocol_defs.cpp

# ASM source files included in this build.
ASRC +=
Expand Down
2 changes: 2 additions & 0 deletions communication/src/coap_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ inline bool time_has_passed(system_tick_t now, system_tick_t tick)
*/
class CoAPMessageStore
{
LOG_CATEGORY("comm.coap");

/**
* The head of the list of messages.
*/
Expand Down
9 changes: 6 additions & 3 deletions communication/src/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ namespace EventType {
enum Flags {
EMPTY_FLAGS = 0,
NO_ACK = 0x2,
WITH_ACK = 0x8,

ALL_FLAGS = NO_ACK
ALL_FLAGS = NO_ACK | WITH_ACK
};

static_assert((PUBLIC & NO_ACK)==0, "flags should be distinct from event type");
static_assert((PRIVATE & NO_ACK)==0, "flags should be distinct from event type");
static_assert((PUBLIC & NO_ACK)==0 &&
(PRIVATE & NO_ACK)==0 &&
(PUBLIC & WITH_ACK)==0 &&
(PRIVATE & WITH_ACK)==0, "flags should be distinct from event type");

/**
* The flags are encoded in with the event type.
Expand Down
5 changes: 5 additions & 0 deletions communication/src/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ ProtocolError Protocol::handle_received_message(Message& message,
break;

case CoAPMessageType::EMPTY_ACK:
ack_handlers.setResult(msg_id);
break;

case CoAPMessageType::ERROR:
default:
; // drop it on the floor
Expand Down Expand Up @@ -243,6 +246,7 @@ int Protocol::begin()
{
chunkedTransfer.reset();
pinger.reset();
ack_handlers.clear(); // FIXME: Cancel pending handlers right after previous session has ended

uint32_t channel_flags = 0;
ProtocolError error = channel.establish(channel_flags, application_state_checksum());
Expand Down Expand Up @@ -351,6 +355,7 @@ ProtocolError Protocol::event_loop(CoAPMessageType::Enum message_type,
*/
ProtocolError Protocol::event_loop(CoAPMessageType::Enum& message_type)
{
ack_handlers.processTimeouts(); // Process expired handlers
Message message;
message_type = CoAPMessageType::NONE;
ProtocolError error = channel.receive(message);
Expand Down
28 changes: 24 additions & 4 deletions communication/src/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ class Protocol
*/
Publisher publisher;

/**
* Completion handlers for messages with confirmable delivery.
*/
CompletionHandlerMap<message_id_t> ack_handlers;

/**
* The token ID for the next request made.
* If we have a bone-fide CoAP layer this will eventually disappear into that layer, just like message-id has.
Expand Down Expand Up @@ -259,7 +264,10 @@ class Protocol
public:
Protocol(MessageChannel& channel) :
channel(channel),
product_id(PRODUCT_ID), product_firmware_version(PRODUCT_FIRMWARE_VERSION), initialized(false)
product_id(PRODUCT_ID),
product_firmware_version(PRODUCT_FIRMWARE_VERSION),
publisher(this),
initialized(false)
{
}

Expand All @@ -283,6 +291,11 @@ class Protocol
copy_and_init(&this->handlers, sizeof(this->handlers), &handlers, handlers.size);
}

void add_ack_handler(message_id_t msg_id, CompletionHandler handler, unsigned timeout)
{
ack_handlers.add(msg_id, std::move(handler), timeout);
}

/**
* Determines the checksum of the application state.
* Application state comprises cloud functinos, variables and subscriptions.
Expand Down Expand Up @@ -325,14 +338,21 @@ class Protocol

// Returns true on success, false on sending timeout or rate-limiting failure
bool send_event(const char *event_name, const char *data, int ttl,
EventType::Enum event_type, int flags)
EventType::Enum event_type, int flags, CompletionHandler handler)
{
if (chunkedTransfer.is_updating())
{
handler.setError(SYSTEM_ERROR_BUSY);
return false;
}
return !publisher.send_event(channel, event_name, data, ttl, event_type, flags,
callbacks.millis());
const ProtocolError error = publisher.send_event(channel, event_name, data, ttl, event_type, flags,
callbacks.millis(), std::move(handler));
if (error != NO_ERROR)
{
handler.setError(toSystemError(error));
return false;
}
return true;
}

inline bool send_subscription(const char *event_name, const char *device_id)
Expand Down
40 changes: 40 additions & 0 deletions communication/src/protocol_defs.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2016 Particle Industries, Inc. All rights reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/

#include "protocol_defs.h"

system_error particle::protocol::toSystemError(ProtocolError error) {
switch (error) {
case NO_ERROR:
return SYSTEM_ERROR_NONE;
case PING_TIMEOUT:
case MESSAGE_TIMEOUT:
return SYSTEM_ERROR_TIMEOUT;
case IO_ERROR:
return SYSTEM_ERROR_IO;
case INVALID_STATE:
return SYSTEM_ERROR_INVALID_STATE;
case AUTHENTICATION_ERROR:
return SYSTEM_ERROR_NOT_ALLOWED;
case BANDWIDTH_EXCEEDED:
return SYSTEM_ERROR_LIMIT_EXCEEDED;
case INSUFFICIENT_STORAGE:
return SYSTEM_ERROR_TOO_LARGE;
default:
return SYSTEM_ERROR_PROTOCOL; // Generic protocol error
}
}
8 changes: 8 additions & 0 deletions communication/src/protocol_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <functional>
#include "system_tick_hal.h"

#include "system_error.h"

typedef uint16_t product_id_t;
typedef uint16_t product_firmware_version_t;

Expand Down Expand Up @@ -35,6 +37,9 @@ enum ProtocolError
UNKNOWN = 0x7FFFF
};

// Converts protocol error to system error code
system_error toSystemError(ProtocolError error);

typedef uint16_t chunk_index_t;

const chunk_index_t NO_CHUNKS_MISSING = 65535;
Expand All @@ -47,6 +52,9 @@ const size_t MAX_EVENT_NAME_LENGTH = 64;
const size_t MAX_EVENT_DATA_LENGTH = 64;
const size_t MAX_EVENT_TTL_SECONDS = 16777215;

// Timeout in milliseconds given to receive an acknowledgement for a published event
const unsigned SEND_EVENT_ACK_TIMEOUT = 20000;

#ifndef PROTOCOL_BUFFER_SIZE
#if PLATFORM_ID<2
#define PROTOCOL_BUFFER_SIZE 640
Expand Down
24 changes: 24 additions & 0 deletions communication/src/publisher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2016 Particle Industries, Inc. All rights reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/

#include "publisher.h"

#include "protocol.h"

void particle::protocol::Publisher::add_ack_handler(message_id_t msg_id, CompletionHandler handler) {
protocol->add_ack_handler(msg_id, std::move(handler), SEND_EVENT_ACK_TIMEOUT);
}
43 changes: 34 additions & 9 deletions communication/src/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,27 @@

#pragma once

#include "protocol_defs.h"
#include "events.h"
#include "message_channel.h"
#include "messages.h"

#include "completion_handler.h"

namespace particle
{
namespace protocol
{

#include "protocol_defs.h"
#include "events.h"
#include "message_channel.h"
class Protocol;

class Publisher
{
public:
explicit Publisher(Protocol* protocol) :
protocol(protocol)
{
}

inline bool is_system(const char* event_name)
{
Expand Down Expand Up @@ -83,11 +92,9 @@ class Publisher
return false;
}

public:

ProtocolError send_event(MessageChannel& channel, const char* event_name,
const char* data, int ttl, EventType::Enum event_type, int flags,
system_tick_t time)
system_tick_t time, CompletionHandler handler)
{
bool is_system_event = is_system(event_name);
bool rate_limited = is_rate_limited(is_system_event, time);
Expand All @@ -96,13 +103,31 @@ class Publisher

Message message;
channel.create(message);
bool noack = flags & EventType::NO_ACK;
bool confirmable = channel.is_unreliable() && !noack;
bool confirmable = channel.is_unreliable();
if (flags & EventType::NO_ACK) {
confirmable = false;
} else if (flags & EventType::WITH_ACK) {
confirmable = true;
}
size_t msglen = Messages::event(message.buf(), 0, event_name, data, ttl,
event_type, confirmable);
message.set_length(msglen);
return channel.send(message);
const ProtocolError result = channel.send(message);
if (result == NO_ERROR) {
// Register completion handler only if acknowledgement was requested explicitly
if ((flags & EventType::WITH_ACK) && message.has_id()) {
add_ack_handler(message.get_id(), std::move(handler));
} else {
handler.setResult();
}
}
return result;
}

private:
Protocol* protocol;

void add_ack_handler(message_id_t msg_id, CompletionHandler handler);
};

}}
Loading