Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
feat(*)!: async_nats 0.30, removed wasmbus_rpc dep
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

moved types from wasmbus_rpc

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

removed extra use statement

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

bumped control interface to nats 0.30

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
  • Loading branch information
brooksmtownsend committed Jul 14, 2023
1 parent aadb544 commit 673f8bc
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 17 deletions.
11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wasmcloud-control-interface"
version = "0.26.0"
version = "0.27.0"
authors = ["wasmCloud Team"]
edition = "2021"
homepage = "https://wasmcloud.com"
Expand All @@ -13,15 +13,18 @@ keywords = ["webassembly", "wasm", "wasmcloud", "control", "ctl"]
categories = ["wasm", "api-bindings"]

[dependencies]
async-nats = "0.29"
async-nats = "0.30"
data-encoding = "2.3.3"
ring = "0.16.20"
cloudevents-sdk = "0.7.0"
futures = "0.3"
rmp-serde = "1.0.0"
tokio = {version="1.9", features=["time"]}
tokio = { version="1.9", features=["time"] }
serde = { version = "1.0.118", features = ["derive"] }
serde_json = "1.0.60"
tracing = "0.1.37"
tracing-futures = "0.2"
wasmbus-rpc = { version = "0.13", features = [ "otel" ]}
bytes = "1.4.0"
opentelemetry = "0.19.0"
tracing-opentelemetry = "0.19.0"
lazy_static = "1.4.0"
20 changes: 10 additions & 10 deletions src/kv.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::collections::HashMap;

use async_nats::{jetstream::kv::Store, Client};
use futures::TryStreamExt;
use tracing::debug;
use wasmbus_rpc::core::LinkDefinition;

use bytes::Bytes;
use data_encoding::HEXUPPER;
use futures::TryStreamExt;
use ring::digest::{digest, SHA256};
use tracing::debug;

use crate::GetClaimsResponse;
use crate::LinkDefinition;
use crate::LinkDefinitionList;
use crate::Result;

