Skip to content

Commit

Permalink
enhancement(pulsar sink): Refactor to use StreamSink
Browse files Browse the repository at this point in the history
This commit heavily refactors the Pulsar Sink to use the StreamSink
interface and is modeled after the Kafka Sink.

It also adds additional features that bring it in line with
Kafka Sink feature set.

This includes:
* Refactoring to use StreamSink instead of Sink interace. See #9261
* Supports dynamic topics using a topic template
* Refactor configurations in advance of adding Pulsar source
* Rework message parsing to support logs and metrics, with support for
  dynamic keys and properties

This work is heavily modeled after Kafka sink. This means there has been
some duplication of some utility code. However, it has not been
refactored to remove the duplication as there wasn't a clear pattern of
where such shared code should be put.

Additionally, this refactor seems to be much simpler by using StreamSink
but does require some workarounds limitations in the Pulsar client
library by wrapping certain resources in Arc<Mutex> that *may* have
performance implications. I am not famaliar enough to know if there
might be some efficiencies by structuring this differently.

Remaining work:
* Add a few more tests
  • Loading branch information
addisonj committed Sep 9, 2022
1 parent e9c79f9 commit 08688d9
Show file tree
Hide file tree
Showing 10 changed files with 783 additions and 455 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["dep:prometheus-parser", "dep:snap", "dep:serde_with"]
sinks-pulsar = ["dep:avro-rs", "dep:pulsar"]
sinks-pulsar = ["dep:avro-rs", "dep:pulsar", "dep:lru"]
sinks-redis = ["dep:redis"]
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
sinks-socket = ["sinks-utils-udp"]
Expand Down
22 changes: 22 additions & 0 deletions src/internal_events/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,25 @@ impl InternalEvent for PulsarSendingError {
});
}
}

pub struct PulsarPropertyExtractionError<'a> {
pub property_field: &'a str,
}

impl InternalEvent for PulsarPropertyExtractionError<'_> {
fn emit(self) {
error!(
message = "Failed to extract properties. Value should be a map of String -> Bytes.",
error_code = "extracing_property",
error_type = error_type::PARSER_FAILED,
stage = error_stage::RECEIVING,
property_field = self.property_field,
);
counter!(
"component_errors_total", 1,
"error_code" => "extracing_property",
"error_type" => error_type::PARSER_FAILED,
"stage" => error_stage::RECEIVING,
);
}
}
2 changes: 1 addition & 1 deletion src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ pub enum Sinks {

/// Apache Pulsar.
#[cfg(feature = "sinks-pulsar")]
Pulsar(#[configurable(derived)] pulsar::PulsarSinkConfig),
Pulsar(#[configurable(derived)] pulsar::config::PulsarSinkConfig),

/// Redis.
#[cfg(feature = "sinks-redis")]
Expand Down
Loading

0 comments on commit 08688d9

Please sign in to comment.