From bd0c38104dba45fbc20be14304d92435565942dc Mon Sep 17 00:00:00 2001 From: Nikita Lapkov <5737185+laplab@users.noreply.github.com> Date: Fri, 9 Jun 2023 16:46:50 +0100 Subject: [PATCH] feat(inserter): start new insert only when the first row is provided (#68) feat(inserter): start new insert only when the first row is provided --- CHANGELOG.md | 4 ++++ src/inserter.rs | 42 ++++++++++++++++++++++++++++++++---------- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc84b72..6dd9c98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate +### Changed +- inserter: start new insert only when the first row is provided ([#68]). + +[#68]: https://github.com/loyd/clickhouse.rs/pull/68 ## [0.11.4] - 2023-05-14 ### Added diff --git a/src/inserter.rs b/src/inserter.rs index 186f3ff..6833386 100644 --- a/src/inserter.rs +++ b/src/inserter.rs @@ -1,5 +1,9 @@ -use std::{future::Future, mem}; +use std::mem; +use futures::{ + future::{self, Either}, + Future, +}; use serde::Serialize; use tokio::time::{Duration, Instant}; @@ -17,7 +21,7 @@ pub struct Inserter { max_entries: u64, send_timeout: Option, end_timeout: Option, - insert: Insert, + insert: Option>, ticks: Ticks, committed: Quantities, uncommitted_entries: u64, @@ -51,7 +55,7 @@ where max_entries: DEFAULT_MAX_ENTRIES, send_timeout: None, end_timeout: None, - insert: client.insert(table)?, + insert: None, ticks: Ticks::default(), committed: Quantities::ZERO, uncommitted_entries: 0, @@ -116,7 +120,9 @@ where pub fn set_timeouts(&mut self, send_timeout: Option, end_timeout: Option) { self.send_timeout = send_timeout; self.end_timeout = end_timeout; - self.insert.set_timeouts(send_timeout, end_timeout); + if let Some(insert) = &mut self.insert { + insert.set_timeouts(self.send_timeout, self.end_timeout); + } } /// See [`Inserter::with_max_entries()`]. @@ -162,7 +168,12 @@ where T: Serialize, { self.uncommitted_entries += 1; - self.insert.write(row) + if self.insert.is_none() { + if let Err(e) = self.init_insert() { + return Either::Right(future::ready(Result::<()>::Err(e))); + } + } + Either::Left(self.insert.as_mut().unwrap().write(row)) } /// Checks limits and ends a current `INSERT` if they are reached. @@ -189,8 +200,10 @@ where /// Ends a current `INSERT` and whole `Inserter` unconditionally. /// /// If it isn't called, the current `INSERT` is aborted. - pub async fn end(self) -> Result { - self.insert.end().await?; + pub async fn end(mut self) -> Result { + if let Some(insert) = self.insert.take() { + insert.end().await?; + } Ok(self.committed) } @@ -200,9 +213,18 @@ where } async fn insert(&mut self) -> Result<()> { - let mut new_insert = self.client.insert(&self.table)?; // Actually it mustn't fail. + if let Some(insert) = self.insert.take() { + insert.end().await?; + } + Ok(()) + } + + #[cold] + #[inline(never)] + fn init_insert(&mut self) -> Result<()> { + debug_assert!(self.insert.is_none()); + let mut new_insert: Insert = self.client.insert(&self.table)?; new_insert.set_timeouts(self.send_timeout, self.end_timeout); - let insert = mem::replace(&mut self.insert, new_insert); - insert.end().await + Ok(()) } }