Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl Arrow Flight Protocol for Querying #769

Merged
merged 18 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ build = "build.rs"
arrow-schema = { version = "51.0.0", features = ["serde"] }
arrow-array = { version = "51.0.0" }
arrow-json = "51.0.0"
arrow-ipc = "51.0.0"
arrow-ipc = { version = "51.0.0", features = ["zstd"] }
arrow-select = "51.0.0"
datafusion = "37.1.0"
object_store = { version = "0.9.1", features = ["cloud", "aws"] }
parquet = "51.0.0"

### LiveTail server deps
arrow-flight = "51.0.0"
tonic = {version = "0.11.0", features = ["tls"] }
arrow-flight = { version = "51.0.0", features = [ "tls" ] }
tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.11.0"
tower-http = { version = "0.4.4", features = ["cors"] }

Expand Down
5 changes: 3 additions & 2 deletions server/src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ fn print_ascii_art() {

fn status_info(config: &Config, scheme: &str, id: Uid) {
let address = format!(
"\"{}://{}\" ({}), \":{}\" (gRPC)",
"\"{}://{}\" ({}), \":{}\" (livetail), \":{}\" (flight protocol)",
scheme,
config.parseable.address,
scheme.to_ascii_uppercase(),
config.parseable.grpc_port
config.parseable.grpc_port,
config.parseable.flight_port
);

let mut credentials =
Expand Down
28 changes: 23 additions & 5 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ pub struct Cli {

/// public address for the parseable server ingestor
pub ingestor_endpoint: String,

/// Port used by Airplane (the Arrow Flight query service)
pub flight_port: u16,
}

impl Cli {
Expand Down Expand Up @@ -118,6 +121,7 @@ impl Cli {
pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint";
pub const DEFAULT_USERNAME: &'static str = "admin";
pub const DEFAULT_PASSWORD: &'static str = "admin";
pub const FLIGHT_PORT: &'static str = "flight-port";

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_staging_path.join(stream_name)
Expand Down Expand Up @@ -275,6 +279,16 @@ impl Cli {
.value_parser(value_parser!(u16))
.help("Port for gRPC server"),
)
.arg(
Arg::new(Self::FLIGHT_PORT)
.long(Self::FLIGHT_PORT)
.env("P_FLIGHT_PORT")
.value_name("PORT")
.default_value("8002")
.required(false)
.value_parser(value_parser!(u16))
.help("Port for Arrow Flight Querying Engine"),
)
.arg(
Arg::new(Self::LIVETAIL_CAPACITY)
.long(Self::LIVETAIL_CAPACITY)
Expand Down Expand Up @@ -317,11 +331,11 @@ impl Cli {
.help("Mode of operation"),
)
.arg(
Arg::new(Self::INGESTOR_ENDPOINT)
.long(Self::INGESTOR_ENDPOINT)
.env("P_INGESTOR_ENDPOINT")
.value_name("URL")
.required(false)
Arg::new(Self::INGESTOR_ENDPOINT)
.long(Self::INGESTOR_ENDPOINT)
.env("P_INGESTOR_ENDPOINT")
.value_name("URL")
.required(false)
.help("URL to connect to this specific ingestor. Default is the address of the server.")
)
.arg(
Expand Down Expand Up @@ -401,6 +415,10 @@ impl FromArgMatches for Cli {
.get_one::<u16>(Self::GRPC_PORT)
.cloned()
.expect("default for livetail port");
self.flight_port = m
.get_one::<u16>(Self::FLIGHT_PORT)
.cloned()
.expect("default for flight port");
self.livetail_channel_capacity = m
.get_one::<usize>(Self::LIVETAIL_CAPACITY)
.cloned()
Expand Down
24 changes: 21 additions & 3 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::sync::Arc;

use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
use crate::metadata;
use crate::{handlers::http::ingest::PostError, metadata};
use chrono::NaiveDateTime;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
Expand All @@ -48,7 +48,7 @@ pub struct Event {

// Events holds the schema related to each event for a single log stream
impl Event {
pub async fn process(self) -> Result<(), EventError> {
pub async fn process(&self) -> Result<(), EventError> {
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
Expand Down Expand Up @@ -77,7 +77,7 @@ impl Event {
crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);

if let Err(e) = metadata::STREAM_INFO
.check_alerts(&self.stream_name, self.rb)
.check_alerts(&self.stream_name, &self.rb)
.await
{
log::error!("Error checking for alerts. {:?}", e);
Expand All @@ -86,6 +86,24 @@ impl Event {
Ok(())
}

/// Ingests this event's record batch without the extra steps performed by
/// `process` (no time-partition key suffix, no livetail fan-out, no alert
/// checking).
///
/// Consumes and returns `self` so the caller can keep using the event
/// afterwards (e.g. to `clear` the stream). The record batch is cloned
/// because `process_event` takes ownership while `self` is returned intact.
pub fn process_unchecked(self) -> Result<Self, PostError> {
// Key derived from the batch's schema; batches with identical schemas share it.
let key = get_schema_key(&self.rb.schema().fields);

Self::process_event(
&self.stream_name,
&key,
self.rb.clone(),
self.parsed_timestamp,
)
.map_err(PostError::Event)?;

Ok(self)
}

/// Drops the in-memory writers buffered for `stream_name`.
/// NOTE(review): takes `&self` for call-site ergonomics only — no event
/// state is read; this just delegates to the global writer table.
pub fn clear(&self, stream_name: &str) {
STREAM_WRITERS.clear(stream_name);
}

// process_event handles all events after the first one. It concatenates record
// batches and puts them in the in-memory store for each event.
fn process_event(
Expand Down
80 changes: 74 additions & 6 deletions server/src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ mod mem_writer;

use std::{
collections::HashMap,
sync::{Arc, Mutex, RwLock},
sync::{Arc, Mutex, RwLock, RwLockWriteGuard},
};

use crate::{
option::{Mode, CONFIG},
utils,
};

use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
use crate::utils;
use arrow_array::{RecordBatch, TimestampMillisecondArray};
use arrow_schema::Schema;
use chrono::NaiveDateTime;
Expand Down Expand Up @@ -62,6 +66,11 @@ impl Writer {
self.mem.push(schema_key, rb);
Ok(())
}

// Pushes a batch into the in-memory writer only, skipping the on-disk arrow
// file (callers gate on `Mode::Query`, where nothing is staged to disk).
// NOTE(review): the parameter is named `schema_key`, but some call sites pass
// the stream name here — confirm which key the mem writer should use.
fn push_mem(&mut self, schema_key: &str, rb: RecordBatch) -> Result<(), StreamWriterError> {
self.mem.push(schema_key, rb);
Ok(())
}
}

#[derive(Deref, DerefMut, Default)]
Expand All @@ -80,7 +89,8 @@ impl WriterTable {

match hashmap_guard.get(stream_name) {
Some(stream_writer) => {
stream_writer.lock().unwrap().push(
self.handle_existing_writer(
stream_writer,
stream_name,
schema_key,
record,
Expand All @@ -89,26 +99,84 @@ impl WriterTable {
}
None => {
drop(hashmap_guard);
let mut map = self.write().unwrap();
let map = self.write().unwrap();
// check for race condition
// if map contains entry then just
if let Some(writer) = map.get(stream_name) {
self.handle_missing_writer(map, stream_name, schema_key, record, parsed_timestamp)?;
}
};
Ok(())
}

/// Routes a record batch into an already-registered stream writer.
///
/// In `Query` mode the batch is buffered in memory only; in every other
/// mode it is also staged to the on-disk arrow file via `push`.
fn handle_existing_writer(
    &self,
    stream_writer: &Mutex<Writer>,
    stream_name: &str,
    schema_key: &str,
    record: RecordBatch,
    parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
    // Lock once up front; both branches need exclusive access to the writer.
    let mut writer = stream_writer.lock().unwrap();
    if CONFIG.parseable.mode == Mode::Query {
        writer.push_mem(stream_name, record)
    } else {
        writer.push(stream_name, schema_key, record, parsed_timestamp)
    }
}

fn handle_missing_writer(
&self,
mut map: RwLockWriteGuard<HashMap<String, Mutex<Writer>>>,
stream_name: &str,
schema_key: &str,
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
match map.get(stream_name) {
Some(writer) => {
if CONFIG.parseable.mode != Mode::Query {
writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
)?;
} else {
writer.lock().unwrap().push_mem(stream_name, record)?;
}
}
None => {
if CONFIG.parseable.mode != Mode::Query {
let mut writer = Writer::default();
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
map.insert(stream_name.to_owned(), Mutex::new(writer));
} else {
let mut writer = Writer::default();
writer.push_mem(schema_key, record)?;
map.insert(stream_name.to_owned(), Mutex::new(writer));
}
}
};
}
Ok(())
}

/// Empties the in-memory buffer for `stream_name`, if that stream has a
/// registered writer. Takes the write lock so no writer can be added or
/// removed while the buffer is being cleared.
pub fn clear(&self, stream_name: &str) {
    let guard = self.write().unwrap();
    if let Some(stream_writer) = guard.get(stream_name) {
        stream_writer.lock().unwrap().mem.clear();
    }
}

/// Removes the writer for `stream_name` entirely, dropping any data it
/// still buffers in memory.
pub fn delete_stream(&self, stream_name: &str) {
self.write().unwrap().remove(stream_name);
}
Expand Down
1 change: 1 addition & 0 deletions server/src/event/writer/file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::storage::staging::StorageDir;
use chrono::NaiveDateTime;

// Pairs an Arrow IPC stream writer with the path of the staging file it
// writes to.
pub struct ArrowWriter {
// Kept for bookkeeping; nothing reads it yet, hence the dead_code allow.
#[allow(dead_code)]
pub file_path: PathBuf,
pub writer: StreamWriter<File>,
}
Expand Down
Loading
Loading