Skip to content

Commit

Permalink
Add Support to Gateways (#81)
Browse files Browse the repository at this point in the history
* add compSettings to pi-sense sample

* upd trx in ci (#76)

* upd trx in ci

* upd trx to 2.2.1

* fix aws connect test

* Feat/aws (#75)

* add aws samples, and retries

* get update shadow

* revisit AWS impl

* review aws sample

* upd aws readme

Co-authored-by: ridomin <ridomin@live.com>

* init WP from shadow

* shadow versioning

* rev v5

* clean warning

* configure retained

* missing retain in wp

* allow crt certs

* retain birth

* add modules support for hub client (#79)

* add modules support for hub client

* add module tests

* upd trx to 2.2.2

Co-authored-by: ridomin <ridomin@live.com>

* Feat/gateway (#80)

* add audience, topic rid as string

* test mm with gw

* fix tests

* review x auth for gw

* clean warnings

* fix err with ToBytes

Co-authored-by: rido-min <rido-min@users.noreply.github.com>
  • Loading branch information
ridomin and rido-min committed Nov 5, 2022
1 parent 87e6a9a commit 0ed6894
Show file tree
Hide file tree
Showing 20 changed files with 82 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:

- name: Process trx reports with default
if: always()
uses: im-open/process-dotnet-test-results@v2.2.1
uses: im-open/process-dotnet-test-results@v2.2.2
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

2 changes: 1 addition & 1 deletion .github/workflows/samples2docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Login to GitHub Container Registry
uses: docker/login-action@v1
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.actor }}
Expand Down
2 changes: 1 addition & 1 deletion samples/iothub-sample/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
}
},
"ConnectionStrings": {
"cs": "HostName=rido-freetier.azure-devices.net;DeviceId=device23;SharedAccessKey=MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDA="
"cs": "HostName=ridofree.azure-devices.net;DeviceId=leaf02;SharedAccessKey=+icdXc8+XlxlggkOXdpaK7lacuJHOKqdwok8ClkFWFY=;GatewayHostName=localhost;CaFile=c:/certs/localhost/ca.pem"
}
}
2 changes: 1 addition & 1 deletion samples/memmon/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ string RenderData()
StringBuilder sb = new();
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, $"{connectionSettings?.HostName}:{connectionSettings?.TcpPort}");
AppendLineWithPadRight(sb, $"{connectionSettings.ClientId} (Auth:{connectionSettings.Auth}/ TLS:{connectionSettings.UseTls})");
AppendLineWithPadRight(sb, $"{connectionSettings.ClientId} (Auth:{connectionSettings.Auth}/ TLS:{connectionSettings.UseTls}) GW: {connectionSettings.GatewayHostName}");
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "Property", "Value".PadRight(15), "Version"));
AppendLineWithPadRight(sb, string.Format("{0:8} | {1:15} | {2}", "--------", "-----".PadLeft(15, '-'), "------"));
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions samples/mqtt-device/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ private async Task<Ack<int>> Property_interval_UpdateHandler(int p)

if (p > 0)
{
client.Property_interval.Value = p;
ack.Description = "desired notification accepted";
ack.Status = 200;
ack.Version = client.Property_interval.Version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ public static T FromString<T>(string s) => JsonSerializer.Deserialize<T>(s,
})!;
}

