Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Dec 20, 2024
1 parent 6e65051 commit 2cab4f3
Showing 1 changed file with 24 additions and 38 deletions.
62 changes: 24 additions & 38 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,38 +790,24 @@ struct SpansExtracted(bool);
#[derive(Debug)]
struct ProcessingResult {
managed_envelope: TypedEnvelope<Processed>,
partial: ProcessingPartialResult,
extracted_metrics: ProcessingExtractedMetrics,
}

impl ProcessingResult {
/// Creates a [`ProcessingResult`] with no metrics.
fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
Self {
managed_envelope,
partial: ProcessingPartialResult::NoMetrics,
extracted_metrics: ProcessingExtractedMetrics::new(),
}
}

/// Returns the components of the [`ProcessingResult`].
fn into_inner(self) -> (TypedEnvelope<Processed>, ExtractedMetrics) {
match self.partial {
ProcessingPartialResult::NoMetrics => (self.managed_envelope, Default::default()),
ProcessingPartialResult::WithMetrics { extracted_metrics } => {
(self.managed_envelope, extracted_metrics.metrics)
}
}
(self.managed_envelope, self.extracted_metrics.metrics)
}
}

/// The partial result of the envelope processing.
#[derive(Debug)]
enum ProcessingPartialResult {
NoMetrics,
WithMetrics {
extracted_metrics: ProcessingExtractedMetrics,
},
}

/// Response of the [`ProcessEnvelope`] message.
#[cfg_attr(not(feature = "processing"), allow(dead_code))]
pub struct ProcessEnvelopeResponse {
Expand Down Expand Up @@ -1522,7 +1508,7 @@ impl EnvelopeProcessorService {
project_info: Arc<ProjectInfo>,
sampling_project_info: Option<Arc<ProjectInfo>>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<ProcessingPartialResult, ProcessingError> {
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
let mut metrics = Metrics::default();
#[allow(unused_mut)]
Expand Down Expand Up @@ -1617,7 +1603,7 @@ impl EnvelopeProcessorService {
);
}

Ok(ProcessingPartialResult::WithMetrics { extracted_metrics })
Ok(Some(extracted_metrics))
}

/// Processes only transactions and transaction-related items.
Expand All @@ -1632,7 +1618,7 @@ impl EnvelopeProcessorService {
mut sampling_project_info: Option<Arc<ProjectInfo>>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
reservoir_counters: ReservoirCounters,
) -> Result<ProcessingPartialResult, ProcessingError> {
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
let mut event_metrics_extracted = EventMetricsExtracted(false);
let mut spans_extracted = SpansExtracted(false);
Expand Down Expand Up @@ -1763,7 +1749,7 @@ impl EnvelopeProcessorService {
)?;
});

return Ok(ProcessingPartialResult::WithMetrics { extracted_metrics });
return Ok(Some(extracted_metrics));
}

// Need to scrub the transaction before extracting spans.
Expand Down Expand Up @@ -1838,14 +1824,14 @@ impl EnvelopeProcessorService {
);
};

Ok(ProcessingPartialResult::WithMetrics { extracted_metrics })
Ok(Some(extracted_metrics))
}