Expand Down Expand Up @@ -65,10 +65,10 @@ pub(crate) async fn get_links(store: &Store) -> Result<LinkDefinitionList> {
pub(crate) async fn put_link(store: &Store, ld: LinkDefinition) -> Result<()> {
let id = ld_hash(&ld);
let key = format!("{}{}", LINKDEF_PREFIX, id);
store
Ok(store
.put(key, serde_json::to_vec(&ld)?.into())
.await
.map(|_| ())
.map(|_| ())?)
}

pub(crate) async fn delete_link(
Expand All @@ -82,10 +82,10 @@ pub(crate) async fn delete_link(
LINKDEF_PREFIX,
ld_hash_raw(actor_id, contract_id, link_name)
);
store.delete(key).await.map(|_| ())
Ok(store.delete(key).await.map(|_| ())?)
}

async fn add_linkdef(links: &mut Vec<LinkDefinition>, data: Option<Vec<u8>>) -> Result<()> {
async fn add_linkdef(links: &mut Vec<LinkDefinition>, data: Option<Bytes>) -> Result<()> {
if let Some(d) = data {
let ld: LinkDefinition = serde_json::from_slice(&d)?;
links.push(ld);
Expand All @@ -94,7 +94,7 @@ async fn add_linkdef(links: &mut Vec<LinkDefinition>, data: Option<Vec<u8>>) ->
Ok(())
}

async fn add_claim(claims: &mut Vec<HashMap<String, String>>, data: Option<Vec<u8>>) -> Result<()> {
async fn add_claim(claims: &mut Vec<HashMap<String, String>>, data: Option<Bytes>) -> Result<()> {
if let Some(d) = data {
let json: HashMap<String, String> = serde_json::from_slice(&d)?;
claims.push(json);
Expand Down Expand Up @@ -124,7 +124,7 @@ pub(crate) fn ld_hash_raw(actor_id: &str, contract_id: &str, link_name: &str) ->
// NOTE: these tests require nats to be running with JS enabled.
#[cfg(test)]
mod test {
use wasmbus_rpc::core::LinkDefinition;
use super::LinkDefinition;

use crate::kv::{delete_link, get_claims, get_kv_store, get_links, ld_hash, put_link};

Expand Down
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
mod broker;
mod kv;
mod otel;
mod sub_stream;
mod types;

Expand All @@ -19,8 +20,8 @@ use sub_stream::collect_timeout;
use tokio::sync::mpsc::Receiver;
use tracing::{debug, error, instrument, trace};
use tracing_futures::Instrument;
use wasmbus_rpc::core::LinkDefinition;
use wasmbus_rpc::otel::OtelHeaderInjector;

use crate::otel::OtelHeaderInjector;

type Result<T> = ::std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

Expand Down
104 changes: 104 additions & 0 deletions src/otel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// COPIED DIRECTLY FROM https://github.com/wasmCloud/weld/blob/wasmbus-rpc-v0.13.0/rpc-rs/src/otel.rs (minus unused functionality)

//! Contains helpers and code for enabling [OpenTelemetry](https://opentelemetry.io/) tracing for
//! wasmbus-rpc calls. Please note that right now this is only supported for providers. This module
//! is only available with the `otel` feature enabled
use async_nats::header::HeaderMap;
use opentelemetry::{
propagation::{Extractor, Injector, TextMapPropagator},
sdk::propagation::TraceContextPropagator,
};
use tracing::span::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

lazy_static::lazy_static! {
static ref EMPTY_HEADERS: HeaderMap = HeaderMap::default();
}

/// A convenience type that wraps a NATS [`HeaderMap`] and implements the [`Extractor`] trait
#[derive(Debug)]
pub struct OtelHeaderExtractor<'a> {
inner: &'a HeaderMap,
}

impl<'a> Extractor for OtelHeaderExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.inner
.get(key)
.and_then(|s| s.iter().next().map(|s| s.as_str()))
}

fn keys(&self) -> Vec<&str> {
self.inner
.iter()
// The underlying type is a string and this should never fail, but we unwrap to an empty string anyway
.map(|(k, _)| std::str::from_utf8(k.as_ref()).unwrap_or_default())
.collect()
}
}

impl<'a> AsRef<HeaderMap> for OtelHeaderExtractor<'a> {
fn as_ref(&self) -> &'a HeaderMap {
self.inner
}
}

/// A convenience type that wraps a NATS [`HeaderMap`] and implements the [`Injector`] trait
#[derive(Debug, Default)]
pub struct OtelHeaderInjector {
inner: HeaderMap,
}

impl OtelHeaderInjector {
/// Creates a new injector using the given [`HeaderMap`]
pub fn new(headers: HeaderMap) -> Self {
OtelHeaderInjector { inner: headers }
}

/// Convenience constructor that returns a new injector with the current span context already
/// injected into the given header map
pub fn new_with_span(headers: HeaderMap) -> Self {
let mut header_map = Self::new(headers);
header_map.inject_context();
header_map
}

/// Convenience constructor that returns a new injector with the current span context already
/// injected into a default [`HeaderMap`]
pub fn default_with_span() -> Self {
let mut header_map = Self::default();
header_map.inject_context();
header_map
}

/// Injects the current context from the span into the headers
pub fn inject_context(&mut self) {
let ctx_propagator = TraceContextPropagator::new();
ctx_propagator.inject_context(&Span::current().context(), self);
}
}

impl Injector for OtelHeaderInjector {
fn set(&mut self, key: &str, value: String) {
self.inner.insert(key, value.as_ref());
}
}

impl AsRef<HeaderMap> for OtelHeaderInjector {
fn as_ref(&self) -> &HeaderMap {
&self.inner
}
}

impl From<HeaderMap> for OtelHeaderInjector {
fn from(headers: HeaderMap) -> Self {
OtelHeaderInjector::new(headers)
}
}

impl From<OtelHeaderInjector> for HeaderMap {
fn from(inj: OtelHeaderInjector) -> Self {
inj.inner
}
}
32 changes: 31 additions & 1 deletion src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub type LabelsMap = std::collections::HashMap<String, String>;
/// A list of link definitions
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct LinkDefinitionList {
pub links: wasmbus_rpc::core::ActorLinks,
pub links: ActorLinks,
}

/// One of a potential list of responses to a provider auction
Expand Down Expand Up @@ -366,3 +366,33 @@ pub struct UpdateActorCommand {
#[serde(default)]
pub new_actor_ref: String,
}

// Below are copied structs to avoid depedency conflicts on wasmbus_rpc

// COPIED FROM https://github.com/wasmCloud/weld/blob/wasmbus-rpc-v0.13.0/rpc-rs/src/wasmbus_core.rs#L1176
/// Settings associated with an actor-provider link
pub type LinkSettings = std::collections::HashMap<String, String>;

// COPIED FROM https://github.com/wasmCloud/weld/blob/wasmbus-rpc-v0.13.0/rpc-rs/src/wasmbus_core.rs#L26
/// List of linked actors for a provider
pub type ActorLinks = Vec<LinkDefinition>;

// COPIED FROM https://github.com/wasmCloud/weld/blob/wasmbus-rpc-v0.13.0/rpc-rs/src/wasmbus_core.rs#L1042
/// Link definition for binding actor to provider
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[non_exhaustive]
pub struct LinkDefinition {
/// actor public key
#[serde(default)]
pub actor_id: String,
/// provider public key
#[serde(default)]
pub provider_id: String,
/// link name
#[serde(default)]
pub link_name: String,
/// contract id
#[serde(default)]
pub contract_id: String,
pub values: LinkSettings,
}

0 comments on commit 673f8bc

Please sign in to comment.