Skip to content

Commit

Permalink
initial structure change
Browse files Browse the repository at this point in the history
  • Loading branch information
jjcfrancisco committed Aug 7, 2024
1 parent 4536279 commit 164cdd2
Show file tree
Hide file tree
Showing 12 changed files with 1,517 additions and 181 deletions.
1,303 changes: 1,269 additions & 34 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,10 @@ bytes = "1.6.0"
geojson = "0.24.1"
serde_json = "1.0.120"
derive_more = "0.99.18"
arrow = "52.2.0"
tokio = { version = "1.39.2", features = ["fs", "full"] }
parquet = { version = "52.2.0", features = ["async", "snap"] }
geoarrow = { version = "0.2.0", features = ["parquet_async", "parquet_compression"] }
futures = "0.3.30"
geo-types = "0.7.13"

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ The list below contains the upcoming implementations.
* Reduce precision of a GeoJSON file.
* New validate command to validate files.
* Merge two columns of different types.
* Reading of Shapefile & GeoJSON in one pass.


## License
Expand Down
Binary file added examples/geoparquet/example.parquet
Binary file not shown.
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub enum Error {
Pg(postgres::Error),
#[from]
Shapefile(shapefile::Error),
#[from]
GeoArrow(geoarrow::error::GeoArrowError),
}

