This repository has been archived by the owner on Sep 21, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 33
/
materialize.rs
59 lines (47 loc) · 2.26 KB
/
materialize.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use smartnoise_validator::errors::*;
use smartnoise_validator::components::Named;
use crate::NodeArguments;
use smartnoise_validator::base::{Value, ReleaseNode, IndexKey};
use indexmap::IndexMap;
use crate::components::Evaluable;
use smartnoise_validator::{proto};
impl Evaluable for proto::Materialize {
    /// Reads the CSV file at `self.file_path` into a dataframe `Value`.
    ///
    /// Column names are resolved via `get_names` from the node arguments. The number of
    /// names determines how many fields of each CSV record are kept; any extra trailing
    /// fields are ignored. All cells are kept as `String`s.
    ///
    /// When `self.skip_row` is set, the first line of the file is treated as a header row
    /// and is not included in the data.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - the column names cannot be resolved;
    /// - the file cannot be opened (the underlying cause is included in the message);
    /// - a record cannot be parsed.
    fn evaluate(&self, _privacy_definition: &Option<proto::PrivacyDefinition>, arguments: NodeArguments) -> Result<ReleaseNode> {
        let column_names = self.get_names(
            arguments.iter().map(|(k, v)| (k.clone(), v)).collect(),
            IndexMap::new(), None)?;
        // The number of columns is all that is needed to shape the per-column buffers.
        let num_columns = column_names.len();
        let mut response: Vec<Vec<String>> = vec![Vec::new(); num_columns];

        // Keep the underlying I/O / csv error: the failure is not necessarily "file not found"
        // (e.g. permission denied, path is a directory).
        let mut reader = csv::ReaderBuilder::new()
            .has_headers(self.skip_row)
            .from_path(&self.file_path)
            .map_err(|e| format!("the provided file path could not be opened: {}", e))?;

        // Distribute each record's fields across the column buffers. `zip` stops at the
        // shorter side, so fields beyond `num_columns` are dropped.
        for result in reader.deserialize::<Vec<String>>() {
            let record = result.map_err(|e| format!("{:?}", e))?;
            for (value, column) in record.into_iter().zip(response.iter_mut()) {
                column.push(value);
            }
        }

        // Records shorter than the number of column names leave trailing columns short or
        // empty. Pad every column with empty strings to the longest column's length so the
        // dataframe is rectangular.
        let num_rows = response.iter().map(Vec::len).max().unwrap_or(0);
        for column in response.iter_mut() {
            column.resize(num_rows, String::new());
        }

        let dataframe = column_names.into_iter()
            .zip(response)
            .map(|(key, column): (IndexKey, Vec<String>)|
                (key, ndarray::Array::from(column).into_dyn().into()))
            .collect::<IndexMap<IndexKey, Value>>();

        Ok(ReleaseNode::new(Value::Dataframe(dataframe)))
    }
}