From 049bf2308ea563c7fba2cf2b02eb41884d73cecf Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Tue, 24 Oct 2023 20:10:04 -0600 Subject: [PATCH] chore(dev): Add wrapper for `file-source` in `vector-lib` --- Cargo.lock | 2 +- Cargo.toml | 5 ++--- lib/vector-lib/Cargo.toml | 2 ++ lib/vector-lib/src/lib.rs | 2 ++ src/internal_events/file.rs | 2 +- src/sources/file.rs | 12 ++++++------ src/sources/kubernetes_logs/k8s_paths_provider.rs | 2 +- src/sources/kubernetes_logs/mod.rs | 8 ++++---- src/sources/kubernetes_logs/util.rs | 8 ++++---- 9 files changed, 23 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 06714ca4c8f0b..82ddc114facfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9749,7 +9749,6 @@ dependencies = [ "enum_dispatch", "exitcode", "fakedata", - "file-source", "flate2", "futures 0.3.29", "futures-util", @@ -10120,6 +10119,7 @@ version = "0.1.0" dependencies = [ "codecs", "enrichment", + "file-source", "vector-buffers", "vector-common", "vector-config", diff --git a/Cargo.toml b/Cargo.toml index 03c7bb28096c3..3471ec8f904c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,7 +131,6 @@ pin-project.workspace = true # Internal libs dnsmsg-parser = { path = "lib/dnsmsg-parser", optional = true } fakedata = { path = "lib/fakedata", optional = true } -file-source = { path = "lib/file-source", optional = true } lookup = { package = "vector-lookup", path = "lib/vector-lookup" } portpicker = { path = "lib/portpicker" } prometheus-parser = { path = "lib/prometheus-parser", optional = true } @@ -525,7 +524,7 @@ sources-dnstap = ["dep:base64", "dep:trust-dns-proto", "dep:dnsmsg-parser", "pro sources-docker_logs = ["docker"] sources-eventstoredb_metrics = [] sources-exec = [] -sources-file = ["dep:file-source"] +sources-file = ["vector-lib/file-source"] sources-file-descriptor = ["tokio-util/io"] sources-fluent = ["dep:base64", "sources-utils-net-tcp", "tokio-util/net", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"] sources-gcp_pubsub = ["gcp", "dep:h2", "dep:prost-types", "protobuf-build", "dep:tonic"] @@ -537,7 +536,7 @@ sources-internal_logs = [] sources-internal_metrics = [] sources-journald = [] sources-kafka = ["dep:rdkafka"] -sources-kubernetes_logs = ["dep:file-source", "kubernetes", "transforms-reduce"] +sources-kubernetes_logs = ["vector-lib/file-source", "kubernetes", "transforms-reduce"] sources-logstash = ["sources-utils-net-tcp", "tokio-util/net"] sources-mongodb_metrics = ["dep:mongodb"] sources-nats = ["dep:async-nats", "dep:nkeys"] diff --git a/lib/vector-lib/Cargo.toml b/lib/vector-lib/Cargo.toml index 65448d4eb0920..865e17f2a0c97 100644 --- a/lib/vector-lib/Cargo.toml +++ b/lib/vector-lib/Cargo.toml @@ -8,6 +8,7 @@ publish = false [dependencies] codecs = { path = "../codecs", default-features = false } enrichment = { path = "../enrichment" } +file-source = { path = "../file-source", optional = true } vector-buffers = { path = "../vector-buffers", default-features = false } vector-common = { path = "../vector-common" } vector-config = { path = "../vector-config" } @@ -17,6 +18,7 @@ vector-stream = { path = "../vector-stream" } [features] api = ["vector-core/api"] lua = ["vector-core/lua"] +file-source = ["dep:file-source"] syslog = ["codecs/syslog"] test = ["vector-core/test"] vrl = ["vector-core/vrl"] diff --git a/lib/vector-lib/src/lib.rs b/lib/vector-lib/src/lib.rs index 83bd6d7a35086..100e618616711 100644 --- a/lib/vector-lib/src/lib.rs +++ b/lib/vector-lib/src/lib.rs @@ -1,5 +1,7 @@ pub use codecs; pub use enrichment; +#[cfg(feature = "file-source")] +pub use file_source; pub use vector_buffers as buffers; pub use vector_common::{ assert_event_data_eq, btreemap, byte_size_of, byte_size_of::ByteSizeOf, conversion, diff --git a/src/internal_events/file.rs b/src/internal_events/file.rs index 871be8a93ce7b..44732a1bb3cf4 100644 --- a/src/internal_events/file.rs +++ b/src/internal_events/file.rs @@ -82,8 +82,8 @@ impl<'a, P: std::fmt::Debug> InternalEvent for FileIoError<'a, P> { mod source { use std::{io::Error, path::Path, time::Duration}; - use file_source::FileSourceInternalEvents; use metrics::counter; + use vector_lib::file_source::FileSourceInternalEvents; use super::{FileOpen, InternalEvent}; use crate::emit; diff --git a/src/sources/file.rs b/src/sources/file.rs index 361ebee0d3d27..baeb6a3d956a7 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -2,12 +2,6 @@ use std::{convert::TryInto, future, path::PathBuf, time::Duration}; use bytes::Bytes; use chrono::Utc; -use file_source::{ - calculate_ignore_before, - paths_provider::glob::{Glob, MatchOptions}, - Checkpointer, FileFingerprint, FileServer, FingerprintStrategy, Fingerprinter, Line, ReadFrom, - ReadFromConfig, -}; use futures::{FutureExt, Stream, StreamExt, TryFutureExt}; use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath}; use regex::bytes::Regex; @@ -17,6 +11,12 @@ use tokio::{sync::oneshot, task::spawn_blocking}; use tracing::{Instrument, Span}; use vector_lib::codecs::{BytesDeserializer, BytesDeserializerConfig}; use vector_lib::configurable::configurable_component; +use vector_lib::file_source::{ + calculate_ignore_before, + paths_provider::glob::{Glob, MatchOptions}, + Checkpointer, FileFingerprint, FileServer, FingerprintStrategy, Fingerprinter, Line, ReadFrom, + ReadFromConfig, +}; use vector_lib::finalizer::OrderedFinalizer; use vector_lib::{ config::{LegacyKey, LogNamespace}, diff --git a/src/sources/kubernetes_logs/k8s_paths_provider.rs b/src/sources/kubernetes_logs/k8s_paths_provider.rs index 83ccd2955b5ae..0eacbcd74c76e 100644 --- a/src/sources/kubernetes_logs/k8s_paths_provider.rs +++ b/src/sources/kubernetes_logs/k8s_paths_provider.rs @@ -4,9 +4,9 @@ use std::path::PathBuf; -use file_source::paths_provider::PathsProvider; use k8s_openapi::api::core::v1::{Namespace, Pod}; use kube::runtime::reflector::{store::Store, ObjectRef}; +use vector_lib::file_source::paths_provider::PathsProvider; use super::path_helpers::build_pod_logs_directory; use crate::kubernetes::pod_manager_logic::extract_static_pod_config_hashsum; diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index af68b8cca5e4c..c2b245d3436e2 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -8,10 +8,6 @@ use std::{path::PathBuf, time::Duration}; use bytes::Bytes; use chrono::Utc; -use file_source::{ - calculate_ignore_before, Checkpointer, FileServer, FileServerShutdown, FingerprintStrategy, - Fingerprinter, Line, ReadFrom, ReadFromConfig, -}; use futures::{future::FutureExt, stream::StreamExt}; use futures_util::Stream; use k8s_openapi::api::core::v1::{Namespace, Node, Pod}; @@ -27,6 +23,10 @@ use lookup::{lookup_v2::OptionalTargetPath, owned_value_path, path, OwnedTargetP use serde_with::serde_as; use vector_lib::codecs::{BytesDeserializer, BytesDeserializerConfig}; use vector_lib::configurable::configurable_component; +use vector_lib::file_source::{ + calculate_ignore_before, Checkpointer, FileServer, FileServerShutdown, FingerprintStrategy, + Fingerprinter, Line, ReadFrom, ReadFromConfig, +}; use vector_lib::{config::LegacyKey, config::LogNamespace, EstimatedJsonEncodedSizeOf}; use vector_lib::{ internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol}, diff --git a/src/sources/kubernetes_logs/util.rs b/src/sources/kubernetes_logs/util.rs index c22d1e54c3408..16691b8541e3d 100644 --- a/src/sources/kubernetes_logs/util.rs +++ b/src/sources/kubernetes_logs/util.rs @@ -1,14 +1,14 @@ use std::{error::Error, future::Future, time::Duration}; -use file_source::{ - paths_provider::PathsProvider, Checkpointer, FileServer, FileServerShutdown, - FileSourceInternalEvents, Line, -}; use futures::{ future::{select, Either}, pin_mut, FutureExt, Sink, }; use tokio::task::spawn_blocking; +use vector_lib::file_source::{ + paths_provider::PathsProvider, Checkpointer, FileServer, FileServerShutdown, + FileSourceInternalEvents, Line, +}; /// A tiny wrapper around a [`FileServer`] that runs it as a [`spawn_blocking`] /// task.