// region: --- Error Boilerplate
Expand Down
5 changes: 5 additions & 0 deletions src/file_types/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum AcceptedTypes {
pub enum FileType {
Shapefile,
GeoJson,
GeoParquet,
}

pub fn determine_file_type(input_file: &str) -> Result<FileType> {
Expand All @@ -67,6 +68,8 @@ pub fn determine_file_type(input_file: &str) -> Result<FileType> {
match file_extension_str {
"shp" => Ok(FileType::Shapefile),
"geojson" => Ok(FileType::GeoJson),
"parquet" => Ok(FileType::GeoParquet),
"geoparquet" => Ok(FileType::GeoParquet),
_ => Err(Error::UnsupportedFileExtension("Unsupported file type ✘".into())),
}
}
Expand All @@ -79,7 +82,9 @@ mod tests {
fn test_determine_file_type() {
let shapefile = "examples/shapefile/andalucia.shp";
let geojson = "examples/geojson/spain.geojson";
let geoparquet = "examples/geoparquet/example.parquet";
assert_eq!(determine_file_type(shapefile).unwrap(), FileType::Shapefile);
assert_eq!(determine_file_type(geojson).unwrap(), FileType::GeoJson);
assert_eq!(determine_file_type(geoparquet).unwrap(), FileType::GeoParquet);
}
}
145 changes: 83 additions & 62 deletions src/file_types/geojson.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,82 @@
use crate::{Result, Error};
use crate::{Error, Result};

use std::collections::HashMap;
use crate::pg::binary_copy::Wkb;
use geojson::GeoJson;
use postgres::types::Type;
use serde_json;
use std::collections::HashMap;
use wkb::geom_to_wkb;

use crate::file_types::common::{AcceptedTypes, Row, Rows};
use crate::pg::binary_copy::Wkb;
use crate::pg::binary_copy::{infer_geom_type, insert_row};
use crate::pg::crud::{get_stmt, prepare_postgis};
use crate::utils::cli::Cli;

use super::common::{AcceptedTypes, NewTableTypes};

pub fn insert_data(args: Cli) -> Result<()> {
// Reads through the geojson file and determines the data types
// Fix - it should only read one time
//
// Example:
//
// let data_types, geojson_str = read_geojson(&args.uri)?;
let data_types = determine_data_types(&args.input)?;

// Creates the necessary schema and table in PostGIS
prepare_postgis(&args, &data_types)?;

// Infer the geometry type
let stmt = get_stmt(&args.table, &args.schema, &args.uri)?;
let geom_type = infer_geom_type(stmt)?;

// Prepare types for binary copy
// This is unnecessary -> refactor soon
let mut types: Vec<Type> = Vec::new();
for column in data_types.iter() {
types.push(column.data_type.clone());
}
types.push(geom_type);

let geojson = read_geojson(&args.input)?;
match geojson {
GeoJson::FeatureCollection(fc) => {
let features = fc.features;
println!("Inserting data into database...");
for feature in features {
let mut row: Vec<AcceptedTypes> = Vec::new();
for (_, value) in feature.properties.unwrap().into_iter() {
match value {
serde_json::Value::Number(num) => {
row.push(AcceptedTypes::Float(num.as_f64()));
}
serde_json::Value::String(string) => {
row.push(AcceptedTypes::Text(Some(string)));
}
serde_json::Value::Bool(boolean) => {
row.push(AcceptedTypes::Bool(Some(boolean)));
}
serde_json::Value::Null => {
row.push(AcceptedTypes::Text(None));
}
_ => println!("Type currently not supported ✘"),
}
}
let gj_geom = feature.geometry.unwrap();
let geom: geo::Geometry<f64> = gj_geom
.value
.try_into()
.expect("Failed to convert geojson::Geometry to geo::Geometry ✘");
let wkb = geom_to_wkb(&geom).expect("Could not convert geometry to WKB ✘");
row.push(AcceptedTypes::Geometry(Some(Wkb { geometry: wkb })));
insert_row(row, &data_types, &types, &args)?;
}
println!("Data sucessfully inserted into database ✓");
}
_ => println!("Not a feature collection ✘"),
}

use super::common::NewTableTypes;
Ok(())
}

pub fn determine_data_types(file_path: &str) -> Result<Vec<NewTableTypes>> {
let mut table_config: HashMap<String, Type> = HashMap::new();
Expand All @@ -19,11 +86,6 @@ pub fn determine_data_types(file_path: &str) -> Result<Vec<NewTableTypes>> {
match geojson {
GeoJson::FeatureCollection(fc) => {
let features = fc.features;
// Id not used
// table_config.push(NewTableTypes {
// column_name: "id".to_string(),
// data_type: Type::INT8,
// });
if !features.is_empty() {
let properties = features.first();
if properties.is_some() {
Expand All @@ -41,7 +103,9 @@ pub fn determine_data_types(file_path: &str) -> Result<Vec<NewTableTypes>> {
} else if table_config.contains_key(&key)
&& table_config[&key] != Type::INT8
{
return Err(Error::MixedDataTypes("Column contains mixed data types ✘".to_string()));
return Err(Error::MixedDataTypes(
"Column contains mixed data types ✘".to_string(),
));
} else {
table_config.insert(key, Type::FLOAT8);
}
Expand All @@ -54,7 +118,9 @@ pub fn determine_data_types(file_path: &str) -> Result<Vec<NewTableTypes>> {
} else if table_config.contains_key(&key)
&& table_config[&key] != Type::INT8
{
return Err(Error::MixedDataTypes("Column contains mixed data types ✘".to_string()));
return Err(Error::MixedDataTypes(
"Column contains mixed data types ✘".to_string(),
));
} else {
table_config.insert(key, Type::TEXT);
}
Expand All @@ -67,7 +133,9 @@ pub fn determine_data_types(file_path: &str) -> Result<Vec<NewTableTypes>> {
} else if table_config.contains_key(&key)
&& table_config[&key] != Type::INT8
{
return Err(Error::MixedDataTypes("Column contains mixed data types ✘".to_string()));
return Err(Error::MixedDataTypes(
"Column contains mixed data types ✘".to_string(),
));
} else {
table_config.insert(key, Type::BOOL);
}
Expand All @@ -94,57 +162,10 @@ pub fn determine_data_types(file_path: &str) -> Result<Vec<NewTableTypes>> {
Ok(data_types)
}

pub fn read_geojson(file_path: &str) -> Result<Rows> {
let mut rows = Rows::new();
pub fn read_geojson(file_path: &str) -> Result<GeoJson> {
let geojson_str = std::fs::read_to_string(file_path)?;
let geojson = geojson_str.parse::<GeoJson>().unwrap();

match geojson {
GeoJson::FeatureCollection(fc) => {
let features = fc.features;
for feature in features {
let mut row = Row::new();
// Id not used
// let id = feature.id.unwrap();
// match id {
// geojson::feature::Id::Number(id) => {
// let as_i64 = id.as_i64().unwrap();
// row.add(AcceptedTypes::Int(Some(as_i64 as i32)));
// }
// _ => (),
// }
for (_, value) in feature.properties.unwrap().into_iter() {
match value {
serde_json::Value::Number(num) => {
row.add(AcceptedTypes::Float(num.as_f64()));
}
serde_json::Value::String(string) => {
row.add(AcceptedTypes::Text(Some(string)));
}
serde_json::Value::Bool(boolean) => {
row.add(AcceptedTypes::Bool(Some(boolean)));
}
serde_json::Value::Null => {
row.add(AcceptedTypes::Text(None));
}
_ => println!("Type currently not supported ✘"),
}
}
let gj_geom = feature.geometry.unwrap();
let geom: geo::Geometry<f64> = gj_geom
.value
.try_into()
.expect("Failed to convert geojson::Geometry to geo::Geometry ✘");
let wkb = geom_to_wkb(&geom).expect("Could not convert geometry to WKB ✘");
// Check length of row
row.add(AcceptedTypes::Geometry(Some(Wkb { geometry: wkb })));
rows.add(row);
}
}
_ => println!("Not a feature collection ✘"),
}

Ok(rows)
Ok(geojson)
}

#[cfg(test)]
Expand Down
53 changes: 53 additions & 0 deletions src/file_types/geoparquet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use crate::{Error, Result};

use geoarrow::table::GeoTable;
use tokio::fs::File;

use geoarrow::io::parquet::read_geoparquet_async;
use geoarrow::io::parquet::GeoParquetReaderOptions;

async fn read_geoparquet(file: &str, batch_size: usize) -> Result<GeoTable> {
let file = File::open(file).await.unwrap();
let options = GeoParquetReaderOptions::new(batch_size, Default::default());
let geotable = read_geoparquet_async(file, options).await?;

Ok(geotable)
}

pub fn process_geotable() -> Result<()> {
let runtime = tokio::runtime::Runtime::new()?;
let geotable = runtime.block_on(read_geoparquet("../../data/saporo.parquet", 1000))?;
let geom_column = geotable.geometry()?;
let geom_type = geom_column.data_type();
println!("{:?}", geom_type);
let chunks = geom_column.geometry_chunks();

// To polygons
// for chunk in chunks {
// let polys = chunk.as_polygon();
// polys.iter().for_each(|poly| {
// if poly.is_some() {
// let poly = poly.unwrap();
// let geo_geom = poly.to_geo_geometry();
// println!("{:?}", geo_geom);
// }
// });
// }

Ok(())
}

// Write test for reading geoparquet
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_read_geoparquet() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let file_path = "examples/geoparquet/example.parquet";
let batch_size = 1000;
let result = runtime.block_on(read_geoparquet(file_path, batch_size)).unwrap();
assert_eq!(result.len(), 5);
}
}
1 change: 1 addition & 0 deletions src/file_types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod common;
mod geo;
pub mod shapefile;
pub mod geojson;
pub mod geoparquet;
Loading

0 comments on commit 164cdd2

Please sign in to comment.