fn process_profile_chunks(
&self,
managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
project_info: Arc<ProjectInfo>,
) -> Result<ProcessingPartialResult, ProcessingError> {
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
profile_chunk::filter(managed_envelope, project_info.clone());
if_processing!(self.inner.config, {
profile_chunk::process(
Expand All @@ -1856,7 +1842,7 @@ impl EnvelopeProcessorService {
);
});

Ok(ProcessingPartialResult::NoMetrics)
Ok(None)
}

/// Processes standalone items that require an event ID, but do not have an event on the same envelope.
Expand All @@ -1867,7 +1853,7 @@ impl EnvelopeProcessorService {
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<ProcessingPartialResult, ProcessingError> {
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
#[allow(unused_mut)]
let mut extracted_metrics = ProcessingExtractedMetrics::new();

Expand All @@ -1892,7 +1878,7 @@ impl EnvelopeProcessorService {
report::process_user_reports(managed_envelope);
attachment::scrub(managed_envelope, project_info);

Ok(ProcessingPartialResult::WithMetrics { extracted_metrics })
Ok(Some(extracted_metrics))
}

/// Processes user sessions.
Expand All @@ -1901,7 +1887,7 @@ impl EnvelopeProcessorService {
managed_envelope: &mut TypedEnvelope<SessionGroup>,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<ProcessingPartialResult, ProcessingError> {
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
let mut extracted_metrics = ProcessingExtractedMetrics::new();

session::process(
Expand All @@ -1920,7 +1906,7 @@ impl EnvelopeProcessorService {
)?;
});

Ok(ProcessingPartialResult::WithMetrics { extracted_metrics })
Ok(Some(extracted_metrics))
}

/// Processes user and client reports.
Expand All @@ -1930,7 +1916,7 @@ impl EnvelopeProcessorService {
config: Arc<Config>,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<ProcessingPartialResult, ProcessingError> {
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
#[allow(unused_mut)]
let mut extracted_metrics = ProcessingExtractedMetrics::new();

Expand All @@ -1951,7 +1937,7 @@ impl EnvelopeProcessorService {
self.inner.addrs.outcome_aggregator.clone(),
);

Ok(ProcessingPartialResult::WithMetrics { extracted_metrics })
Ok(Some(extracted_metrics))
}

/// Processes replays.
Expand All @@ -1961,7 +1947,7 @@ impl EnvelopeProcessorService {
config: Arc<Config>,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<ProcessingPartialResult, ProcessingError> {
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
#[allow(unused_mut)]
let mut extracted_metrics = ProcessingExtractedMetrics::new();

Expand All @@ -1983,7 +1969,7 @@ impl EnvelopeProcessorService {
)?;
});

Ok(ProcessingPartialResult::WithMetrics { extracted_metrics })
Ok(Some(extracted_metrics))
}

/// Processes cron check-ins.
Expand All @@ -1993,7 +1979,7 @@ impl EnvelopeProcessorService {
#[allow(unused_variables)] project_id: ProjectId,
#[allow(unused_variables)] project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
) -> Result<ProcessingPartialResult, ProcessingError> {
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
#[allow(unused_mut)]
let mut extracted_metrics = ProcessingExtractedMetrics::new();

Expand All @@ -2008,7 +1994,7 @@ impl EnvelopeProcessorService {
self.normalize_checkins(managed_envelope, project_id);
});

Ok(ProcessingPartialResult::WithMetrics { extracted_metrics })
Ok(Some(extracted_metrics))
}

/// Processes standalone spans.
Expand All @@ -2024,7 +2010,7 @@ impl EnvelopeProcessorService {
#[allow(unused_variables)] sampling_project_info: Option<Arc<ProjectInfo>>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
#[allow(unused_variables)] reservoir_counters: ReservoirCounters,
) -> Result<ProcessingPartialResult, ProcessingError> {
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
#[allow(unused_mut)]
let mut extracted_metrics = ProcessingExtractedMetrics::new();

Expand Down Expand Up @@ -2060,7 +2046,7 @@ impl EnvelopeProcessorService {
)?;
});

Ok(ProcessingPartialResult::WithMetrics { extracted_metrics })
Ok(Some(extracted_metrics))
}

fn process_envelope(
Expand Down Expand Up @@ -2104,9 +2090,9 @@ impl EnvelopeProcessorService {
($fn_name:ident $(, $args:expr)*) => {{
let mut managed_envelope = managed_envelope.try_into()?;
match self.$fn_name(&mut managed_envelope, $($args),*) {
Ok(partial_result) => Ok(ProcessingResult {
Ok(extracted_metrics) => Ok(ProcessingResult {
managed_envelope: managed_envelope.into_processed(),
partial: partial_result
extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
}),
Err(error) => {
if let Some(outcome) = error.to_outcome() {
Expand Down

0 comments on commit 2cab4f3

Please sign in to comment.