diff --git a/src/main.rs b/src/main.rs index 4c8b6c0..38265b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,7 +25,7 @@ use std::fs::File; use std::io; mod database; -use database::{DatabaseConnection, DatabaseConnector, PostgresConnectionExt}; +use database::{DatabaseConnection, DatabaseConnector, DbRow}; @@ -38,9 +38,11 @@ struct RoomRow { sender: String, state_group: Option, json: serde_json::Value, + internal_metadata: serde_json::Value, ts: i64, edges: Vec, stream_ordering: i32, + rejection_reason: Option, } #[derive(Serialize)] @@ -94,11 +96,12 @@ impl RouteHandler for RoomHandler { let mut conn = self.connector.connect(); let events = match conn { - DatabaseConnection::Postgres(ref mut pg_conn) => { - pg_conn.query_rows( + DatabaseConnection::Postgres(_) => { + conn.query( r#" - SELECT event_id, events.type, state_key, depth, sender, state_group, - json, origin_server_ts, stream_ordering, + SELECT event_id, events.type, state_events.state_key, depth, sender, state_group, + json, internal_metadata, origin_server_ts, stream_ordering, + rejections.reason, array( SELECT prev_event_id FROM event_edges WHERE is_state = false and event_id = events.event_id @@ -107,53 +110,32 @@ impl RouteHandler for RoomHandler { INNER JOIN event_json USING (event_id) LEFT JOIN state_events USING (event_id) LEFT JOIN event_to_state_groups USING (event_id) + LEFT JOIN rejections USING (event_id) WHERE events.room_id = $1 AND stream_ordering <= $2::bigint ORDER BY stream_ordering DESC LIMIT $3::int "#, &[&room_id, &max_stream, &page_size], - |row| RoomRow { - event_id: row.get(0), - etype: row.get(1), - state_key: row.get(2), - depth: row.get(3), - sender: row.get(4), - state_group: row.get(5), - json: serde_json::from_str(&row.get::<_, String>(6)) - .expect("json was not json"), - ts: row.get(7), - stream_ordering: row.get(8), - edges: row.get(9), - } + |row| parse_event_row(row), ).expect("room sql query failed") } DatabaseConnection::Sqlite(_) => { let mut events = conn.query( r#" - SELECT event_id, events.type, state_key, depth, sender, state_group, json, origin_server_ts, - stream_ordering + SELECT event_id, events.type, state_events.state_key, depth, sender, state_group, + json, internal_metadata, origin_server_ts, stream_ordering, + rejections.reason FROM events JOIN event_json USING (event_id) LEFT JOIN state_events USING (event_id) LEFT JOIN event_to_state_groups USING (event_id) + LEFT JOIN rejections USING (event_id) WHERE events.room_id = $1 AND stream_ordering <= $2::bigint ORDER BY stream_ordering DESC LIMIT $3::int "#, &[&room_id, &max_stream, &page_size], - |row| RoomRow { - event_id: row.get(0), - etype: row.get(1), - state_key: row.get(2), - depth: row.get(3), - sender: row.get(4), - state_group: row.get(5), - json: serde_json::from_str(&row.get::(6)) - .expect("json was not json"), - ts: row.get(7), - stream_ordering: row.get(8), - edges: Vec::new(), - } + |row| parse_event_row(row), ).expect("room sql query failed"); for event in &mut events { @@ -195,7 +177,7 @@ impl RouteHandler for StateHandler { SELECT prev_state_group FROM state_group_edges e, state s WHERE s.state_group = e.state_group ) - SELECT event_id, es.type, state_key, ej.json, e.depth + SELECT event_id, es.type, es.state_key, ej.json, e.depth FROM state_groups_state NATURAL JOIN ( SELECT type, state_key, max(state_group) as state_group FROM state_groups_state @@ -221,6 +203,32 @@ impl RouteHandler for StateHandler { } } +fn parse_event_row(row: &mut DbRow<'_>) -> RoomRow { + // the postgres variant returns the event edges as an array + let edges = match row { + DbRow::Postgres(ref mut pgrow) => pgrow.get(11), + _ => Vec::new(), + }; + + return RoomRow { + event_id: row.get(0), + etype: row.get(1), + state_key: row.get(2), + depth: row.get(3), + sender: row.get(4), + state_group: row.get(5), + json: serde_json::from_str(&row.get::(6)) + .expect("json was not json"), + internal_metadata: serde_json::from_str(&row.get::(7)) + .expect("internal_metadata was not json"), + ts: row.get(8), + stream_ordering: row.get(9), + rejection_reason: row.get(10), + edges: edges, + }; +} + + fn content_from_json(s: String) -> serde_json::Value { let json: serde_json::Value = serde_json::from_str(&s).expect("content was not json"); return json["content"].clone()