Skip to content

Commit

Permalink
chore(deps): Swap out bloom crate for bloomy (#17911)
Browse files Browse the repository at this point in the history
Signed-off-by: Jesse Szwedko <jesse.szwedko@datadoghq.com>

---------

Signed-off-by: Jesse Szwedko <jesse.szwedko@datadoghq.com>
  • Loading branch information
jszwedko authored Jul 7, 2023
1 parent 45e24c7 commit d592b0c
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 45 deletions.
22 changes: 8 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ async-compression = { version = "0.4.0", default-features = false, features = ["
apache-avro = { version = "0.14.0", default-features = false, optional = true }
axum = { version = "0.6.18", default-features = false }
base64 = { version = "0.21.2", default-features = false, optional = true }
bloom = { version = "0.3.2", default-features = false, optional = true }
bloomy = { version = "1.2.0", default-features = false, optional = true }
bollard = { version = "0.14.0", default-features = false, features = ["ssl", "chrono"], optional = true }
bytes = { version = "1.4.0", default-features = false, features = ["serde"] }
bytesize = { version = "1.2.0", default-features = false }
Expand Down Expand Up @@ -593,7 +593,7 @@ transforms-reduce = []
transforms-remap = []
transforms-route = []
transforms-sample = []
transforms-tag_cardinality_limit = ["dep:bloom", "dep:hashbrown"]
transforms-tag_cardinality_limit = ["dep:bloomy", "dep:hashbrown"]
transforms-throttle = ["dep:governor"]

# Sinks
Expand Down
2 changes: 1 addition & 1 deletion LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ bitvec,https://github.com/bitvecto-rs/bitvec,MIT,The bitvec Authors
block-buffer,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers
block-padding,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers
blocking,https://github.com/smol-rs/blocking,Apache-2.0 OR MIT,Stjepan Glavina <stjepang@gmail.com>
bloom,https://github.com/nicklan/bloom-rs,GPL-2.0,Nick Lanham <nick@afternight.org>
bloomy,https://docs.rs/bloomy/,MIT,"Aleksandr Bezobchuk <aleks.bezobchuk@gmail.com>, Alexis Sellier <self@cloudhead.io>"
bollard,https://github.com/fussybeaver/bollard,Apache-2.0,Bollard contributors
borsh,https://github.com/near/borsh-rs,MIT OR Apache-2.0,Near Inc <hello@near.org>
borsh-derive,https://github.com/nearprotocol/borsh,Apache-2.0,Near Inc <hello@nearprotocol.com>
Expand Down
4 changes: 2 additions & 2 deletions src/transforms/tag_cardinality_limit/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use vector_core::config::LogNamespace;
pub struct TagCardinalityLimitConfig {
/// How many distinct values to accept for any given key.
#[serde(default = "default_value_limit")]
pub value_limit: u32,
pub value_limit: usize,

#[configurable(derived)]
#[serde(default = "default_limit_exceeded_action")]
Expand Down Expand Up @@ -81,7 +81,7 @@ const fn default_limit_exceeded_action() -> LimitExceededAction {
LimitExceededAction::DropTag
}

const fn default_value_limit() -> u32 {
const fn default_value_limit() -> usize {
500
}

Expand Down
6 changes: 3 additions & 3 deletions src/transforms/tag_cardinality_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ impl TagCardinalityLimit {
}

// Tag value not yet part of the accepted set.
if tag_value_set.len() < self.config.value_limit as usize {
if tag_value_set.len() < self.config.value_limit {
// accept the new value
tag_value_set.insert(value.clone());

if tag_value_set.len() == self.config.value_limit as usize {
if tag_value_set.len() == self.config.value_limit {
emit!(TagCardinalityValueLimitReached { key });
}

Expand All @@ -76,7 +76,7 @@ impl TagCardinalityLimit {
self.accepted_tags
.get(key)
.map(|value_set| {
!value_set.contains(value) && value_set.len() >= self.config.value_limit as usize
!value_set.contains(value) && value_set.len() >= self.config.value_limit
})
.unwrap_or(false)
}
Expand Down
38 changes: 17 additions & 21 deletions src/transforms/tag_cardinality_limit/tag_value_set.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use crate::event::metric::TagValueSet;
use crate::transforms::tag_cardinality_limit::config::Mode;
use bloom::{BloomFilter, ASMS};
use bloomy::BloomFilter;
use std::collections::HashSet;
use std::fmt;

/// Container for storing the set of accepted values for a given tag key.
#[derive(Debug)]
pub struct AcceptedTagValueSet {
storage: TagValueSetStorage,
num_elements: usize,
}

enum TagValueSetStorage {
Set(HashSet<TagValueSet>),
Bloom(BloomFilter),
Bloom(BloomFilter<TagValueSet>),
}

impl fmt::Debug for TagValueSetStorage {
Expand All @@ -26,40 +25,37 @@ impl fmt::Debug for TagValueSetStorage {
}

impl AcceptedTagValueSet {
pub fn new(value_limit: u32, mode: &Mode) -> Self {
pub fn new(value_limit: usize, mode: &Mode) -> Self {
let storage = match &mode {
Mode::Exact => TagValueSetStorage::Set(HashSet::with_capacity(value_limit as usize)),
Mode::Exact => TagValueSetStorage::Set(HashSet::with_capacity(value_limit)),
Mode::Probabilistic(config) => {
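let num_bits = config.cache_size_per_key / 8; // Convert bytes to bits. NOTE(review): dividing by 8 goes bits -> bytes; bytes -> bits would be `* 8`. Confirm which unit `with_size` expects.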
let num_hashes = bloom::optimal_num_hashes(num_bits, value_limit);
TagValueSetStorage::Bloom(BloomFilter::with_size(num_bits, num_hashes))
TagValueSetStorage::Bloom(BloomFilter::with_size(num_bits))
}
};
Self {
storage,
num_elements: 0,
}
Self { storage }
}

pub fn contains(&self, value: &TagValueSet) -> bool {
match &self.storage {
TagValueSetStorage::Set(set) => set.contains(value),
TagValueSetStorage::Bloom(bloom) => bloom.contains(&value),
TagValueSetStorage::Bloom(bloom) => bloom.contains(value),
}
}

pub const fn len(&self) -> usize {
self.num_elements
pub fn len(&self) -> usize {
match &self.storage {
TagValueSetStorage::Set(set) => set.len(),
TagValueSetStorage::Bloom(bloom) => bloom.count(),
}
}

pub fn insert(&mut self, value: TagValueSet) -> bool {
let inserted = match &mut self.storage {
TagValueSetStorage::Set(set) => set.insert(value),
pub fn insert(&mut self, value: TagValueSet) {
match &mut self.storage {
TagValueSetStorage::Set(set) => {
set.insert(value);
}
TagValueSetStorage::Bloom(bloom) => bloom.insert(&value),
};
if inserted {
self.num_elements += 1
}
inserted
}
}
4 changes: 2 additions & 2 deletions src/transforms/tag_cardinality_limit/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn make_metric(tags: MetricTags) -> Event {
}

const fn make_transform_hashset(
value_limit: u32,
value_limit: usize,
limit_exceeded_action: LimitExceededAction,
) -> TagCardinalityLimitConfig {
TagCardinalityLimitConfig {
Expand All @@ -46,7 +46,7 @@ const fn make_transform_hashset(
}

const fn make_transform_bloom(
value_limit: u32,
value_limit: usize,
limit_exceeded_action: LimitExceededAction,
) -> TagCardinalityLimitConfig {
TagCardinalityLimitConfig {
Expand Down

0 comments on commit d592b0c

Please sign in to comment.