Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(processor): Remove event_fully_normalized from state #4371

Merged
merged 7 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 49 additions & 29 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,12 +790,6 @@ struct ProcessEnvelopeState<'a, Group> {

/// Reservoir evaluator that we use for dynamic sampling.
reservoir: ReservoirEvaluator<'a>,

/// Track whether the event has already been fully normalized.
///
/// If the processing pipeline applies changes to the event, it should
/// disable this flag to ensure the event is always normalized.
event_fully_normalized: bool,
}

impl<Group> ProcessEnvelopeState<'_, Group> {
Expand Down Expand Up @@ -1157,6 +1151,22 @@ struct InnerProcessor {
metric_outcomes: MetricOutcomes,
}

/// New type representing the normalization state of the event.
#[derive(Copy, Clone)]
struct EventFullyNormalized(pub bool);

impl EventFullyNormalized {
/// Returns `true` if the event is fully normalized, `false` otherwise.
pub fn new(envelope: &Envelope) -> Self {
let event_fully_normalized = envelope.meta().is_from_internal_relay()
&& envelope
.items()
.any(|item| item.creates_event() && item.fully_normalized());

Self(event_fully_normalized)
}
}

impl EnvelopeProcessorService {
/// Creates a multi-threaded envelope processor.
#[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
Expand Down Expand Up @@ -1277,12 +1287,6 @@ impl EnvelopeProcessorService {
// 2. The DSN was moved and the envelope sent to the old project ID.
envelope.meta_mut().set_project_id(project_id);

// Only trust item headers in envelopes coming from internal relays
let event_fully_normalized = envelope.meta().is_from_internal_relay()
&& envelope
.items()
.any(|item| item.creates_event() && item.fully_normalized());

#[allow(unused_mut)]
let mut reservoir = ReservoirEvaluator::new(reservoir_counters);
#[cfg(feature = "processing")]
Expand All @@ -1306,7 +1310,6 @@ impl EnvelopeProcessorService {
project_id,
managed_envelope,
reservoir,
event_fully_normalized,
}
}

Expand Down Expand Up @@ -1461,20 +1464,21 @@ impl EnvelopeProcessorService {
fn normalize_event<G: EventProcessing>(
&self,
state: &mut ProcessEnvelopeState<G>,
) -> Result<(), ProcessingError> {
mut event_fully_normalized: EventFullyNormalized,
) -> Result<Option<EventFullyNormalized>, ProcessingError> {
if !state.has_event() {
// NOTE(iker): only processing relays create events from
// attachments, so these events won't be normalized in
// non-processing relays even if the config is set to run full
// normalization.
return Ok(());
return Ok(None);
}

let full_normalization = match self.inner.config.normalization_level() {
NormalizationLevel::Full => true,
NormalizationLevel::Default => {
if self.inner.config.processing_enabled() && state.event_fully_normalized {
return Ok(());
if self.inner.config.processing_enabled() && event_fully_normalized.0 {
return Ok(None);
}

self.inner.config.processing_enabled()
Expand Down Expand Up @@ -1597,32 +1601,42 @@ impl EnvelopeProcessorService {
})
})?;

state.event_fully_normalized |= full_normalization;
event_fully_normalized.0 |= full_normalization;

Ok(())
Ok(Some(event_fully_normalized))
}

/// Processes the general errors, and the items which require or create the events.
fn process_errors(
&self,
state: &mut ProcessEnvelopeState<ErrorGroup>,
) -> Result<(), ProcessingError> {
let mut event_fully_normalized = EventFullyNormalized::new(state.envelope());

// Events can also contain user reports.
report::process_user_reports(state);

if_processing!(self.inner.config, {
unreal::expand(state, &self.inner.config)?;
});

event::extract(state, &self.inner.config)?;
event::extract(state, event_fully_normalized, &self.inner.config)?;

if_processing!(self.inner.config, {
unreal::process(state)?;
attachment::create_placeholders(state);
if let Some(inner_event_fully_normalized) = unreal::process(state)? {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very ugly, but we have to have a tristate, since we need to represent the fact of not updating the event normalization boolean.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be less awkward with an enum that encodes the tristate. Then you could do something like event_fully_normalized.update(unreal::process(state)?);

event_fully_normalized = inner_event_fully_normalized;
}
if let Some(inner_event_fully_normalized) = attachment::create_placeholders(state) {
event_fully_normalized = inner_event_fully_normalized;
}
});

event::finalize(state, &self.inner.config)?;
self.normalize_event(state)?;
if let Some(inner_event_fully_normalized) =
self.normalize_event(state, event_fully_normalized)?
{
event_fully_normalized = inner_event_fully_normalized;
};
let filter_run = event::filter(state, &self.inner.global_config.current())?;

if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
Expand All @@ -1635,13 +1649,13 @@ impl EnvelopeProcessorService {

if state.has_event() {
event::scrub(state)?;
event::serialize(state)?;
event::serialize(state, event_fully_normalized)?;
event::emit_feedback_metrics(state.envelope());
}

attachment::scrub(state);

if self.inner.config.processing_enabled() && !state.event_fully_normalized {
if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
relay_log::error!(
tags.project = %state.project_id,
tags.ty = state.event_type().map(|e| e.to_string()).unwrap_or("none".to_owned()),
Expand All @@ -1657,15 +1671,21 @@ impl EnvelopeProcessorService {
&self,
state: &mut ProcessEnvelopeState<TransactionGroup>,
) -> Result<(), ProcessingError> {
let mut event_fully_normalized = EventFullyNormalized::new(state.envelope());

let global_config = self.inner.global_config.current();

event::extract(state, &self.inner.config)?;
event::extract(state, event_fully_normalized, &self.inner.config)?;

let profile_id = profile::filter(state);
profile::transfer_id(state, profile_id);

event::finalize(state, &self.inner.config)?;
self.normalize_event(state)?;
if let Some(inner_event_fully_normalized) =
self.normalize_event(state, event_fully_normalized)?
{
event_fully_normalized = inner_event_fully_normalized;
}

dynamic_sampling::ensure_dsc(state);

Expand Down Expand Up @@ -1735,10 +1755,10 @@ impl EnvelopeProcessorService {

// Event may have been dropped because of a quota and the envelope can be empty.
if state.has_event() {
event::serialize(state)?;
event::serialize(state, event_fully_normalized)?;
}

if self.inner.config.processing_enabled() && !state.event_fully_normalized {
if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
relay_log::error!(
tags.project = %state.project_id,
tags.ty = state.event_type().map(|e| e.to_string()).unwrap_or("none".to_owned()),
Expand Down
14 changes: 9 additions & 5 deletions relay-server/src/services/processor/attachment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::statsd::RelayTimers;

#[cfg(feature = "processing")]
use {
crate::services::processor::ErrorGroup, crate::utils, relay_event_schema::protocol::Event,
relay_protocol::Annotated,
crate::services::processor::ErrorGroup, crate::services::processor::EventFullyNormalized,
crate::utils, relay_event_schema::protocol::Event, relay_protocol::Annotated,
};

/// Adds processing placeholders for special attachments.
Expand All @@ -23,7 +23,9 @@ use {
///
/// If the event payload was empty before, it is created.
#[cfg(feature = "processing")]
pub fn create_placeholders(state: &mut ProcessEnvelopeState<ErrorGroup>) {
pub fn create_placeholders(
state: &mut ProcessEnvelopeState<ErrorGroup>,
) -> Option<EventFullyNormalized> {
let envelope = state.managed_envelope.envelope();
let minidump_attachment =
envelope.get_item_by(|item| item.attachment_type() == Some(&AttachmentType::Minidump));
Expand All @@ -34,13 +36,15 @@ pub fn create_placeholders(state: &mut ProcessEnvelopeState<ErrorGroup>) {
let event = state.event.get_or_insert_with(Event::default);
state.metrics.bytes_ingested_event_minidump = Annotated::new(item.len() as u64);
utils::process_minidump(event, &item.payload());
state.event_fully_normalized = false;
return Some(EventFullyNormalized(false));
} else if let Some(item) = apple_crash_report_attachment {
let event = state.event.get_or_insert_with(Event::default);
state.metrics.bytes_ingested_event_applecrashreport = Annotated::new(item.len() as u64);
utils::process_apple_crash_report(event, &item.payload());
state.event_fully_normalized = false;
return Some(EventFullyNormalized(false));
}

None
}

/// Apply data privacy rules to attachments in the envelope.
Expand Down
2 changes: 0 additions & 2 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,6 @@ mod tests {
event_metrics_extracted: false,
reservoir: dummy_reservoir(),
spans_extracted: false,
event_fully_normalized: false,
}
};

Expand Down Expand Up @@ -752,7 +751,6 @@ mod tests {
.try_into()
.unwrap(),
reservoir: dummy_reservoir(),
event_fully_normalized: false,
};

run(&mut state)
Expand Down
10 changes: 6 additions & 4 deletions relay-server/src/services/processor/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
use crate::extractors::RequestMeta;
use crate::services::outcome::Outcome;
use crate::services::processor::{
EventProcessing, ExtractedEvent, ProcessEnvelopeState, ProcessingError, MINIMUM_CLOCK_DRIFT,
EventFullyNormalized, EventProcessing, ExtractedEvent, ProcessEnvelopeState, ProcessingError,
MINIMUM_CLOCK_DRIFT,
};
use crate::statsd::{PlatformTag, RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{self, ChunkedFormDataAggregator, FormDataIter};
Expand All @@ -39,9 +40,9 @@ use crate::utils::{self, ChunkedFormDataAggregator, FormDataIter};
/// 5. If none match, `Annotated::empty()`.
pub fn extract<G: EventProcessing>(
state: &mut ProcessEnvelopeState<G>,
event_fully_normalized: EventFullyNormalized,
config: &Config,
) -> Result<(), ProcessingError> {
let event_fully_normalized = state.event_fully_normalized;
let envelope = &mut state.envelope_mut();

// Remove all items first, and then process them. After this function returns, only
Expand All @@ -68,7 +69,7 @@ pub fn extract<G: EventProcessing>(
return Err(ProcessingError::DuplicateItem(duplicate.ty().clone()));
}

let skip_normalization = config.processing_enabled() && event_fully_normalized;
let skip_normalization = config.processing_enabled() && event_fully_normalized.0;

let (event, event_len) = if let Some(item) = event_item.or(security_item) {
relay_log::trace!("processing json event");
Expand Down Expand Up @@ -347,6 +348,7 @@ pub fn scrub<G: EventProcessing>(

pub fn serialize<G: EventProcessing>(
state: &mut ProcessEnvelopeState<G>,
event_fully_normalized: EventFullyNormalized,
) -> Result<(), ProcessingError> {
if state.event.is_empty() {
relay_log::error!("Cannot serialize empty event");
Expand All @@ -368,7 +370,7 @@ pub fn serialize<G: EventProcessing>(
// If transaction metrics were extracted, set the corresponding item header
event_item.set_metrics_extracted(state.event_metrics_extracted);
event_item.set_spans_extracted(state.spans_extracted);
event_item.set_fully_normalized(state.event_fully_normalized);
event_item.set_fully_normalized(event_fully_normalized.0);

state.envelope_mut().add_item(event_item);

Expand Down
1 change: 0 additions & 1 deletion relay-server/src/services/processor/span/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,6 @@ mod tests {
event_metrics_extracted: false,
spans_extracted: false,
reservoir: ReservoirEvaluator::new(ReservoirCounters::default()),
event_fully_normalized: false,
}
}

Expand Down
13 changes: 9 additions & 4 deletions relay-server/src/services/processor/unreal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
use relay_config::Config;

use crate::envelope::ItemType;
use crate::services::processor::{ErrorGroup, ProcessEnvelopeState, ProcessingError};
use crate::services::processor::{
ErrorGroup, EventFullyNormalized, ProcessEnvelopeState, ProcessingError,
};
use crate::utils;

/// Expands Unreal 4 items inside an envelope.
Expand Down Expand Up @@ -37,11 +39,14 @@ pub fn expand(
///
/// If the event does not contain an unreal context, this function does not perform any action.
/// If there was no event payload prior to this function, it is created.
pub fn process(state: &mut ProcessEnvelopeState<ErrorGroup>) -> Result<(), ProcessingError> {
pub fn process(
state: &mut ProcessEnvelopeState<ErrorGroup>,
) -> Result<Option<EventFullyNormalized>, ProcessingError> {
if utils::process_unreal_envelope(&mut state.event, state.managed_envelope.envelope_mut())
.map_err(ProcessingError::InvalidUnrealReport)?
{
state.event_fully_normalized = false;
return Ok(Some(EventFullyNormalized(false)));
}
Ok(())

Ok(None)
}
Loading