Skip to content

Commit

Permalink
feat(referential): group in a struct and add cache for none dynamic c…
Browse files Browse the repository at this point in the history
…onnector.
  • Loading branch information
jmfiaschi committed Jan 16, 2024
1 parent 3032c70 commit 5f19c06
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 160 deletions.
43 changes: 36 additions & 7 deletions src/connector/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
pub mod paginator;
pub mod counter;
#[cfg(feature = "curl")]
pub mod authenticator;
#[cfg(feature = "bucket")]
pub mod bucket;
#[cfg(feature = "bucket")]
pub mod bucket_select;
pub mod counter;
#[cfg(feature = "curl")]
pub mod curl;
pub mod in_memory;
pub mod io;
pub mod local;
#[cfg(feature = "mongodb")]
pub mod mongodb;
pub mod paginator;
#[cfg(feature = "psql")]
pub mod psql;

Expand All @@ -29,17 +29,17 @@ use self::local::Local;
use self::mongodb::Mongodb;
#[cfg(feature = "psql")]
use self::psql::Psql;
use crate::DataSet;
use crate::document::Document;
use crate::DataSet;
use crate::DataStream;
use crate::Metadata;
use async_trait::async_trait;
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt;
use std::io::{Error, ErrorKind, Result};
use std::pin::Pin;
use futures::stream::Stream;

#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(tag = "type")]
Expand Down Expand Up @@ -97,6 +97,26 @@ impl ConnectorType {
}
}

impl ConnectorType {
pub fn inner(&self) -> &dyn Connector {
match self {
ConnectorType::InMemory(connector) => connector,
ConnectorType::Io(connector) => connector,
ConnectorType::Local(connector) => connector,
#[cfg(feature = "curl")]
ConnectorType::Curl(connector) => connector,
#[cfg(feature = "bucket")]
ConnectorType::Bucket(connector) => connector,
#[cfg(feature = "bucket")]
ConnectorType::BucketSelect(connector) => connector,
#[cfg(feature = "mongodb")]
ConnectorType::Mongodb(connector) => connector,
#[cfg(feature = "psql")]
ConnectorType::Psql(connector) => connector,
}
}
}

