Skip to content

Commit

Permalink
fix errors parsing for native jobs + add timestamp support for pg
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Jul 7, 2023
1 parent c3503dc commit 783588f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 13 deletions.
1 change: 1 addition & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,6 @@ serde-wasm-bindgen = "0.4"
wasm-bindgen-test = "0.3.0"
convert_case = "0.6.0"
getrandom = "0.2"
tokio-postgres = {version = "^0.7", features = ["array-impls", "with-serde_json-1"]}
tokio-postgres = {version = "^0.7", features = ["array-impls", "with-serde_json-1", "with-chrono-0_4"]}
postgres-native-tls = "^0"
native-tls = "^0"
26 changes: 22 additions & 4 deletions backend/windmill-worker/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{Context, Error};
use chrono::{NaiveDateTime, Utc};
use serde_json::Map;
use sqlx::{Pool, Postgres};
use tokio::{fs::File, io::AsyncReadExt};
Expand Down Expand Up @@ -93,6 +94,15 @@ pub fn pg_cell_to_json_value(
Type::TEXT | Type::VARCHAR => {
get_basic(row, column, column_i, |a: String| Ok(JSONValue::String(a)))?
}
Type::TIMESTAMP => get_basic(row, column, column_i, |a: chrono::NaiveDateTime| {
Ok(JSONValue::String(a.to_string()))
})?,
Type::TIMESTAMPTZ => get_basic(row, column, column_i, |a: chrono::DateTime<Utc>| {
Ok(JSONValue::String(a.to_string()))
})?,
// Type::DATE => get_basic(row, column, column_i, |a: chrono::NaiveDate| {
// Ok(JSONValue::String(a.to_string()))
// })?,
Type::JSON | Type::JSONB => get_basic(row, column, column_i, |a: JSONValue| Ok(a))?,
Type::FLOAT4 => get_basic(row, column, column_i, |a: f32| {
Ok(f64_to_json_number(a.into())?)
Expand Down Expand Up @@ -141,9 +151,12 @@ fn get_basic<'a, T: FromSql<'a>>(
column_i: usize,
val_to_json_val: impl Fn(T) -> Result<JSONValue, Error>,
) -> Result<JSONValue, Error> {
let raw_val = row
.try_get::<_, Option<T>>(column_i)
.with_context(|| format!("column_name:{}", column.name()))?;
let raw_val = row.try_get::<_, Option<T>>(column_i).with_context(|| {
format!(
"conversion issue for value at column_name:{}",
column.name()
)
})?;
raw_val.map_or(Ok(JSONValue::Null), val_to_json_val)
}
fn get_array<'a, T: FromSql<'a>>(
Expand All @@ -154,7 +167,12 @@ fn get_array<'a, T: FromSql<'a>>(
) -> Result<JSONValue, Error> {
let raw_val_array = row
.try_get::<_, Option<Vec<T>>>(column_i)
.with_context(|| format!("column_name:{}", column.name()))?;
.with_context(|| {
format!(
"conversion issue for array at column_name:{}",
column.name()
)
})?;
Ok(match raw_val_array {
Some(val_array) => {
let mut result = vec![];
Expand Down
15 changes: 12 additions & 3 deletions backend/windmill-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,10 @@ async fn handle_queued_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>(
Ok(jc) => job_completed_tx.send(jc).await.expect("send job completed"),
Err(e) => job_completed_tx.send(JobCompleted {
job: job,
result: extract_error_value(&e.to_string(), 1),
result: json!({"error": {
"name": "ExecutionError",
"message": e.to_string()
}}),
logs: "".to_string(),
success: false
}).await.expect("send job completed"),
Expand All @@ -1068,7 +1071,10 @@ async fn handle_queued_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>(
Ok(jc) => job_completed_tx.send(jc).await.expect("send job completed"),
Err(e) => job_completed_tx.send(JobCompleted {
job: job,
result: extract_error_value(&e.to_string(), 1),
result: json!({"error": {
"name": "ExecutionError",
"message": e.to_string()
}}),
logs: "".to_string(),
success: false
}).await.expect("send job completed"),
Expand All @@ -1090,7 +1096,10 @@ async fn handle_queued_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>(
Ok(jc) => job_completed_tx.send(jc).await.expect("send job completed"),
Err(e) => job_completed_tx.send(JobCompleted {
job: job,
result: extract_error_value(&e.to_string(), 1),
result: json!({"error": {
"name": "ExecutionError",
"message": e.to_string()
}}),
logs: "".to_string(),
success: false
}).await.expect("send job completed"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import { determineD3Instance } from '../..//d3/controllers/d3'
import { findStore } from '../../store/controllers/storeApi'
import { onDestroy } from 'svelte'
import { Expand, Minus, Plus } from 'lucide-svelte'
import Toggle from '$lib/components/Toggle.svelte'
Expand Down Expand Up @@ -104,10 +103,6 @@
})
})
onDestroy(() => {
d3.select('svg').remove()
})
function handleZoom(e) {
if (!$movementStore) return
//add a store that contains the current value of the d3-zoom's scale to be used in onMouseMove function
Expand Down

0 comments on commit 783588f

Please sign in to comment.