#pragma warning disable CA1822 // Mark members as static
public byte[] ToBytes<T>(T payload, string name = "", int? version = null)
#pragma warning restore CA1822 // Mark members as static
{
if (string.IsNullOrEmpty(name))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace MQTTnet.Extensions.MultiCloud.AzureIoTClient
public class HubMqttClient : IHubMqttClient
{
public IMqttClient Connection { get; set; }

public string InitialState { get; set; } = String.Empty;

private readonly TwinRequestResponseBinder twinOperationsBinder;
Expand Down Expand Up @@ -38,12 +37,17 @@ public Func<JsonNode, GenericPropertyAck> OnPropertyUpdateReceived

public Task<string> GetTwinAsync(CancellationToken cancellationToken = default) => twinOperationsBinder.GetTwinAsync(cancellationToken);
public Task<int> UpdateTwinAsync(object payload, CancellationToken cancellationToken = default) => twinOperationsBinder.UpdateTwinAsync(payload, cancellationToken);
public Task<MqttClientPublishResult> SendTelemetryAsync(object payload, CancellationToken t = default) =>
Connection.PublishBinaryAsync($"devices/{Connection.Options.ClientId}/messages/events/",
public async Task<MqttClientPublishResult> SendTelemetryAsync(object payload, CancellationToken t = default)
{
string clientSegment = Connection.Options.ClientId;
if (clientSegment.Contains("/")) //should be a module
{
clientSegment = clientSegment.Replace("/", "/modules/");
}
return await Connection.PublishBinaryAsync($"devices/{clientSegment}/messages/events/",
new UTF8JsonSerializer().ToBytes(payload),
Protocol.MqttQualityOfServiceLevel.AtLeastOnce,
false, t);


}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ public static class BrokerClientFactory
public static string NuGetPackageVersion => $"{ThisAssembly.NuGetPackageVersion}";

public static ConnectionSettings? ComputedSettings { get; private set; }
public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(string connectinString, bool withBirth = true, CancellationToken cancellationToken = default) =>
public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(string connectinString, bool withBirth = false, CancellationToken cancellationToken = default) =>
await CreateFromConnectionSettingsAsync(new ConnectionSettings(connectinString), withBirth, cancellationToken);

public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(ConnectionSettings cs, bool withBirth = true, CancellationToken cancellationToken = default)
public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(ConnectionSettings cs, bool withBirth = false, CancellationToken cancellationToken = default)
{
MqttClient? mqtt = new MqttFactory().CreateMqttClient(MqttNetTraceLogger.CreateTraceLogger()) as MqttClient;
var connAck = await mqtt!.ConnectAsync(new MqttClientOptionsBuilder()
Expand All @@ -23,8 +23,8 @@ public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(Connecti
{
throw new ApplicationException($"Cannot connect to {cs}");
}
if (withBirth)
{
//if (withBirth)
//{
var birthPayload = new UTF8JsonSerializer().ToBytes(
new BirthConvention.BirthMessage(BirthConvention.ConnectionStatus.online)
{
Expand All @@ -34,12 +34,12 @@ public static async Task<IMqttClient> CreateFromConnectionSettingsAsync(Connecti
var pubAck = await mqtt.PublishBinaryAsync(
BirthConvention.BirthTopic(mqtt.Options.ClientId),
birthPayload,
Protocol.MqttQualityOfServiceLevel.AtLeastOnce, true, cancellationToken);
Protocol.MqttQualityOfServiceLevel.AtLeastOnce, true, cancellationToken); //hack to disable retained in registry
if (pubAck.ReasonCode != MqttClientPublishReasonCode.Success)
{
throw new ApplicationException($"Error publishing Birth {cs}");
}
}
//}
return mqtt;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public async Task SendMessageAsync(Ack<T> payload, CancellationToken cancellatio
var prop = new ReadOnlyProperty<Ack<T>>(_connection, _name)
{
TopicPattern = "device/{clientId}/props/{name}/ack",
WrapMessage = false
WrapMessage = false,
Retain = RetainResponse
};
await prop.SendMessageAsync(payload, cancellationToken);
}
Expand Down
12 changes: 5 additions & 7 deletions src/MQTTnet.Extensions.MultiCloud/Binders/CloudToDeviceBinder.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using MQTTnet.Client;
using MQTTnet.Extensions.MultiCloud.Serializers;
using MQTTnet.Protocol;
using System.Diagnostics;
using System.Text;

namespace MQTTnet.Extensions.MultiCloud.Binders;

Expand All @@ -11,10 +9,10 @@ public abstract class CloudToDeviceBinder<T, TResp> : ICloudToDevice<T, TResp>
private readonly string _name;
private readonly IMqttClient _connection;

protected bool UnwrapRequest = false;
protected bool WrapResponse = false;
protected bool RetainResponse = false;
protected bool CleanRetained = false;
public bool UnwrapRequest = false;
public bool WrapResponse = false;
public bool RetainResponse = false;
public bool CleanRetained = false;


public Func<T, Task<TResp>>? OnMessage { get; set; }
Expand Down Expand Up @@ -44,7 +42,7 @@ public CloudToDeviceBinder(IMqttClient connection, string name, IMessageSerializ
if (resp != null)
{
byte[] responseBytes = serializer.ToBytes(resp, WrapResponse ? _name : string.Empty);
string? resTopic = responseTopicPattern?.Replace("{rid}", tp.Rid.ToString()).Replace("{version}", tp.Version.ToString());
string? resTopic = responseTopicPattern?.Replace("{rid}", tp.Rid!).Replace("{version}", tp.Version.ToString());
_ = connection.PublishAsync(
new MqttApplicationMessageBuilder()
.WithTopic(resTopic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ public abstract class DeviceToCloudBinder<T> : IDeviceToCloud<T>
private readonly IMqttClient _connection;
private readonly IMessageSerializer _messageSerializer;

public string TopicPattern = "device/{clientId}/telemetry";
public string TopicPattern = String.Empty;
public bool WrapMessage = false;
protected bool Retain = false;
public bool Retain = false;

public DeviceToCloudBinder(IMqttClient mqttClient, string name) : this(mqttClient, name, new UTF8JsonSerializer()) { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

public class TopicParameters
{
public int Rid { get; set; }
public string? Rid { get; set; }
public int Version { get; set; }
}
8 changes: 2 additions & 6 deletions src/MQTTnet.Extensions.MultiCloud/Binders/TopicParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,15 @@ public static TopicParameters ParseTopic(string topic)
{
var segments = topic.Split('/');
int twinVersion = -1;
int rid = -1;
string rid = string.Empty;
if (topic.Contains('?'))
{
var qs = HttpUtility.ParseQueryString(segments[^1]);
if (int.TryParse(qs["$version"], out int v))
{
twinVersion = v;
}

if (int.TryParse(qs["$rid"], out int r))
{
rid = r;
}
rid = Convert.ToString(qs["$rid"])!;
}
return new TopicParameters() { Rid = rid, Version = twinVersion };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public AuthType Auth
public string? CaFile { get; set; }
public bool DisableCrl { get; set; }

public string? GatewayHostName { get; set; }

public ConnectionSettings()
{
SasMinutes = Default_SasMinutes;
Expand All @@ -50,6 +52,7 @@ public ConnectionSettings()
UseTls = Default_UseTls == "true";
DisableCrl = Default_DisableCrl == "true";
CleanSession = Default_CleanSession == "true";
GatewayHostName = string.Empty;
}

public static ConnectionSettings FromConnectionString(string cs) => new(cs);
Expand Down Expand Up @@ -98,6 +101,7 @@ private void ParseConnectionString(string cs)
UseTls = GetStringValue(map, nameof(UseTls), Default_UseTls) == "true";
CaFile = GetStringValue(map, nameof(CaFile));
DisableCrl = GetStringValue(map, nameof(DisableCrl), Default_DisableCrl) == "true";
GatewayHostName = GetStringValue(map, nameof(GatewayHostName));
}

private static void AppendIfNotEmpty(StringBuilder sb, string name, string val)
Expand Down Expand Up @@ -129,6 +133,7 @@ public override string ToString()
AppendIfNotEmpty(result, nameof(ModelId), ModelId!);
AppendIfNotEmpty(result, nameof(ClientId), ClientId!);
AppendIfNotEmpty(result, nameof(Auth), Auth!.ToString());
AppendIfNotEmpty(result, nameof(GatewayHostName), GatewayHostName!.ToString());
result.Remove(result.Length - 1, 1);
return result.ToString();
}
Expand Down
10 changes: 7 additions & 3 deletions src/MQTTnet.Extensions.MultiCloud/Connections/SasAuth.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ internal static string Sign(string requestString, string key)

internal static string CreateSasToken(string resource, string sasKey, int minutes)
{

var expiry = DateTimeOffset.UtcNow.AddMinutes(minutes).ToUnixTimeSeconds().ToString();
var sig = System.Net.WebUtility.UrlEncode(Sign($"{resource}\n{expiry}", sasKey));
return $"SharedAccessSignature sr={resource}&sig={sig}&se={expiry}";
}

internal static (string username, string password) GenerateHubSasCredentials(string hostName, string deviceId, string sasKey, string modelId, int minutes = 60) =>
(GetUserName(hostName, deviceId, modelId), CreateSasToken($"{hostName}/devices/{deviceId}", sasKey, minutes));
internal static (string username, string password) GenerateHubSasCredentials(string hostName, string deviceId, string sasKey, string audience, string modelId, int minutes = 60)
{
string user = GetUserName(hostName, deviceId, modelId);
string pwd = CreateSasToken($"{audience}/devices/{deviceId}", sasKey, minutes);
return (user, pwd);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ public static partial class MqttNetExtensions
{
internal static MqttClientOptionsBuilder WithAzureIoTHubCredentials(this MqttClientOptionsBuilder builder, ConnectionSettings? cs)
{
string? hostName = cs!.HostName!;
if (!string.IsNullOrEmpty(cs.GatewayHostName))
{
hostName = cs.GatewayHostName;
}
if (cs?.Auth == AuthType.Sas)
{
if (string.IsNullOrEmpty(cs.ModuleId))
Expand All @@ -17,7 +22,8 @@ internal static MqttClientOptionsBuilder WithAzureIoTHubCredentials(this MqttCli
{
cs.ClientId = $"{cs.DeviceId}/{cs.ModuleId}";
}
return builder.WithAzureIoTHubCredentialsSas(cs.HostName!, cs.DeviceId!, cs.ModuleId!, cs.SharedAccessKey!, cs.ModelId!, cs.SasMinutes, cs.TcpPort);
builder.WithTlsSettings(cs);
return builder.WithAzureIoTHubCredentialsSas(hostName, cs.DeviceId!, cs.ModuleId!, cs.HostName!, cs.SharedAccessKey!, cs.ModelId!, cs.SasMinutes, cs.TcpPort);
}
else if (cs?.Auth == AuthType.X509)
{
Expand All @@ -34,31 +40,29 @@ internal static MqttClientOptionsBuilder WithAzureIoTHubCredentials(this MqttCli
{
cs.DeviceId = clientId;
}

return builder.WithAzureIoTHubCredentialsX509(cs.HostName!, cert, cs.ModelId!, cs.TcpPort);
builder.WithTlsSettings(cs);
return builder.WithAzureIoTHubCredentialsX509(hostName, cert, cs.ModelId!, cs.TcpPort);
}
else
{
throw new ApplicationException("Auth not supported: " + cs?.Auth);
}
}

public static MqttClientOptionsBuilder WithAzureIoTHubCredentialsSas(this MqttClientOptionsBuilder builder, string hostName, string deviceId, string moduleId, string sasKey, string modelId, int sasMinutes, int tcpPort)
public static MqttClientOptionsBuilder WithAzureIoTHubCredentialsSas(this MqttClientOptionsBuilder builder, string hostName, string deviceId, string moduleId, string audience, string sasKey, string modelId, int sasMinutes, int tcpPort)
{
if (string.IsNullOrEmpty(moduleId))
{
(string username, string password) = SasAuth.GenerateHubSasCredentials(hostName, deviceId, sasKey, modelId, sasMinutes);
(string username, string password) = SasAuth.GenerateHubSasCredentials(hostName, deviceId, sasKey, audience, modelId, sasMinutes);
builder
.WithTcpServer(hostName, tcpPort)
.WithTls()
.WithCredentials(username, password);
}
else
{
(string username, string password) = SasAuth.GenerateHubSasCredentials(hostName, $"{deviceId}/{moduleId}", sasKey, modelId, sasMinutes);
(string username, string password) = SasAuth.GenerateHubSasCredentials(hostName, $"{deviceId}/{moduleId}", sasKey, modelId, audience, sasMinutes);
builder
.WithTcpServer(hostName, tcpPort)
.WithTls()
.WithCredentials(username, password);
}
return builder;
Expand All @@ -70,13 +74,7 @@ public static MqttClientOptionsBuilder WithAzureIoTHubCredentialsX509(this MqttC

builder
.WithTcpServer(hostName, tcpPort)
.WithCredentials(new MqttClientCredentials(SasAuth.GetUserName(hostName, clientId, modelId)))
.WithTls(new MqttClientOptionsBuilderTlsParameters
{
UseTls = true,
SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
Certificates = new List<X509Certificate> { cert }
});
.WithCredentials(new MqttClientCredentials(SasAuth.GetUserName(hostName, clientId, modelId)));
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static X509Certificate2 Load(string certSettings)
}
store.Close();
}
else if (certSettings.Contains(".pem|")) //mycert.pem|mycert.key
else if (certSettings.Contains(".pem|") || certSettings.Contains(".crt|")) //mycert.pem|mycert.key
{

var segments = certSettings.Split('|');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,16 @@ public async Task SendTelemetry()
Assert.Equal("devices/mock/messages/events/", mqttClient.topicRecceived);
Assert.Equal("{\"temp\":2}", mqttClient.payloadReceived);
}

[Fact]
public async Task SendTelemetryToModule()
{
var mqttClient = new MockMqttClient("mock/myModule");
var hubMqttClient = new HubMqttClient(mqttClient);
//var telemetryBinder = new Telemetry<int>(mqttClient, "temp");
await hubMqttClient.SendTelemetryAsync(new { temp = 2});
Assert.Equal("devices/mock/modules/myModule/messages/events/", mqttClient.topicRecceived);
Assert.Equal("{\"temp\":2}", mqttClient.payloadReceived);
}
}
}
13 changes: 11 additions & 2 deletions tests/MQTTnet.Extensions.MultiCloud.UnitTests/MockMqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,28 @@

namespace MQTTnet.Extensions.MultiCloud.UnitTests
{

internal class MockMqttClient : IMqttClient
{
readonly string _clientId = "";


#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
public MockMqttClient()
#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
{
_clientId = "mock";
}

public MockMqttClient(string clientId)
{
_clientId = clientId;
}
#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.

public bool IsConnected => throw new NotImplementedException();


public MqttClientOptions Options => new() { ClientId = "mock" };
public MqttClientOptions Options => new() { ClientId = _clientId };

public string payloadReceived;
public string topicRecceived;
Expand Down

0 comments on commit 0ed6894

Please sign in to comment.