fix(erase): can clear data in the document before and after a step (#17)
jmfiaschi authored Dec 3, 2021
1 parent 9c87151 commit b638908
Showing 12 changed files with 456 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "chewdata"
version = "1.8.2"
version = "1.8.3"
authors = ["Jean-Marc Fiaschi"]
edition = "2018"
description = "Extract Transform and Load data"
22 changes: 13 additions & 9 deletions README.md
@@ -7,19 +7,23 @@

This application is a simple ETL in Rust that can be used as a connector between systems.

* It handles multiple formats: Json, Jsonl, CSV, Toml, XML, Yaml, Text
* It can read/write data from:
  * Mongodb database
  * S3/Minio with versioning & select
  * Http(s) APIs with some authenticators: Basic, Bearer, Jwt
  * Local
  * Relational DB like PSQL (`Not Yet`)
  * Message broker (`Not Yet`)
| Available | Feature              | Values                                                | Description                        |
| --------- | -------------------- | ----------------------------------------------------- | ---------------------------------- |
| x         | Supported formats    | `Json`, `Jsonl`, `CSV`, `Toml`, `XML`, `Yaml`, `Text` | Read and write in these formats    |
| x         | Object databases     | `mongodb`                                             | Read / Write / Clean data          |
| -         | Relational databases | `psql`                                                | Read / Write / Clean data          |
| x         | Bucket               | `s3`, `minio`                                         | Read / Write / Clean / Select data |
| x         | Curl                 | `*`                                                   | Read / Write / Clean data          |
| x         | Curl auth            | `basic`, `bearer`, `jwt`                              | Read / Write / Clean data          |
| -         | Message broker       | `rabbitMQ`, `kafka`                                   | Read / Write / Clean data          |
| x         | Transform data       | [tera template](https://tera.netlify.app/docs)        | Transform the data on the fly      |

More useful information:

* It needs only rustup
* No garbage collector
* Parallel work
* Multi-platform
* Uses [tera templates](https://tera.netlify.app/docs) to configure the data transformation actions (see the config sketch below)
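
For example, a minimal configuration chaining a reader and a writer looks like this (a sketch based on the example configs in this repository; some examples shorten `connector` to `conn`):

    [
        {"type": "reader", "connector": {"type": "mem", "data": "[{\"id\":1},{\"id\":2}]"}},
        {"type": "writer", "connector": {"type": "local", "path": "./data/out/result.json"}}
    ]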

The goal of this project is to simplify the work of developers and the connections between systems.
The work is not finished but I hope it will be useful for you.
99 changes: 99 additions & 0 deletions examples/read_erase-dynamic_file.rs
@@ -0,0 +1,99 @@
use env_applier::EnvApply;
use std::env;
use std::io;
use tracing_futures::WithSubscriber;
use tracing_subscriber;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;

#[async_std::main]
async fn main() -> io::Result<()> {
let (non_blocking, _guard) = tracing_appender::non_blocking(io::stdout());
let subscriber = tracing_subscriber::fmt()
.with_writer(non_blocking)
.with_env_filter(EnvFilter::from_default_env())
.finish();

tracing_subscriber::registry().init();

// init the erase_test files (the path is templated with {{ id }}, one file per record)
let config = r#"
[
{"type":"r","conn":{"type":"mem","data":"[{\"id\":1},{\"id\":2},{\"id\":3}]"}},
{
"type": "e",
"connector":{
"type": "local",
"path": "./data/out/erase_test_{{ id }}.json"
}
},
{
"type": "reader",
"connector":{
"type": "local",
"path": "./data/multi_lines.json"
}
},
{
"type": "writer",
"connector":{
"type": "local",
"path": "./data/out/erase_test_{{ id }}.json"
}
}
]
"#;

let config_resolved = env::Vars::apply(config.to_string());
chewdata::exec(serde_json::from_str(config_resolved.as_str())?, None, None)
.with_subscriber(subscriber)
.await?;

let (non_blocking, _guard) = tracing_appender::non_blocking(io::stdout());
let subscriber = tracing_subscriber::fmt()
.with_writer(non_blocking)
.with_env_filter(EnvFilter::from_default_env())
.finish();

// read the file and keep the data in memory, clean the file and rewrite the result
let config = r#"
[
{"type":"r","conn":{"type":"mem","data":"[{\"id\":1},{\"id\":2},{\"id\":3}]"}},
{
"type": "read",
"connector":{
"type": "local",
"path": "./data/out/erase_test_{{ id }}.json"
}
},
{
"type": "e",
"connector":{
"type": "local",
"path": "./data/out/erase_test_{{ id }}.json"
}
},
{
"type": "writer",
"connector":{
"type": "local",
"path": "./data/out/erase_test_{{ id }}.json"
},
"doc": {
"type": "json",
"is_pretty": true
}
},
{
"type": "w"
}
]
"#;

let config_resolved = env::Vars::apply(config.to_string());
chewdata::exec(serde_json::from_str(config_resolved.as_str())?, None, None)
.with_subscriber(subscriber)
.await?;

Ok(())
}
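
Assuming the standard Cargo layout, this example should run with `cargo run --example read_erase-dynamic_file`; log verbosity comes from the environment (`EnvFilter::from_default_env`, i.e. the `RUST_LOG` variable). Because the eraser's connector path contains the tera variable `{{ id }}`, each record from the in-memory reader resolves to its own file (`./data/out/erase_test_1.json`, and so on), and each resolved path is erased only once thanks to the `exclude_paths` guard in `src/step/eraser.rs` below.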
97 changes: 97 additions & 0 deletions examples/read_erase-file.rs
@@ -0,0 +1,97 @@
use env_applier::EnvApply;
use std::env;
use std::io;
use tracing_futures::WithSubscriber;
use tracing_subscriber;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;

#[async_std::main]
async fn main() -> io::Result<()> {
let (non_blocking, _guard) = tracing_appender::non_blocking(io::stdout());
let subscriber = tracing_subscriber::fmt()
.with_writer(non_blocking)
.with_env_filter(EnvFilter::from_default_env())
.finish();

tracing_subscriber::registry().init();

// init the erase_test file
let config = r#"
[
{
"type": "e",
"connector":{
"type": "local",
"path": "./data/out/erase_test.json"
}
},
{
"type": "reader",
"connector":{
"type": "local",
"path": "./data/multi_lines.json"
}
},
{
"type": "writer",
"connector":{
"type": "local",
"path": "./data/out/erase_test.json"
}
}
]
"#;

let config_resolved = env::Vars::apply(config.to_string());
chewdata::exec(serde_json::from_str(config_resolved.as_str())?, None, None)
.with_subscriber(subscriber)
.await?;

let (non_blocking, _guard) = tracing_appender::non_blocking(io::stdout());
let subscriber = tracing_subscriber::fmt()
.with_writer(non_blocking)
.with_env_filter(EnvFilter::from_default_env())
.finish();

// read the file and keep the data in memory, clean the file and rewrite the result
let config = r#"
[
{
"type": "read",
"connector":{
"type": "local",
"path": "./data/out/erase_test.json"
}
},
{
"type": "e",
"connector":{
"type": "local",
"path": "./data/out/erase_test.json"
}
},
{
"type": "writer",
"connector":{
"type": "local",
"path": "./data/out/erase_test.json"
},
"doc": {
"type": "json",
"is_pretty": true
}
},
{
"type": "w"
}
]
"#;

let config_resolved = env::Vars::apply(config.to_string());
chewdata::exec(serde_json::from_str(config_resolved.as_str())?, None, None)
.with_subscriber(subscriber)
.await?;

Ok(())
}
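
This is the static-path variant of the same flow, runnable with `cargo run --example read_erase-file`. With a non-variable connector, the eraser clears `./data/out/erase_test.json` only when the first record arrives (see the `exclude_paths` bookkeeping in `src/step/eraser.rs` below), so the data read into memory in the first step is not lost before the writer re-emits it.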
3 changes: 1 addition & 2 deletions src/connector/local.rs
@@ -355,8 +355,7 @@ impl Connector for Local {
.append(false)
.write(true)
.truncate(true)
.open(self.path().as_str())?
.write_all(String::default().as_bytes())?;
.open(self.path().as_str())?;

Ok(())
}
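
The fix above leans on `truncate(true)`: opening the file with truncation already empties it, so the old trailing `write_all(String::default().as_bytes())` was redundant. A standalone sketch of the pattern (a hypothetical free function; the real method is on the `Local` connector, and options set outside this hunk may differ):

    use std::fs::OpenOptions;
    use std::io;

    // Opening with truncate(true) clears the file as part of the open call;
    // no explicit write of an empty string is needed.
    fn erase(path: &str) -> io::Result<()> {
        OpenOptions::new()
            .create(true) // assumption: create the file if it does not exist yet
            .append(false)
            .write(true)
            .truncate(true)
            .open(path)?;
        Ok(())
    }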
12 changes: 12 additions & 0 deletions src/lib.rs
@@ -219,6 +219,18 @@ impl DataResult {
(DataResult::Ok(_), DataResult::OK) | (DataResult::Err(_), DataResult::ERR)
)
}
pub fn merge(&mut self, data_result: DataResult) {
let new_json_value = data_result.to_json_value();

match self {
DataResult::Ok(value) => {
value.merge(new_json_value);
},
DataResult::Err((value, _e)) => {
value.merge(new_json_value);
},
};
}
}

pub type Dataset = Pin<Box<dyn Stream<Item = DataResult> + Send>>;
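
A hedged usage sketch of the new `merge` helper (assuming `DataResult::Ok` wraps a `serde_json::Value` and that `Value::merge` comes from the `json_value_merge` crate's `Merge` trait, i.e. an object-level merge):

    // Assumes DataResult is exported at the crate root, as src/lib.rs suggests.
    use chewdata::DataResult;
    use serde_json::json;

    fn demo() {
        // Fold the payload of one result into another in place.
        let mut result = DataResult::Ok(json!({"id": 1}));
        result.merge(DataResult::Ok(json!({"name": "one"})));
        // result should now wrap {"id": 1, "name": "one"}.
    }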
40 changes: 26 additions & 14 deletions src/step/eraser.rs
@@ -15,6 +15,8 @@ pub struct Eraser {
connector_type: ConnectorType,
pub alias: Option<String>,
pub description: Option<String>,
#[serde(alias = "data")]
pub data_type: String,
#[serde(alias = "exclude")]
pub exclude_paths: Vec<String>,
}
@@ -26,6 +28,7 @@ impl Default for Eraser {
connector_type: ConnectorType::default(),
alias: Some(uuid.to_simple().to_string()),
description: None,
data_type: DataResult::OK.to_string(),
exclude_paths: Vec::default(),
}
}
@@ -62,9 +65,17 @@ impl Step for Eraser {

match (receiver_option, connector.is_variable()) {
(Some(receiver), true) => {
for data_result in receiver {
let json_value = data_result.to_json_value();
connector.set_parameters(json_value.clone());
for data_result_received in receiver {
if !data_result_received.is_type(self.data_type.as_ref()) {
trace!(
data_type_accepted = self.data_type.to_string().as_str(),
data_result = format!("{:?}", data_result_received).as_str(),
"This step handle only this data type"
);
continue;
}

connector.set_parameters(data_result_received.to_json_value().clone());
let path = connector.path();

if !exclude_paths.contains(&path) {
@@ -74,22 +85,23 @@
}

if let Some(ref sender) = sender_option {
trace!("Send data to the queue");
sender
.send(data_result.clone())
.map_err(|e| io::Error::new(io::ErrorKind::Interrupted, e))?;
self.send(data_result_received, sender)?;
}
}
}
(Some(receiver), false) => {
connector.erase().await?;

for data_result in receiver {
for data_result_received in receiver {
let path = connector.path();

// erase when the step receives the first message
if !exclude_paths.contains(&path) {
connector.erase().await?;

exclude_paths.push(path);
}

if let Some(ref sender) = sender_option {
trace!("Send data to the queue");
sender
.send(data_result.clone())
.map_err(|e| io::Error::new(io::ErrorKind::Interrupted, e))?;
self.send(data_result_received, sender)?;
}
}
}
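The new `data_type` field (serde alias `data`, defaulting to `DataResult::OK`) makes the eraser skip records whose type does not match. A hedged config sketch in the style of the examples above (the `"ok"` value is an assumption about how `DataResult::OK` renders as a string):

    {
        "type": "e",
        "data": "ok",
        "connector": {
            "type": "local",
            "path": "./data/out/erase_test.json"
        }
    }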
8 changes: 8 additions & 0 deletions src/step/mod.rs
@@ -73,6 +73,14 @@ pub trait Step: Send + Sync + std::fmt::Debug + std::fmt::Display + StepClone {
fn thread_number(&self) -> usize {
1
}
#[instrument]
fn send(&self, data_result: DataResult, pipe: &Sender<DataResult>) -> io::Result<()> {
trace!("Send data to the queue");
pipe.send(data_result)
.map_err(|e| io::Error::new(io::ErrorKind::Interrupted, e))?;

Ok(())
}
}

pub trait StepClone {
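This default `send` centralizes the queue write and the `Interrupted` error mapping that each step previously inlined (the eraser above now forwards results through it). A self-contained sketch of the same pattern using `std::sync::mpsc` (the real `Sender` type in chewdata may differ):

    use std::io;
    use std::sync::mpsc::{channel, Sender};

    // Map a closed-channel error onto io::ErrorKind::Interrupted,
    // as the trait's default implementation does.
    fn send(data: String, pipe: &Sender<String>) -> io::Result<()> {
        pipe.send(data)
            .map_err(|e| io::Error::new(io::ErrorKind::Interrupted, e))
    }

    fn main() -> io::Result<()> {
        let (tx, rx) = channel();
        send("payload".to_string(), &tx)?;
        assert_eq!(rx.recv().unwrap(), "payload");
        Ok(())
    }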