diff --git a/CHANGELOG.md b/CHANGELOG.md index 0393939e22..424e9bf856 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ **Features**: - Add data categories for Uptime and Attachment Items. ([#4363](https://github.com/getsentry/relay/pull/4363), [#4374](https://github.com/getsentry/relay/pull/4374)) +- Add ability to rate limit attachments by count (not just by the number of bytes). ([#4377](https://github.com/getsentry/relay/pull/4377)) ## 24.11.2 diff --git a/relay-quotas/src/quota.rs b/relay-quotas/src/quota.rs index 50bb5a554b..9a06ed1a72 100644 --- a/relay-quotas/src/quota.rs +++ b/relay-quotas/src/quota.rs @@ -170,10 +170,12 @@ impl ItemScoping<'_> { /// The unit in which a data category is measured. #[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum CategoryUnit { +pub enum CategoryUnit { + /// Counts the number of items. Count, + /// Counts the number of bytes across items. Bytes, - Batched, + /// Counts the accumulated times across items. Milliseconds, } @@ -199,9 +201,9 @@ impl CategoryUnit { | DataCategory::ProfileChunk | DataCategory::Uptime | DataCategory::MetricSecond - | DataCategory::AttachmentItem => Some(Self::Count), + | DataCategory::AttachmentItem + | DataCategory::Session => Some(Self::Count), DataCategory::Attachment => Some(Self::Bytes), - DataCategory::Session => Some(Self::Batched), DataCategory::ProfileDuration => Some(Self::Milliseconds), DataCategory::Unknown => None, diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 4a9eee87c4..e29444199c 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -49,7 +49,7 @@ use relay_quotas::DataCategory; use relay_sampling::DynamicSamplingContext; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use smallvec::SmallVec; +use smallvec::{smallvec, SmallVec}; use crate::constants::DEFAULT_EVENT_RETENTION; use crate::extractors::{PartialMeta, RequestMeta}; @@ -632,6 +632,14 @@ pub struct Item { payload: Bytes, } +/// 
Expresses the purpose of counting quantities. +/// +/// Sessions are counted for rate limiting enforcement but not for outcome reporting. +pub enum CountFor { + RateLimits, + Outcomes, +} + impl Item { /// Creates a new item with the given type. pub fn new(ty: ItemType) -> Self { @@ -670,13 +678,38 @@ impl Item { /// Returns the number used for counting towards rate limits and producing outcomes. /// /// For attachments, we count the number of bytes. Other items are counted as 1. - pub fn quantity(&self) -> usize { + pub fn quantities(&self, purpose: CountFor) -> SmallVec<[(DataCategory, usize); 1]> { match self.ty() { - ItemType::Attachment => self.len().max(1), - // NOTE: This is semantically wrong. An otel trace contains may contain many spans, - // but we cannot easily count these before converting the trace into a series of spans. - ItemType::OtelTracesData => 1, - _ => 1, + ItemType::Event => smallvec![(DataCategory::Error, 1)], + ItemType::Transaction => smallvec![(DataCategory::Transaction, 1)], + ItemType::Security | ItemType::RawSecurity => { + smallvec![(DataCategory::Security, 1)] + } + ItemType::Nel => smallvec![], + ItemType::UnrealReport => smallvec![(DataCategory::Error, 1)], + ItemType::Attachment => smallvec![ + (DataCategory::Attachment, self.len().max(1)), + (DataCategory::AttachmentItem, 1) + ], + ItemType::Session | ItemType::Sessions => match purpose { + CountFor::RateLimits => smallvec![(DataCategory::Session, 1)], + CountFor::Outcomes => smallvec![], + }, + ItemType::Statsd | ItemType::MetricBuckets => smallvec![], + ItemType::FormData => smallvec![], + ItemType::UserReport => smallvec![], + ItemType::UserReportV2 => smallvec![(DataCategory::UserReportV2, 1)], + ItemType::Profile => smallvec![(DataCategory::Profile, 1)], + ItemType::ReplayEvent | ItemType::ReplayRecording | ItemType::ReplayVideo => { + smallvec![(DataCategory::Replay, 1)] + } + ItemType::ClientReport => smallvec![], + ItemType::CheckIn => smallvec![(DataCategory::Monitor, 
1)], + ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, 1)], + // NOTE: semantically wrong, but too expensive to parse. + ItemType::OtelTracesData => smallvec![(DataCategory::Span, 1)], + ItemType::ProfileChunk => smallvec![(DataCategory::ProfileChunk, 1)], // TODO: should be seconds? + ItemType::Unknown(_) => smallvec![], } } @@ -688,35 +721,6 @@ impl Item { ) } - /// Returns the data category used for generating outcomes. - /// - /// Returns `None` if outcomes are not generated for this type (e.g. sessions). - pub fn outcome_category(&self) -> Option { - match self.ty() { - ItemType::Event => Some(DataCategory::Error), - ItemType::Transaction => Some(DataCategory::Transaction), - ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security), - ItemType::Nel => None, - ItemType::UnrealReport => Some(DataCategory::Error), - ItemType::Attachment => Some(DataCategory::Attachment), - ItemType::Session | ItemType::Sessions => None, - ItemType::Statsd | ItemType::MetricBuckets => None, - ItemType::FormData => None, - ItemType::UserReport => None, - ItemType::UserReportV2 => Some(DataCategory::UserReportV2), - ItemType::Profile => Some(DataCategory::Profile), - ItemType::ReplayEvent | ItemType::ReplayRecording | ItemType::ReplayVideo => { - Some(DataCategory::Replay) - } - ItemType::ClientReport => None, - ItemType::CheckIn => Some(DataCategory::Monitor), - ItemType::Span | ItemType::OtelSpan => Some(DataCategory::Span), - ItemType::OtelTracesData => None, - ItemType::ProfileChunk => Some(DataCategory::ProfileChunk), - ItemType::Unknown(_) => None, - } - } - /// Returns `true` if this item's payload is empty. 
pub fn is_empty(&self) -> bool { self.payload.is_empty() diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index 2585a06ca6..5b45d864f3 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -11,7 +11,7 @@ use relay_sampling::config::RuleType; use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator}; use relay_sampling::{DynamicSamplingContext, SamplingConfig}; -use crate::envelope::ItemType; +use crate::envelope::{CountFor, ItemType}; use crate::services::outcome::Outcome; use crate::services::processor::{ EventProcessing, ProcessEnvelopeState, Sampling, TransactionGroup, @@ -116,18 +116,16 @@ pub fn drop_unsampled_items(state: &mut ProcessEnvelopeState, .take_items_by(|item| *item.ty() != ItemType::Profile); for item in dropped_items { - let Some(category) = item.outcome_category() else { - continue; - }; - - // Dynamic sampling only drops indexed items. Upgrade the category to the index - // category if one exists for this category, for example profiles will be upgraded to profiles indexed, - // but attachments are still emitted as attachments. - let category = category.index_category().unwrap_or(category); - - state - .managed_envelope - .track_outcome(outcome.clone(), category, item.quantity()); + for (category, quantity) in item.quantities(CountFor::Outcomes) { + // Dynamic sampling only drops indexed items. Upgrade the category to the index + // category if one exists for this category, for example profiles will be upgraded to profiles indexed, + // but attachments are still emitted as attachments. + let category = category.index_category().unwrap_or(category); + + state + .managed_envelope + .track_outcome(outcome.clone(), category, quantity); + } } // Mark all remaining items in the envelope as un-sampled. 
diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index ab51531b70..de1c6832dc 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -10,7 +10,7 @@ use chrono::{DateTime, Utc}; use relay_quotas::{DataCategory, Scoping}; use relay_system::Addr; -use crate::envelope::{Envelope, Item}; +use crate::envelope::{CountFor, Envelope, Item}; use crate::extractors::RequestMeta; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::processor::{Processed, ProcessingGroup}; @@ -52,7 +52,6 @@ pub enum ItemAction { /// Keep the item. Keep, /// Drop the item and log an outcome for it. - /// The outcome will only be logged if the item has a corresponding [`Item::outcome_category()`]. Drop(Outcome), /// Drop the item without logging an outcome. DropSilently, @@ -263,12 +262,13 @@ impl ManagedEnvelope { ItemAction::Keep => true, ItemAction::DropSilently => false, ItemAction::Drop(outcome) => { - if let Some(category) = item.outcome_category() { + for (category, quantity) in item.quantities(CountFor::Outcomes) { if let Some(indexed) = category.index_category() { - outcomes.push((outcome.clone(), indexed, item.quantity())); + outcomes.push((outcome.clone(), indexed, quantity)); }; - outcomes.push((outcome, category, item.quantity())); - }; + outcomes.push((outcome.clone(), category, quantity)); + } + false } }); @@ -360,7 +360,7 @@ impl ManagedEnvelope { tags.has_transactions = summary.secondary_transaction_quantity > 0, tags.has_span_metrics = summary.secondary_span_quantity > 0, tags.has_replays = summary.replay_quantity > 0, - tags.has_checkins = summary.checkin_quantity > 0, + tags.has_checkins = summary.monitor_quantity > 0, tags.event_category = ?summary.event_category, cached_summary = ?summary, recomputed_summary = ?EnvelopeSummary::compute(self.envelope()), diff --git a/relay-server/src/utils/rate_limits.rs 
b/relay-server/src/utils/rate_limits.rs index 28b90ba823..c2d1ac9854 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -5,7 +5,7 @@ use relay_quotas::{ ReasonCode, Scoping, }; -use crate::envelope::{Envelope, Item, ItemType}; +use crate::envelope::{CountFor, Envelope, Item, ItemType}; use crate::services::outcome::Outcome; use crate::utils::ManagedEnvelope; @@ -149,6 +149,9 @@ pub struct EnvelopeSummary { /// The quantity of all attachments combined in bytes. pub attachment_quantity: usize, + /// The number of attachments. + pub attachment_item_quantity: usize, + /// The number of all session updates. pub session_quantity: usize, @@ -159,7 +162,7 @@ pub struct EnvelopeSummary { pub replay_quantity: usize, /// The number of monitor check-ins. - pub checkin_quantity: usize, + pub monitor_quantity: usize, /// Secondary number of transactions. /// @@ -218,28 +221,29 @@ impl EnvelopeSummary { } summary.payload_size += item.len(); - summary.set_quantity(item); + for (category, quantity) in item.quantities(CountFor::RateLimits) { + summary.add_quantity(category, quantity); + } } summary } - fn set_quantity(&mut self, item: &Item) { - let target_quantity = match item.ty() { - ItemType::Attachment => &mut self.attachment_quantity, - ItemType::Session => &mut self.session_quantity, - ItemType::Profile => &mut self.profile_quantity, - ItemType::ReplayEvent => &mut self.replay_quantity, - ItemType::ReplayRecording => &mut self.replay_quantity, - ItemType::ReplayVideo => &mut self.replay_quantity, - ItemType::CheckIn => &mut self.checkin_quantity, - ItemType::OtelTracesData => &mut self.span_quantity, - ItemType::OtelSpan => &mut self.span_quantity, - ItemType::Span => &mut self.span_quantity, - ItemType::ProfileChunk => &mut self.profile_chunk_quantity, + fn add_quantity(&mut self, category: DataCategory, quantity: usize) { + let target_quantity = match category { + DataCategory::Attachment => &mut self.attachment_quantity, + 
DataCategory::AttachmentItem => &mut self.attachment_item_quantity, + DataCategory::Session => &mut self.session_quantity, + DataCategory::Profile => &mut self.profile_quantity, + DataCategory::Replay => &mut self.replay_quantity, + DataCategory::ReplayVideo => &mut self.replay_quantity, + DataCategory::Monitor => &mut self.monitor_quantity, + DataCategory::Span => &mut self.span_quantity, + DataCategory::ProfileChunk => &mut self.profile_chunk_quantity, + // TODO: This catch-all return looks dangerous _ => return, }; - *target_quantity += item.quantity(); + *target_quantity += quantity; } /// Infers the appropriate [`DataCategory`] for the envelope [`Item`]. @@ -326,8 +330,10 @@ pub struct Enforcement { pub event: CategoryLimit, /// The rate limit for the indexed category of the event. pub event_indexed: CategoryLimit, - /// The combined attachment item rate limit. + /// The combined attachment bytes rate limit. pub attachments: CategoryLimit, + /// The combined attachment item rate limit. + pub attachment_items: CategoryLimit, /// The combined session item rate limit. pub sessions: CategoryLimit, /// The combined profile item rate limit. @@ -373,6 +379,7 @@ impl Enforcement { event, event_indexed, attachments, + attachment_items, sessions: _, // Do not report outcomes for sessions. profiles, profiles_indexed, @@ -388,6 +395,7 @@ impl Enforcement { event, event_indexed, attachments, + attachment_items, profiles, profiles_indexed, replays, @@ -464,7 +472,7 @@ impl Enforcement { // to determine whether an item is limited. match item.ty() { ItemType::Attachment => { - if !self.attachments.is_active() { + if !(self.attachments.is_active() || self.attachment_items.is_active()) { return true; } if item.creates_event() { @@ -612,6 +620,7 @@ where let mut rate_limits = RateLimits::new(); let mut enforcement = Enforcement::default(); + // Handle event. if let Some(category) = summary.event_category { // Check the broad category for limits. 
let mut event_limits = self.check.apply(scoping.item(category), 1)?; @@ -631,19 +640,45 @@ where rate_limits.merge(event_limits); } + // Handle attachments. if let Some(limit) = enforcement.active_event() { - enforcement.attachments = - limit.clone_for(DataCategory::Attachment, summary.attachment_quantity); - } else if summary.attachment_quantity > 0 { - let item_scoping = scoping.item(DataCategory::Attachment); - let attachment_limits = self - .check - .apply(item_scoping, summary.attachment_quantity)?; - enforcement.attachments = CategoryLimit::new( - DataCategory::Attachment, - summary.attachment_quantity, - attachment_limits.longest(), + let limit1 = limit.clone_for(DataCategory::Attachment, summary.attachment_quantity); + let limit2 = limit.clone_for( + DataCategory::AttachmentItem, + summary.attachment_item_quantity, ); + enforcement.attachments = limit1; + enforcement.attachment_items = limit2; + } else { + let mut attachment_limits = RateLimits::new(); + if summary.attachment_quantity > 0 { + let item_scoping = scoping.item(DataCategory::Attachment); + + let attachment_byte_limits = self + .check + .apply(item_scoping, summary.attachment_quantity)?; + + enforcement.attachments = CategoryLimit::new( + DataCategory::Attachment, + summary.attachment_quantity, + attachment_byte_limits.longest(), + ); + attachment_limits.merge(attachment_byte_limits); + } + if !attachment_limits.is_limited() && summary.attachment_item_quantity > 0 { + let item_scoping = scoping.item(DataCategory::AttachmentItem); + + let attachment_item_limits = self + .check + .apply(item_scoping, summary.attachment_item_quantity)?; + + enforcement.attachment_items = CategoryLimit::new( + DataCategory::AttachmentItem, + summary.attachment_item_quantity, + attachment_item_limits.longest(), + ); + attachment_limits.merge(attachment_item_limits); + } // Only record rate limits for plain attachments. For all other attachments, it's // perfectly "legal" to send them. 
They will still be discarded in Sentry, but clients @@ -653,6 +688,7 @@ where } } + // Handle sessions. if summary.session_quantity > 0 { let item_scoping = scoping.item(DataCategory::Session); let session_limits = self.check.apply(item_scoping, summary.session_quantity)?; @@ -664,6 +700,7 @@ where rate_limits.merge(session_limits); } + // Handle profiles. if enforcement.is_event_active() { enforcement.profiles = enforcement .event @@ -708,6 +745,7 @@ where rate_limits.merge(profile_limits); } + // Handle replays. if summary.replay_quantity > 0 { let item_scoping = scoping.item(DataCategory::Replay); let replay_limits = self.check.apply(item_scoping, summary.replay_quantity)?; @@ -719,17 +757,19 @@ where rate_limits.merge(replay_limits); } - if summary.checkin_quantity > 0 { + // Handle monitor checkins. + if summary.monitor_quantity > 0 { let item_scoping = scoping.item(DataCategory::Monitor); - let checkin_limits = self.check.apply(item_scoping, summary.checkin_quantity)?; + let checkin_limits = self.check.apply(item_scoping, summary.monitor_quantity)?; enforcement.check_ins = CategoryLimit::new( DataCategory::Monitor, - summary.checkin_quantity, + summary.monitor_quantity, checkin_limits.longest(), ); rate_limits.merge(checkin_limits); } + // Handle spans. if enforcement.is_event_active() { enforcement.spans = enforcement .event @@ -764,6 +804,7 @@ where rate_limits.merge(span_limits); } + // Handle profile chunks. 
if summary.profile_chunk_quantity > 0 { let item_scoping = scoping.item(DataCategory::ProfileChunk); let profile_chunk_limits = self @@ -1468,6 +1509,7 @@ mod tests { assert!(!enforcement.event.is_active()); assert!(enforcement.event_indexed.is_active()); assert!(enforcement.attachments.is_active()); + assert!(enforcement.attachment_items.is_active()); mock.assert_call(DataCategory::Transaction, 1); mock.assert_call(DataCategory::TransactionIndexed, 1); @@ -1475,7 +1517,8 @@ mod tests { get_outcomes(enforcement), vec![ (DataCategory::TransactionIndexed, 1), - (DataCategory::Attachment, 10) + (DataCategory::Attachment, 10), + (DataCategory::AttachmentItem, 1) ] ); } diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index 44985358d7..98efbefee3 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -268,11 +268,11 @@ def assert_rate_limited( assert outcome["outcome"] == 2, outcome assert outcome["reason"] == reason, outcome["reason"] if key_id is not None: - assert outcome["key_id"] == key_id + assert outcome["key_id"] == key_id, (outcome["key_id"], key_id) if quantity is not None: count = sum(outcome["quantity"] for outcome in outcomes) - assert count == quantity + assert count == quantity, (count, quantity) @pytest.fixture diff --git a/tests/integration/test_attachments.py b/tests/integration/test_attachments.py index eb4071b85b..408bde93e5 100644 --- a/tests/integration/test_attachments.py +++ b/tests/integration/test_attachments.py @@ -117,7 +117,7 @@ def test_mixed_attachments_with_processing( } -@pytest.mark.parametrize("rate_limits", [[], ["attachment"]]) +@pytest.mark.parametrize("rate_limits", [[], ["attachment"], ["attachment_item"]]) def test_attachments_ratelimit( mini_sentry, relay_with_processing, outcomes_consumer, rate_limits ): @@ -131,7 +131,7 @@ def test_attachments_ratelimit( relay = relay_with_processing() outcomes_consumer = outcomes_consumer() - 
attachments = [("att_1", "foo.txt", b"")] + attachments = [("att_1", "foo.txt", b"this is an attachment")] # First attachment returns 200 but is rate limited in processing relay.send_attachments(project_id, event_id, attachments) @@ -182,13 +182,68 @@ def test_attachments_quotas( # First attachment returns 200 but is rate limited in processing relay.send_attachments(project_id, event_id, attachments) - outcomes_consumer.assert_rate_limited("attachments_exceeded") + outcomes_consumer.assert_rate_limited( + "attachments_exceeded", quantity=len(attachment_body) + ) + + # Second attachment returns 429 in endpoint + with pytest.raises(HTTPError) as excinfo: + relay.send_attachments(42, event_id, attachments) + assert excinfo.value.response.status_code == 429 + outcomes_consumer.assert_rate_limited( + "attachments_exceeded", quantity=len(attachment_body) + ) + + +def test_attachments_quotas_items( + mini_sentry, + relay_with_processing, + attachments_consumer, + outcomes_consumer, +): + event_id = "515539018c9b4260a6f999572f1661ee" + attachment_body = b"blabla" + + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["quotas"] = [ + { + "id": f"test_rate_limiting_{uuid.uuid4().hex}", + "categories": ["attachment_item"], + "window": 3600, + "limit": 5, + "reasonCode": "attachments_exceeded", + } + ] + relay = relay_with_processing() + + attachments_consumer = attachments_consumer() + outcomes_consumer = outcomes_consumer() + attachments = [("att_1", "foo.txt", attachment_body)] + + for i in range(5): + relay.send_attachments( + project_id, event_id, [("att_1", "%s.txt" % i, attachment_body)] + ) + attachment = attachments_consumer.get_individual_attachment() + assert attachment["attachment"]["name"] == "%s.txt" % i + assert attachment["attachment"]["data"] == attachment_body + + # Sixth attachment (exceeding the 5-item quota) returns 200 but is rate limited in processing + relay.send_attachments(project_id, event_id, attachments) + + 
outcomes_consumer.assert_rate_limited( + "attachments_exceeded", categories=["attachment_item"], quantity=1 + ) # Second attachment returns 429 in endpoint with pytest.raises(HTTPError) as excinfo: relay.send_attachments(42, event_id, attachments) assert excinfo.value.response.status_code == 429 - outcomes_consumer.assert_rate_limited("attachments_exceeded") + + outcomes_consumer.assert_rate_limited( + "attachments_exceeded", categories=["attachment_item"], quantity=1 + ) def test_view_hierarchy_processing( diff --git a/tests/integration/test_minidump.py b/tests/integration/test_minidump.py index 3033ed69c8..2d19b6d90b 100644 --- a/tests/integration/test_minidump.py +++ b/tests/integration/test_minidump.py @@ -528,13 +528,13 @@ def test_minidump_ratelimit( # First minidump returns 200 but is rate limited in processing relay.send_minidump(project_id=project_id, files=attachments) outcomes_consumer.assert_rate_limited( - "static_disabled_quota", categories=["error", "attachment"] + "static_disabled_quota", categories=["error", "attachment", "attachment_item"] ) # Minidumps never return rate limits relay.send_minidump(project_id=project_id, files=attachments) outcomes_consumer.assert_rate_limited( - "static_disabled_quota", categories=["error", "attachment"] + "static_disabled_quota", categories=["error", "attachment", "attachment_item"] ) diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 818ebf661a..216937adc0 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -1288,6 +1288,16 @@ def make_envelope(transaction_name): "reason": "Sampled:3000", "source": expected_source, }, + { + "category": 22, # attachment item + "key_id": 123, + "org_id": 1, + "outcome": 1, + "project_id": 42, + "quantity": 1, # number of attachments + "reason": "Sampled:3000", + "source": expected_source, + }, ] for outcome in outcomes: outcome.pop("timestamp")