Skip to content

Commit

Permalink
[C] Basic gossip protocol working. real-logic#886.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeb01 committed Mar 13, 2020
1 parent 3448cab commit 08c8601
Show file tree
Hide file tree
Showing 8 changed files with 526 additions and 96 deletions.
491 changes: 412 additions & 79 deletions aeron-driver/src/main/c/aeron_name_resolver_driver.c

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions aeron-driver/src/main/c/aeron_name_resolver_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
#define AERON_NAME_RESOLVER_DRIVER_H

int aeron_name_resolver_driver_set_resolution_header(
aeron_resolution_header_t *resolution_header,
size_t capacity,
uint8_t flags,
int8_t res_type,
uint8_t *address,
uint16_t port,
const char *name,
size_t name_length);

int aeron_name_resolver_driver_set_resolution_header_from_sockaddr(
aeron_resolution_header_t *resolution_header,
size_t capacity,
uint8_t flags,
Expand Down
7 changes: 3 additions & 4 deletions aeron-driver/src/main/c/aeron_name_resolver_driver_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ int aeron_name_resolver_driver_cache_close(aeron_name_resolver_driver_cache_t *c
return 0;
}


int aeron_name_resolver_driver_cache_find_index_by_name_and_type(
aeron_name_resolver_driver_cache_t *cache,
const char *name,
Expand Down Expand Up @@ -110,14 +109,14 @@ int aeron_name_resolver_driver_cache_add_or_update(
}

entry->port = port;
size_t address_len = res_type == AERON_RES_HEADER_TYPE_NAME_TO_IP6_MD ? 16 : 4;
size_t address_len = aeron_res_header_address_length(res_type);
memcpy(entry->address, address, address_len);
memset(&entry->address[address_len], 0, 16 - address_len);
memset(&entry->address[address_len], 0, AERON_RES_HEADER_ADDRESS_LENGTH_IP6 - address_len);

return num_updated;
}

int aeron_name_resolver_driver_cache_lookup(
int aeron_name_resolver_driver_cache_lookup_by_name(
aeron_name_resolver_driver_cache_t *cache,
const char *name,
size_t name_length,
Expand Down
4 changes: 2 additions & 2 deletions aeron-driver/src/main/c/aeron_name_resolver_driver_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

typedef struct aeron_name_resolver_driver_cache_entry_stct
{
uint8_t address[16];
uint8_t address[AERON_RES_HEADER_ADDRESS_LENGTH_IP6];
int64_t deadline_ms;
int64_t time_of_last_activity_ms;
size_t name_length;
Expand Down Expand Up @@ -52,7 +52,7 @@ int aeron_name_resolver_driver_cache_add_or_update(
uint8_t *address,
uint16_t port);

int aeron_name_resolver_driver_cache_lookup(
int aeron_name_resolver_driver_cache_lookup_by_name(
aeron_name_resolver_driver_cache_t *cache,
const char *name,
size_t name_length,
Expand Down
5 changes: 5 additions & 0 deletions aeron-driver/src/main/c/protocol/aeron_udp_protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <string.h>
#include <netinet/in.h>

#include "util/aeron_bitutil.h"
#include "aeron_udp_protocol.h"

int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *group_tag)
Expand All @@ -40,3 +41,7 @@ int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *gro
return (int)((size_t)sm->frame_header.frame_length - group_tag_offset);
}

extern size_t aeron_res_header_address_length(int8_t res_type);
extern size_t aeron_res_header_entry_length_ipv4(aeron_resolution_header_ipv4_t *header);
extern size_t aeron_res_header_entry_length_ipv6(aeron_resolution_header_ipv6_t *header);

23 changes: 21 additions & 2 deletions aeron-driver/src/main/c/protocol/aeron_udp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ aeron_rttm_header_t;

#pragma pack(pop)

#define AERON_RES_HEADER_ADDRESS_LENGTH_IP4 (4)
#define AERON_RES_HEADER_ADDRESS_LENGTH_IP6 (16)

#pragma pack(push)
#pragma pack(1)
typedef struct aeron_resolution_header_stct
Expand All @@ -111,15 +114,15 @@ aeron_resolution_header_t;
typedef struct aeron_resolution_header_ipv4_stct
{
aeron_resolution_header_t resolution_header;
uint8_t addr[4];
uint8_t addr[AERON_RES_HEADER_ADDRESS_LENGTH_IP4];
int16_t name_length;
}
aeron_resolution_header_ipv4_t;

typedef struct aeron_resolution_header_ipv6_stct
{
aeron_resolution_header_t resolution_header;
uint8_t addr[16];
uint8_t addr[AERON_RES_HEADER_ADDRESS_LENGTH_IP6];
int16_t name_length;
}
aeron_resolution_header_ipv6_t;
Expand Down Expand Up @@ -155,4 +158,20 @@ int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *gro
#define AERON_RES_HEADER_TYPE_NAME_TO_IP6_MD (0x02)
#define AERON_RES_HEADER_SELF_FLAG UINT8_C(0x80)

inline size_t aeron_res_header_address_length(int8_t res_type)
{
return AERON_RES_HEADER_TYPE_NAME_TO_IP6_MD == res_type ?
AERON_RES_HEADER_ADDRESS_LENGTH_IP6 : AERON_RES_HEADER_ADDRESS_LENGTH_IP4;
}

inline size_t aeron_res_header_entry_length_ipv4(aeron_resolution_header_ipv4_t *header)
{
return AERON_ALIGN(sizeof(aeron_resolution_header_ipv4_t) + header->name_length, sizeof(int64_t));
}

inline size_t aeron_res_header_entry_length_ipv6(aeron_resolution_header_ipv6_t *header)
{
return AERON_ALIGN(sizeof(aeron_resolution_header_ipv6_t) + header->name_length, sizeof(int64_t));
}

#endif //AERON_UDP_PROTOCOL_H
3 changes: 2 additions & 1 deletion aeron-driver/src/test/c/aeron_name_resolver_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ TEST_F(NameResolverCacheTest, shouldAddAndLookupEntry)
ASSERT_EQ(
1, aeron_name_resolver_driver_cache_add_or_update(&m_cache, name, strlen(name), res_type, address, port))
<< "Iteration: " << i;
ASSERT_LE(0, aeron_name_resolver_driver_cache_lookup(&m_cache, name, strlen(name), res_type, &cache_entry));
ASSERT_LE(0,
aeron_name_resolver_driver_cache_lookup_by_name(&m_cache, name, strlen(name), res_type, &cache_entry));
ASSERT_EQ(res_type, cache_entry->res_type);
ASSERT_EQ(port, cache_entry->port);
ASSERT_EQ(0, memcmp(address, &cache_entry->address, address_length)) << i;
Expand Down
79 changes: 71 additions & 8 deletions aeron-driver/src/test/c/aeron_name_resolver_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,75 @@ TEST_F(NameResolverTest, shouldSeeNeighborFromBootstrap)

struct sockaddr_storage resolved_address_of_b;
resolved_address_of_b.ss_family = AF_INET;
ASSERT_GE(0, resolver_a.resolve_func(&resolver_a, "B", "endpoint", false, &resolved_address_of_b));
ASSERT_LE(0, resolver_a.resolve_func(&resolver_a, "B", "endpoint", false, &resolved_address_of_b));
ASSERT_EQ(AF_INET, resolved_address_of_b.ss_family);
struct sockaddr_in *in_addr_b = (struct sockaddr_in *)&resolved_address_of_b;
ASSERT_EQ(local_address_b.s_addr, in_addr_b->sin_addr.s_addr);
}

TEST_F(NameResolverTest, shouldSeeNeighborFromGossip)
{
struct in_addr local_address_b;
inet_pton(AF_INET, "127.0.0.1", &local_address_b);

aeron_name_resolver_supplier_func_t supplier_func = aeron_name_resolver_supplier_load(AERON_NAME_RESOLVER_DRIVER);
ASSERT_NE(nullptr, supplier_func);

int64_t timestamp_ms = INTMAX_C(8932472347945);

aeron_driver_context_init(&m_context_a);
aeron_driver_context_init(&m_context_b);
aeron_driver_context_init(&m_context_c);

aeron_clock_update_cached_time(m_context_a->cached_clock, timestamp_ms, timestamp_ms * 1000000);
aeron_clock_update_cached_time(m_context_b->cached_clock, timestamp_ms, timestamp_ms * 1000000);
aeron_clock_update_cached_time(m_context_c->cached_clock, timestamp_ms, timestamp_ms * 1000000);

aeron_driver_context_set_resolver_name(m_context_a, "A");
aeron_driver_context_set_resolver_interface(m_context_a, "0.0.0.0:8050");
ASSERT_EQ(0, supplier_func(m_context_a, &resolver_a, NULL)) << aeron_errmsg();

aeron_driver_context_set_resolver_name(m_context_b, "B");
aeron_driver_context_set_resolver_interface(m_context_b, "0.0.0.0:8051");
aeron_driver_context_set_resolver_bootstrap_neighbor(m_context_b, "localhost:8050");
ASSERT_EQ(0, supplier_func(m_context_b, &resolver_b, NULL));

aeron_driver_context_set_resolver_name(m_context_c, "C");
aeron_driver_context_set_resolver_interface(m_context_c, "0.0.0.0:8052");
aeron_driver_context_set_resolver_bootstrap_neighbor(m_context_c, "localhost:8051");
ASSERT_EQ(0, supplier_func(m_context_c, &resolver_c, NULL));

aeron_clock_update_cached_time(m_context_a->cached_clock, timestamp_ms, timestamp_ms * 1000000);
aeron_clock_update_cached_time(m_context_b->cached_clock, timestamp_ms, timestamp_ms * 1000000);
aeron_clock_update_cached_time(m_context_c->cached_clock, timestamp_ms, timestamp_ms * 1000000);

for (int i = 0; i < 6; i++)
{
timestamp_ms += 1000;

// ASSERT_LT(0, resolver_c.do_work_func(&resolver_c, timestamp_ms));
//
// ASSERT_LT(0, resolver_b.do_work_func(&resolver_b, timestamp_ms));
//
// ASSERT_LT(0, resolver_a.do_work_func(&resolver_a, timestamp_ms));
resolver_a.do_work_func(&resolver_a, timestamp_ms);
resolver_b.do_work_func(&resolver_b, timestamp_ms);
resolver_c.do_work_func(&resolver_c, timestamp_ms);
}

struct sockaddr_storage resolved_address;
resolved_address.ss_family = AF_INET;

ASSERT_LE(0, resolver_a.resolve_func(&resolver_a, "B", "endpoint", false, &resolved_address));
ASSERT_LE(0, resolver_c.resolve_func(&resolver_c, "B", "endpoint", false, &resolved_address));

ASSERT_LE(0, resolver_a.resolve_func(&resolver_a, "C", "endpoint", false, &resolved_address));
ASSERT_LE(0, resolver_b.resolve_func(&resolver_b, "C", "endpoint", false, &resolved_address));

ASSERT_LE(0, resolver_c.resolve_func(&resolver_c, "A", "endpoint", false, &resolved_address));
ASSERT_LE(0, resolver_b.resolve_func(&resolver_b, "A", "endpoint", false, &resolved_address));
}

TEST_F(NameResolverTest, shouldHandleSettingNameOnHeader)
{
uint8_t buffer[1024];
Expand All @@ -178,22 +241,22 @@ TEST_F(NameResolverTest, shouldHandleSettingNameOnHeader)
struct sockaddr_storage address;

address.ss_family = AF_INET6;
ASSERT_EQ(48, aeron_name_resolver_driver_set_resolution_header(
ASSERT_EQ(48, aeron_name_resolver_driver_set_resolution_header_from_sockaddr(
resolution_header, sizeof(buffer), flags, &address, hostname, strlen(hostname)));
ASSERT_EQ(48, aeron_name_resolver_driver_set_resolution_header(
ASSERT_EQ(48, aeron_name_resolver_driver_set_resolution_header_from_sockaddr(
resolution_header, 48, flags, &address, hostname, strlen(hostname)));
ASSERT_EQ(0, aeron_name_resolver_driver_set_resolution_header(
ASSERT_EQ(0, aeron_name_resolver_driver_set_resolution_header_from_sockaddr(
resolution_header, 47, flags, &address, hostname, strlen(hostname)));

address.ss_family = AF_INET;
ASSERT_EQ(40, aeron_name_resolver_driver_set_resolution_header(
ASSERT_EQ(40, aeron_name_resolver_driver_set_resolution_header_from_sockaddr(
resolution_header, sizeof(buffer), flags, &address, hostname, strlen(hostname)));
ASSERT_EQ(40, aeron_name_resolver_driver_set_resolution_header(
ASSERT_EQ(40, aeron_name_resolver_driver_set_resolution_header_from_sockaddr(
resolution_header, 40, flags, &address, hostname, strlen(hostname)));
ASSERT_EQ(0, aeron_name_resolver_driver_set_resolution_header(
ASSERT_EQ(0, aeron_name_resolver_driver_set_resolution_header_from_sockaddr(
resolution_header, 39, flags, &address, hostname, strlen(hostname)));

address.ss_family = AF_UNIX;
ASSERT_EQ(-1, aeron_name_resolver_driver_set_resolution_header(
ASSERT_EQ(-1, aeron_name_resolver_driver_set_resolution_header_from_sockaddr(
resolution_header, sizeof(buffer), flags, &address, hostname, strlen(hostname)));
}

0 comments on commit 08c8601

Please sign in to comment.