Skip to content

Commit

Permalink
feat(quota): Rate limit attachments by item count (#4377)
Browse files Browse the repository at this point in the history
Attachment quotas and rate limits are currently defined in bytes, so we
have no way to prevent an abusively high number of very small (or even
empty) attachments.
  • Loading branch information
jjbayer authored Dec 16, 2024
1 parent 49b6ef4 commit a8305c8
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 102 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
**Features**:

- Add data categories for Uptime and Attachment Items. ([#4363](https://github.com/getsentry/relay/pull/4363), [#4374](https://github.com/getsentry/relay/pull/4374))
- Add ability to rate limit attachments by count (not just by the number of bytes). ([#4377](https://github.com/getsentry/relay/pull/4377))

## 24.11.2

Expand Down
10 changes: 6 additions & 4 deletions relay-quotas/src/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ impl ItemScoping<'_> {

/// The unit in which a data category is measured.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CategoryUnit {
pub enum CategoryUnit {
/// Counts the number of items.
Count,
/// Counts the number of bytes across items.
Bytes,
Batched,
/// Counts the accumulated times across items.
Milliseconds,
}

Expand All @@ -199,9 +201,9 @@ impl CategoryUnit {
| DataCategory::ProfileChunk
| DataCategory::Uptime
| DataCategory::MetricSecond
| DataCategory::AttachmentItem => Some(Self::Count),
| DataCategory::AttachmentItem
| DataCategory::Session => Some(Self::Count),
DataCategory::Attachment => Some(Self::Bytes),
DataCategory::Session => Some(Self::Batched),
DataCategory::ProfileDuration => Some(Self::Milliseconds),

DataCategory::Unknown => None,
Expand Down
76 changes: 40 additions & 36 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use relay_quotas::DataCategory;
use relay_sampling::DynamicSamplingContext;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use smallvec::{smallvec, SmallVec};

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::extractors::{PartialMeta, RequestMeta};
Expand Down Expand Up @@ -632,6 +632,14 @@ pub struct Item {
payload: Bytes,
}

/// Expresses the purpose of counting quantities.
///
/// Sessions are counted for rate limiting enforcement but not for outcome reporting.
pub enum CountFor {
RateLimits,
Outcomes,
}

impl Item {
/// Creates a new item with the given type.
pub fn new(ty: ItemType) -> Self {
Expand Down Expand Up @@ -670,13 +678,38 @@ impl Item {
/// Returns the number used for counting towards rate limits and producing outcomes.
///
/// For attachments, we count the number of bytes. Other items are counted as 1.
pub fn quantity(&self) -> usize {
pub fn quantities(&self, purpose: CountFor) -> SmallVec<[(DataCategory, usize); 1]> {
match self.ty() {
ItemType::Attachment => self.len().max(1),
// NOTE: This is semantically wrong. An otel trace contains may contain many spans,
// but we cannot easily count these before converting the trace into a series of spans.
ItemType::OtelTracesData => 1,
_ => 1,
ItemType::Event => smallvec![(DataCategory::Error, 1)],
ItemType::Transaction => smallvec![(DataCategory::Transaction, 1)],
ItemType::Security | ItemType::RawSecurity => {
smallvec![(DataCategory::Security, 1)]
}
ItemType::Nel => smallvec![],
ItemType::UnrealReport => smallvec![(DataCategory::Error, 1)],
ItemType::Attachment => smallvec![
(DataCategory::Attachment, self.len().max(1)),
(DataCategory::AttachmentItem, 1)
],
ItemType::Session | ItemType::Sessions => match purpose {
CountFor::RateLimits => smallvec![(DataCategory::Session, 1)],
CountFor::Outcomes => smallvec![],
},
ItemType::Statsd | ItemType::MetricBuckets => smallvec![],
ItemType::FormData => smallvec![],
ItemType::UserReport => smallvec![],
ItemType::UserReportV2 => smallvec![(DataCategory::UserReportV2, 1)],
ItemType::Profile => smallvec![(DataCategory::Profile, 1)],
ItemType::ReplayEvent | ItemType::ReplayRecording | ItemType::ReplayVideo => {
smallvec![(DataCategory::Replay, 1)]
}
ItemType::ClientReport => smallvec![],
ItemType::CheckIn => smallvec![(DataCategory::Monitor, 1)],
ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, 1)],
// NOTE: semantically wrong, but too expensive to parse.
ItemType::OtelTracesData => smallvec![(DataCategory::Span, 1)],
ItemType::ProfileChunk => smallvec![(DataCategory::ProfileChunk, 1)], // TODO: should be seconds?
ItemType::Unknown(_) => smallvec![],
}
}

Expand All @@ -688,35 +721,6 @@ impl Item {
)
}

/// Returns the data category used for generating outcomes.
///
/// Returns `None` if outcomes are not generated for this type (e.g. sessions).
pub fn outcome_category(&self) -> Option<DataCategory> {
match self.ty() {
ItemType::Event => Some(DataCategory::Error),
ItemType::Transaction => Some(DataCategory::Transaction),
ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
ItemType::Nel => None,
ItemType::UnrealReport => Some(DataCategory::Error),
ItemType::Attachment => Some(DataCategory::Attachment),
ItemType::Session | ItemType::Sessions => None,
ItemType::Statsd | ItemType::MetricBuckets => None,
ItemType::FormData => None,
ItemType::UserReport => None,
ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
ItemType::Profile => Some(DataCategory::Profile),
ItemType::ReplayEvent | ItemType::ReplayRecording | ItemType::ReplayVideo => {
Some(DataCategory::Replay)
}
ItemType::ClientReport => None,
ItemType::CheckIn => Some(DataCategory::Monitor),
ItemType::Span | ItemType::OtelSpan => Some(DataCategory::Span),
ItemType::OtelTracesData => None,
ItemType::ProfileChunk => Some(DataCategory::ProfileChunk),
ItemType::Unknown(_) => None,
}
}

/// Returns `true` if this item's payload is empty.
pub fn is_empty(&self) -> bool {
self.payload.is_empty()
Expand Down
24 changes: 11 additions & 13 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use relay_sampling::config::RuleType;
use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator};
use relay_sampling::{DynamicSamplingContext, SamplingConfig};

use crate::envelope::ItemType;
use crate::envelope::{CountFor, ItemType};
use crate::services::outcome::Outcome;
use crate::services::processor::{
EventProcessing, ProcessEnvelopeState, Sampling, TransactionGroup,
Expand Down Expand Up @@ -116,18 +116,16 @@ pub fn drop_unsampled_items(state: &mut ProcessEnvelopeState<TransactionGroup>,
.take_items_by(|item| *item.ty() != ItemType::Profile);

for item in dropped_items {
let Some(category) = item.outcome_category() else {
continue;
};

// Dynamic sampling only drops indexed items. Upgrade the category to the index
// category if one exists for this category, for example profiles will be upgraded to profiles indexed,
// but attachments are still emitted as attachments.
let category = category.index_category().unwrap_or(category);

state
.managed_envelope
.track_outcome(outcome.clone(), category, item.quantity());
for (category, quantity) in item.quantities(CountFor::Outcomes) {
// Dynamic sampling only drops indexed items. Upgrade the category to the index
// category if one exists for this category, for example profiles will be upgraded to profiles indexed,
// but attachments are still emitted as attachments.
let category = category.index_category().unwrap_or(category);

state
.managed_envelope
.track_outcome(outcome.clone(), category, quantity);
}
}

// Mark all remaining items in the envelope as un-sampled.
Expand Down
14 changes: 7 additions & 7 deletions relay-server/src/utils/managed_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use chrono::{DateTime, Utc};
use relay_quotas::{DataCategory, Scoping};
use relay_system::Addr;

use crate::envelope::{Envelope, Item};
use crate::envelope::{CountFor, Envelope, Item};
use crate::extractors::RequestMeta;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::{Processed, ProcessingGroup};
Expand Down Expand Up @@ -52,7 +52,6 @@ pub enum ItemAction {
/// Keep the item.
Keep,
/// Drop the item and log an outcome for it.
/// The outcome will only be logged if the item has a corresponding [`Item::outcome_category()`].
Drop(Outcome),
/// Drop the item without logging an outcome.
DropSilently,
Expand Down Expand Up @@ -263,12 +262,13 @@ impl ManagedEnvelope {
ItemAction::Keep => true,
ItemAction::DropSilently => false,
ItemAction::Drop(outcome) => {
if let Some(category) = item.outcome_category() {
for (category, quantity) in item.quantities(CountFor::Outcomes) {
if let Some(indexed) = category.index_category() {
outcomes.push((outcome.clone(), indexed, item.quantity()));
outcomes.push((outcome.clone(), indexed, quantity));
};
outcomes.push((outcome, category, item.quantity()));
};
outcomes.push((outcome.clone(), category, quantity));
}

false
}
});
Expand Down Expand Up @@ -360,7 +360,7 @@ impl ManagedEnvelope {
tags.has_transactions = summary.secondary_transaction_quantity > 0,
tags.has_span_metrics = summary.secondary_span_quantity > 0,
tags.has_replays = summary.replay_quantity > 0,
tags.has_checkins = summary.checkin_quantity > 0,
tags.has_checkins = summary.monitor_quantity > 0,
tags.event_category = ?summary.event_category,
cached_summary = ?summary,
recomputed_summary = ?EnvelopeSummary::compute(self.envelope()),
Expand Down
Loading

0 comments on commit a8305c8

Please sign in to comment.