diff --git a/Cargo.toml b/Cargo.toml index ac4b9e3985..625f69f521 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ members = [ # "examples/postgres/axum-social-with-tests", # "examples/postgres/files", # "examples/postgres/json", -# "examples/postgres/listen", +# "examples/postgres/chat", # "examples/postgres/todos", # "examples/postgres/mockable-todos", # "examples/postgres/transaction", diff --git a/examples/postgres/chat/Cargo.toml b/examples/postgres/chat/Cargo.toml new file mode 100644 index 0000000000..0514e29eaf --- /dev/null +++ b/examples/postgres/chat/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "sqlx-example-postgres-chat" +version = "0.1.0" +edition = "2021" +workspace = "../../../" + +[dependencies] +sqlx = { path = "../../../", features = [ "postgres", "runtime-tokio-native-tls" ] } +futures = "0.3.1" +tokio = { version = "1.20.0", features = [ "rt-multi-thread", "macros" ] } +tui = "0.19.0" +crossterm = "0.25" +unicode-width = "0.1" diff --git a/examples/postgres/chat/README.md b/examples/postgres/chat/README.md new file mode 100644 index 0000000000..55dc0cd92c --- /dev/null +++ b/examples/postgres/chat/README.md @@ -0,0 +1,21 @@ +# Chat Example + +## Description + +This example demonstrates how to use PostgreSQL channels to create a very simple chat application. + +## Setup + +1. Declare the database URL + + ``` + export DATABASE_URL="postgres://postgres:password@localhost/files" + ``` + +## Usage + +Run the project + +``` +cargo run -p sqlx-examples-postgres-chat +``` diff --git a/examples/postgres/chat/src/main.rs b/examples/postgres/chat/src/main.rs new file mode 100644 index 0000000000..bd119f57ea --- /dev/null +++ b/examples/postgres/chat/src/main.rs @@ -0,0 +1,177 @@ +use crossterm::{ + event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode}, + execute, + terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, +}; +use sqlx::postgres::PgListener; +use sqlx::PgPool; +use std::sync::Arc; +use std::{error::Error, io}; +use tokio::{sync::Mutex, time::Duration}; +use tui::{ + backend::{Backend, CrosstermBackend}, + layout::{Constraint, Direction, Layout}, + style::{Color, Modifier, Style}, + text::{Span, Spans, Text}, + widgets::{Block, Borders, List, ListItem, Paragraph}, + Frame, Terminal, +}; +use unicode_width::UnicodeWidthStr; + +struct ChatApp { + input: String, + messages: Arc>>, + pool: PgPool, +} + +impl ChatApp { + fn new(pool: PgPool) -> Self { + ChatApp { + input: String::new(), + messages: Arc::new(Mutex::new(Vec::new())), + pool, + } + } + + async fn run( + mut self, + terminal: &mut Terminal, + mut listener: PgListener, + ) -> Result<(), Box> { + // setup listener task + let messages = self.messages.clone(); + std::mem::drop(tokio::spawn(async move { + while let Ok(msg) = listener.recv().await { + messages.lock().await.push(msg.payload().to_string()); + } + })); + + loop { + let messages: Vec = self + .messages + .lock() + .await + .iter() + .map(|m| { + let content = vec![Spans::from(Span::raw(m.to_owned()))]; + ListItem::new(content) + }) + .collect(); + + terminal.draw(|f| self.ui(f, messages))?; + + if !event::poll(Duration::from_millis(20))? { + continue; + } + + if let Event::Key(key) = event::read()? { + match key.code { + KeyCode::Enter => { + notify(&self.pool, self.input.drain(..).collect()).await?; + } + KeyCode::Char(c) => { + self.input.push(c); + } + KeyCode::Backspace => { + self.input.pop(); + } + KeyCode::Esc => { + return Ok(()); + } + _ => {} + } + } + } + } + + fn ui(&mut self, f: &mut Frame, messages: Vec) { + let chunks = Layout::default() + .direction(Direction::Vertical) + .margin(2) + .constraints( + [ + Constraint::Length(1), + Constraint::Length(3), + Constraint::Min(1), + ] + .as_ref(), + ) + .split(f.size()); + + let text = Text::from(Spans::from(vec![ + Span::raw("Press "), + Span::styled("Enter", Style::default().add_modifier(Modifier::BOLD)), + Span::raw(" to send the message, "), + Span::styled("Esc", Style::default().add_modifier(Modifier::BOLD)), + Span::raw(" to quit"), + ])); + let help_message = Paragraph::new(text); + f.render_widget(help_message, chunks[0]); + + let input = Paragraph::new(self.input.as_ref()) + .style(Style::default().fg(Color::Yellow)) + .block(Block::default().borders(Borders::ALL).title("Input")); + f.render_widget(input, chunks[1]); + f.set_cursor( + // Put cursor past the end of the input text + chunks[1].x + self.input.width() as u16 + 1, + // Move one line down, from the border to the input line + chunks[1].y + 1, + ); + + let messages = + List::new(messages).block(Block::default().borders(Borders::ALL).title("Messages")); + f.render_widget(messages, chunks[2]); + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // setup postgres + let conn_str = + std::env::var("DATABASE_URL").expect("Env var DATABASE_URL is required for this example."); + let pool = sqlx::PgPool::connect(&conn_str).await?; + + let mut listener = PgListener::connect(&conn_str).await?; + listener.listen_all(vec!["chan0"]).await?; + + // setup terminal + enable_raw_mode()?; + let mut stdout = io::stdout(); + execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?; + let backend = CrosstermBackend::new(stdout); + let mut terminal = Terminal::new(backend)?; + + // create app and run it + let app = ChatApp::new(pool); + let res = app.run(&mut terminal, listener).await; + + // restore terminal + disable_raw_mode()?; + execute!( + terminal.backend_mut(), + LeaveAlternateScreen, + DisableMouseCapture, + )?; + terminal.show_cursor()?; + + if let Err(err) = res { + println!("{:?}", err) + } + + Ok(()) +} + +async fn notify(pool: &PgPool, s: String) -> Result<(), sqlx::Error> { + sqlx::query( + r#" +SELECT pg_notify(chan, payload) +FROM (VALUES ('chan0', $1)) v(chan, payload) +"#, + ) + .bind(s) + .execute(pool) + .await?; + + Ok(()) +} diff --git a/examples/postgres/listen/Cargo.toml b/examples/postgres/listen/Cargo.toml deleted file mode 100644 index 7fa814c27e..0000000000 --- a/examples/postgres/listen/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "sqlx-example-postgres-listen" -version = "0.1.0" -edition = "2021" -workspace = "../../../" - -[dependencies] -sqlx = { path = "../../../", features = [ "postgres" ] } -futures = "0.3.1" -tokio = { version = "1.20.0", features = ["macros"]} diff --git a/examples/postgres/listen/README.md b/examples/postgres/listen/README.md deleted file mode 100644 index 7a0c39a76b..0000000000 --- a/examples/postgres/listen/README.md +++ /dev/null @@ -1,18 +0,0 @@ -Postgres LISTEN/NOTIFY -====================== - -## Usage - -Declare the database URL. This example does not include any reading or writing of data. - -``` -export DATABASE_URL="postgres://postgres@localhost/postgres" -``` - -Run. - -``` -cargo run -``` - -The example program should connect to the database, and create a LISTEN loop on a predefined set of channels. A NOTIFY task will be spawned which will connect to the same database and will emit notifications on a 5 second interval. diff --git a/examples/postgres/listen/src/main.rs b/examples/postgres/listen/src/main.rs deleted file mode 100644 index e8228d44ae..0000000000 --- a/examples/postgres/listen/src/main.rs +++ /dev/null @@ -1,82 +0,0 @@ -use futures::StreamExt; -use futures::TryStreamExt; -use sqlx::postgres::PgListener; -use sqlx::{Executor, PgPool}; -use std::sync::atomic::{AtomicI64, Ordering}; -use std::time::Duration; - -#[tokio::main] -async fn main() -> Result<(), Box> { - println!("Building PG pool."); - let conn_str = - std::env::var("DATABASE_URL").expect("Env var DATABASE_URL is required for this example."); - let pool = sqlx::PgPool::connect(&conn_str).await?; - - let mut listener = PgListener::connect(&conn_str).await?; - - // let notify_pool = pool.clone(); - let _t = async_std::task::spawn(async move { - stream::interval(Duration::from_secs(2)) - .for_each(|_| notify(&pool)) - .await - }); - - println!("Starting LISTEN loop."); - - listener.listen_all(vec!["chan0", "chan1", "chan2"]).await?; - - let mut counter = 0usize; - loop { - let notification = listener.recv().await?; - println!("[from recv]: {:?}", notification); - - counter += 1; - if counter >= 3 { - break; - } - } - - // Prove that we are buffering messages by waiting for 6 seconds - listener.execute("SELECT pg_sleep(6)").await?; - - let mut stream = listener.into_stream(); - while let Some(notification) = stream.try_next().await? { - println!("[from stream]: {:?}", notification); - } - - Ok(()) -} - -async fn notify(pool: &PgPool) { - static COUNTER: AtomicI64 = AtomicI64::new(0); - - // There's two ways you can invoke `NOTIFY`: - // - // 1: `NOTIFY , ''` which cannot take bind parameters and - // is an identifier which is lowercased unless double-quoted - // - // 2: `SELECT pg_notify('', '')` which can take bind parameters - // and preserves its case - // - // We recommend #2 for consistency and usability. - - // language=PostgreSQL - let res = sqlx::query( - r#" --- this emits '{ "payload": N }' as the actual payload -select pg_notify(chan, json_build_object('payload', payload)::text) -from ( - values ('chan0', $1), - ('chan1', $2), - ('chan2', $3) - ) notifies(chan, payload) - "#, - ) - .bind(&COUNTER.fetch_add(1, Ordering::SeqCst)) - .bind(&COUNTER.fetch_add(1, Ordering::SeqCst)) - .bind(&COUNTER.fetch_add(1, Ordering::SeqCst)) - .execute(pool) - .await; - - println!("[from notify]: {:?}", res); -}