Skip to content

Commit

Permalink
Standard Cloud Adapter (#126)
Browse files Browse the repository at this point in the history
Implements a standard cloud adapter using GRPC. This is based on the adapter found in the Ibeji examples repository (https://github.com/eclipse-ibeji/ibeji-example-applications/tree/main/freyja_adapters/cloud/azure_cloud_connector_adapter) and should supersede it.

Fixes #123
  • Loading branch information
wilyle authored Feb 9, 2024
1 parent 9c78d23 commit 2c95bfe
Show file tree
Hide file tree
Showing 18 changed files with 520 additions and 6 deletions.
1 change: 1 addition & 0 deletions .accepted_words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ chmod
ci
cli
CloudAdapter
CloudConnector
com
CONFIG
config
Expand Down
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# The default resolver for workspaces is different than for regular packages, so use v2 to avoid warnings
resolver = "2"
members = [
"adapters/cloud/grpc_cloud_adapter",
"adapters/cloud/in_memory_mock_cloud_adapter",
"adapters/data/http_mock_data_adapter",
"adapters/data/in_memory_mock_data_adapter",
Expand All @@ -23,10 +24,12 @@ members = [
"mocks/mock_digital_twin",
"mocks/mock_mapping_service",
"proc_macros",
"proto/cloud_connector",
]

[workspace.dependencies]
# Freyja dependencies
cloud-connector-proto = { path = "proto/cloud_connector" }
freyja-build-common = { path = "build_common" }
freyja-common = { path = "common" }
freyja-contracts = { path = "contracts" }
Expand Down Expand Up @@ -60,6 +63,8 @@ log = "^0.4"
mockall = "0.11.4"
paho-mqtt = "0.12"
proc-macro2 = "1.0.70"
prost = "0.12"
prost-types = "0.12"
quote = "1.0.23"
reqwest = { version = "0.11.23", features = ["json"] }
serde = { version = "1.0.195", features = ["derive"] }
Expand Down
29 changes: 29 additions & 0 deletions adapters/cloud/grpc_cloud_adapter/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
# SPDX-License-Identifier: MIT

[package]
name = "grpc-cloud-adapter"
version = "0.1.0"
edition = "2021"
license = "MIT"

[dependencies]
async-trait = { workspace = true }
cloud-connector-proto = { workspace = true }
freyja-build-common = { workspace = true }
freyja-common = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }

[build-dependencies]
freyja-build-common = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
time = { workspace = true }
tokio-stream = { workspace = true }
tower = { workspace = true }
17 changes: 17 additions & 0 deletions adapters/cloud/grpc_cloud_adapter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# gRPC Cloud Adapter

The gRPC Cloud Adapter is intended to function as a "standard cloud adapter", enabling integration with other services that implement the appropriate APIs. This reduces the need for custom adapter implementations and facilitates integration with non-rust solutions for other parts of the vehicle system. This library contains an implementation of the `CloudAdapter` trait from the contracts.

## Contract

This adapter utilizes a gRPC client for the `CloudConnector` service in the [cloud connector v1 protobuf description](../../../interfaces/cloud_connector/v1/cloud_connector.proto). To integrate a cloud connector with this adapter, you will need to implement a gRPC server for this service. Samples can be found in the [Ibeji Example Applications Repository](https://github.com/eclipse-ibeji/ibeji-example-applications/tree/main/cloud_connectors/).

## Config

This adapter supports the following configuration settings:

- `target_uri`: The URI of the server to call.
- `max_retries`: The maximum number of times to retry failed attempts to send data to the server.
- `retry_interval_ms`: The interval between subsequent retry attempts, in milliseconds

This adapter supports [config overrides](../../../docs/tutorials/config-overrides.md). The override filename is `grpc_cloud_adapter_config.json`, and the default config is located at `res/grpc_cloud_adapter_config.default.json`.
11 changes: 11 additions & 0 deletions adapters/cloud/grpc_cloud_adapter/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use freyja_build_common::copy_config;

const CONFIG_FILE_STEM: &str = "grpc_cloud_adapter_config";

fn main() {
copy_config(CONFIG_FILE_STEM);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"target_uri": "http://0.0.0.0:5176",
"max_retries": 20,
"retry_interval_ms": 10000
}
18 changes: 18 additions & 0 deletions adapters/cloud/grpc_cloud_adapter/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use serde::{Deserialize, Serialize};

/// Config for the GRPCCloudAdapter
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
/// The target uri for the gRPC server
pub target_uri: String,

/// Max retries for contacting the server
pub max_retries: u32,

/// Retry interval in milliseconds
pub retry_interval_ms: u64,
}
212 changes: 212 additions & 0 deletions adapters/cloud/grpc_cloud_adapter/src/grpc_cloud_adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use std::str::FromStr;
use std::time::Duration;

use async_trait::async_trait;
use log::debug;
use tonic::transport::Channel;

use cloud_connector_proto::{
prost_types::Timestamp,
v1::{cloud_connector_client::CloudConnectorClient, UpdateDigitalTwinRequestBuilder},
};
use freyja_build_common::config_file_stem;
use freyja_common::{
cloud_adapter::{CloudAdapter, CloudAdapterError, CloudMessageRequest, CloudMessageResponse},
config_utils, out_dir,
retry_utils::execute_with_retry,
};

use crate::config::Config;

/// A "standard" cloud adapter which communicates over gRPC
pub struct GRPCCloudAdapter {
// Adapter config
config: Config,

// The gRPC client
client: CloudConnectorClient<Channel>,
}

#[async_trait]
impl CloudAdapter for GRPCCloudAdapter {
/// Creates a new instance of a CloudAdapter with default settings
fn create_new() -> Result<Self, CloudAdapterError> {
let config: Config = config_utils::read_from_files(
config_file_stem!(),
config_utils::JSON_EXT,
out_dir!(),
CloudAdapterError::io,
CloudAdapterError::deserialize,
)?;

let client = futures::executor::block_on(async {
execute_with_retry(
config.max_retries,
Duration::from_millis(config.retry_interval_ms),
|| CloudConnectorClient::connect(config.target_uri.clone()),
Some("Cloud adapter initial connection".into()),
)
.await
.map_err(CloudAdapterError::communication)
})?;

Ok(Self { config, client })
}

/// Sends the signal to the cloud
///
/// # Arguments
///
/// - `cloud_message`: represents a message to send to the cloud canonical model
async fn send_to_cloud(
&self,
cloud_message: CloudMessageRequest,
) -> Result<CloudMessageResponse, CloudAdapterError> {
debug!("Received a request to send to the cloud");

let timestamp = Timestamp::from_str(cloud_message.signal_timestamp.as_str())
.map_err(CloudAdapterError::deserialize)?;

let request = UpdateDigitalTwinRequestBuilder::new()
.string_value(cloud_message.signal_value)
.timestamp(timestamp)
.metadata(cloud_message.metadata)
.build();

let response = execute_with_retry(
self.config.max_retries,
Duration::from_millis(self.config.retry_interval_ms),
|| async {
let request = tonic::Request::new(request.clone());
self.client
.clone()
.update_digital_twin(request)
.await
.map_err(CloudAdapterError::communication)
},
Some("Cloud adapter request".into()),
)
.await
.map_err(CloudAdapterError::communication)?;

debug!("Cloud adapter response: {response:?}");

Ok(CloudMessageResponse {})
}
}

#[cfg(test)]
mod grpc_cloud_adapter_tests {
use super::*;

/// The tests below uses Unix sockets to create a channel between a gRPC client and a gRPC server.
/// Unix sockets are more ideal than using TCP/IP sockets since Rust tests will run in parallel
/// so you would need to set an arbitrary port per test for TCP/IP sockets.
#[cfg(unix)]
mod unix_tests {
use super::*;

use std::sync::Arc;

use tempfile::TempPath;
use tokio::net::{UnixListener, UnixStream};
use tokio_stream::wrappers::UnixListenerStream;
use tonic::{
transport::{Channel, Endpoint, Server, Uri},
Request, Response, Status,
};
use tower::service_fn;

use cloud_connector_proto::v1::{
cloud_connector_server::{CloudConnector, CloudConnectorServer},
UpdateDigitalTwinRequest, UpdateDigitalTwinResponse,
};

pub struct MockCloudConnector {}

#[tonic::async_trait]
impl CloudConnector for MockCloudConnector {
/// Updates a digital twin instance
///
/// # Arguments
/// - `request`: the request to send
async fn update_digital_twin(
&self,
_request: Request<UpdateDigitalTwinRequest>,
) -> Result<Response<UpdateDigitalTwinResponse>, Status> {
let response = UpdateDigitalTwinResponse {};
Ok(Response::new(response))
}
}

async fn create_test_grpc_client(
bind_path: Arc<TempPath>,
) -> CloudConnectorClient<Channel> {
let channel = Endpoint::try_from("http://URI_IGNORED") // Devskim: ignore DS137138
.unwrap()
.connect_with_connector(service_fn(move |_: Uri| {
let bind_path = bind_path.clone();
async move { UnixStream::connect(bind_path.as_ref()).await }
}))
.await
.unwrap();

CloudConnectorClient::new(channel)
}

async fn run_test_grpc_server(uds_stream: UnixListenerStream) {
let mock_azure_connector = MockCloudConnector {};
Server::builder()
.add_service(CloudConnectorServer::new(mock_azure_connector))
.serve_with_incoming(uds_stream)
.await
.unwrap();
}

#[tokio::test]
async fn send_request_to_provider() {
// Create the Unix Socket
let bind_path = Arc::new(tempfile::NamedTempFile::new().unwrap().into_temp_path());
let uds = match UnixListener::bind(bind_path.as_ref()) {
Ok(unix_listener) => unix_listener,
Err(_) => {
std::fs::remove_file(bind_path.as_ref()).unwrap();
UnixListener::bind(bind_path.as_ref()).unwrap()
}
};
let uds_stream = UnixListenerStream::new(uds);

let request_future = async {
let mut client = create_test_grpc_client(bind_path.clone()).await;

let request = UpdateDigitalTwinRequestBuilder::new()
.string_value("foo".into())
.timestamp_now()
.add_metadata(
"model_id".into(),
"dtmi:sdv:Cloud:Vehicle:Cabin:HVAC:AmbientAirTemperature;1".into(),
)
.add_metadata("instance_id".into(), "hvac".into())
.add_metadata(
"instance_property_path".into(),
"/AmbientAirTemperature".into(),
)
.build();

let request = tonic::Request::new(request);
assert!(client.update_digital_twin(request).await.is_ok())
};

tokio::select! {
_ = run_test_grpc_server(uds_stream) => (),
_ = request_future => ()
}

std::fs::remove_file(bind_path.as_ref()).unwrap();
}
}
}
6 changes: 6 additions & 0 deletions adapters/cloud/grpc_cloud_adapter/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

mod config;
pub mod grpc_cloud_adapter;
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ mod in_memory_mock_cloud_adapter_tests {
let cloud_adapter = InMemoryMockCloudAdapter::create_new().unwrap();

let cloud_message = CloudMessageRequest {
cloud_signal: HashMap::new(),
metadata: HashMap::new(),
signal_value: String::from("72"),
signal_timestamp: OffsetDateTime::now_utc().to_string(),
};
Expand Down
Loading

0 comments on commit 2c95bfe

Please sign in to comment.