/// Struct that implement this trait can get a reader or writer in order to do something on a document.
#[async_trait]
pub trait Connector: Send + Sync + std::fmt::Debug + ConnectorClone + Unpin {
Expand Down Expand Up @@ -124,13 +144,22 @@ pub trait Connector: Send + Sync + std::fmt::Debug + ConnectorClone + Unpin {
/// Fetch data from the resource and set the inner of the connector.
async fn fetch(&mut self, document: &dyn Document) -> std::io::Result<Option<DataStream>>;
/// Send the data from the inner connector to the remote resource.
async fn send(&mut self, document: &dyn Document, dataset: &DataSet) -> std::io::Result<Option<DataStream>>;
async fn send(
&mut self,
document: &dyn Document,
dataset: &DataSet,
) -> std::io::Result<Option<DataStream>>;
/// Erase the content of the resource.
async fn erase(&mut self) -> Result<()> {
Err(Error::new(ErrorKind::Unsupported, "function not implemented"))
Err(Error::new(
ErrorKind::Unsupported,
"function not implemented",
))
}
/// Paginate through the current connector and return a stream of new connector with new parameters.
async fn paginate(&self) -> Result<Pin<Box<dyn Stream<Item = Result<Box<dyn Connector>>> + Send>>>;
async fn paginate(
&self,
) -> Result<Pin<Box<dyn Stream<Item = Result<Box<dyn Connector>>> + Send>>>;
}

impl fmt::Display for dyn Connector {
Expand Down
124 changes: 0 additions & 124 deletions src/helper/mod.rs
Original file line number Diff line number Diff line change
@@ -1,131 +1,7 @@
use crate::{
step::{reader::Reader, receive, Step},
Context,
};
use async_std::task;
use futures::StreamExt;
use serde_json::Value;
use std::{collections::HashMap, io};

pub mod json_pointer;
pub mod mustache;
pub mod string;
pub mod value;

#[cfg(feature = "xml")]
pub mod xml2json;

/// Replace a HashMap of readers by HashMap of Values. Each Value indexed by the referencial name.
///
/// # Examples
///
/// ```no_run
/// use chewdata::step::reader::Reader;
/// use chewdata::connector::in_memory::InMemory;
/// use chewdata::connector::{Connector, ConnectorType};
/// use std::{io, collections::HashMap};
/// use chewdata::helper::referentials_reader_into_value;
/// use serde_json::Value;
/// use chewdata::DataResult;
/// use chewdata::Context;
///
/// #[async_std::main]
/// async fn main() -> io::Result<()> {
/// let referential_1 = Reader {
/// connector_type: ConnectorType::InMemory(InMemory::new(r#"[{"column1":"value1"}]"#)),
/// ..Default::default()
/// };
/// let referential_2 = Reader {
/// connector_type: ConnectorType::InMemory(InMemory::new(r#"[{"column1":"value2"}]"#)),
/// ..Default::default()
/// };
/// let mut referentials = HashMap::default();
/// referentials.insert("ref_1".to_string(), referential_1);
/// referentials.insert("ref_2".to_string(), referential_2);
///
/// let context = Context::new("step_main".to_string(), DataResult::Ok(Value::Null)).unwrap();
///
/// let values = referentials_reader_into_value(&referentials, &context).await?;
/// let values_expected:HashMap<String, Vec<Value>> = serde_json::from_str(r#"{"ref_1":[{"column1":"value1"}],"ref_2":[{"column1":"value2"}]}"#).unwrap();
///
/// assert_eq!(values_expected, values);
///
/// Ok(())
/// }
/// ```
pub async fn referentials_reader_into_value(
referentials: &HashMap<String, Reader>,
context: &Context,
) -> io::Result<HashMap<String, Vec<Value>>> {
let mut referentials_vec = HashMap::new();

for (name, referential) in referentials {
let (sender_input, receiver_input) = async_channel::unbounded();
let (sender_output, receiver_output) = async_channel::unbounded();

sender_input
.send(context.clone())
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
sender_input.close();

let mut task_referential = referential.clone();
task_referential.name = name.clone();
task_referential.set_receiver(receiver_input.clone());
task_referential.set_sender(sender_output.clone());

task::spawn(async move { task_referential.exec().await }).await?;
sender_output.close();

let values = receive(&receiver_output)
.await?
.map(|context| context.input().to_value())
.collect()
.await;

referentials_vec.insert(name.clone(), values);
}

Ok(referentials_vec)
}

#[cfg(test)]
mod tests {
use crate::{
connector::{in_memory::InMemory, ConnectorType},
DataResult,
};

use super::*;

#[async_std::test]
async fn referentials_reader_into_value() {
let referential_1 = Reader {
connector_type: ConnectorType::InMemory(InMemory::new(
r#"[{"column1":"value1"},{"column1":"value2"}]"#,
)),
..Default::default()
};
let referential_2 = Reader {
connector_type: ConnectorType::InMemory(InMemory::new(
r#"[{"column1":"value3"},{"column1":"value4"}]"#,
)),
..Default::default()
};
let mut referentials = HashMap::default();
referentials.insert("ref_1".to_string(), referential_1);
referentials.insert("ref_2".to_string(), referential_2);

let context = Context::new("step_main".to_string(), DataResult::Ok(Value::Null)).unwrap();

let values = super::referentials_reader_into_value(&referentials, &context)
.await
.unwrap();

let values_expected: HashMap<String, Vec<Value>> = serde_json::from_str(
r#"{"ref_1":[{"column1":"value1"},{"column1":"value2"}],"ref_2":[{"column1":"value3"},{"column1":"value4"}]}"#,
)
.unwrap();
assert_eq!(values_expected, values);
}
}
1 change: 1 addition & 0 deletions src/step/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
pub mod eraser;
pub mod generator;
pub mod reader;
pub mod referential;
pub mod transformer;
pub mod validator;
pub mod writer;
Expand Down
Loading

0 comments on commit 5f19c06

Please sign in to comment.