-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
sink.rs
83 lines (76 loc) · 2.7 KB
/
sink.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::fmt;
use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner};
/// Stream sink that groups incoming events into keyed batches, turns each
/// batch into a request, and drives those requests through a service.
///
/// `Svc` is the service type that receives the built requests and `RB` is the
/// request builder type; the trait bounds they must satisfy are declared on
/// the `impl` blocks that actually use them rather than on the struct.
pub struct AzureBlobSink<Svc, RB> {
    /// Service handed to the driver when the sink runs.
    service: Svc,
    /// Builds one request from each `(key, batch)` pair.
    request_builder: RB,
    /// Partitioner passed to `batched_partitioned` to key the input stream.
    partitioner: KeyPartitioner,
    /// Batching settings; the byte-size batch configuration is derived from
    /// these when the sink runs.
    batcher_settings: BatcherSettings,
}
impl<Svc, RB> AzureBlobSink<Svc, RB> {
    /// Assembles a sink from its four already-constructed components.
    ///
    /// This is a plain field-by-field constructor: nothing is validated or
    /// transformed, and no work is started until the sink is run.
    ///
    /// * `service` - service that will receive the built requests.
    /// * `request_builder` - builder that turns keyed batches into requests.
    /// * `partitioner` - partitioner used to key the incoming events.
    /// * `batcher_settings` - settings the batch configuration is derived from.
    pub const fn new(
        service: Svc,
        request_builder: RB,
        partitioner: KeyPartitioner,
        batcher_settings: BatcherSettings,
    ) -> Self {
        Self {
            batcher_settings,
            partitioner,
            request_builder,
            service,
        }
    }
}
impl<Svc, RB> AzureBlobSink<Svc, RB>
where
    Svc: Service<RB::Request> + Send + 'static,
    Svc::Future: Send + 'static,
    Svc::Response: DriverResponse + Send + 'static,
    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
    RB: RequestBuilder<(String, Vec<Event>)> + Send + Sync + 'static,
    RB::Error: fmt::Display + Send,
    RB::Request: Finalizable + MetaDescriptive + Send,
{
    /// Consumes the sink and processes `input` until the driver finishes.
    ///
    /// The pipeline is:
    /// 1. batch the events per partition key, sized by the byte-size config
    ///    derived from the batcher settings;
    /// 2. discard batches that have no key;
    /// 3. build a request from every remaining `(key, batch)` pair;
    /// 4. discard requests that failed to build, emitting
    ///    `SinkRequestBuildError` for each;
    /// 5. hand the surviving requests to a driver around the service, with
    ///    the protocol set to `"https"`.
    ///
    /// Returns whatever the driver's `run` returns.
    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        // Take the sink apart once so each stage owns exactly what it needs.
        let Self {
            service,
            request_builder,
            partitioner,
            batcher_settings,
        } = *self;

        let requests = input
            .batched_partitioned(partitioner, || batcher_settings.as_byte_size_config())
            .filter_map(|(key, batch)| async move {
                // A missing key means the template could not be rendered while
                // partitioning. That failure has already been reported through a
                // `TemplateRenderingError`, so the batch is dropped here without
                // emitting a second error.
                key.map(|partition_key| (partition_key, batch))
            })
            .request_builder(default_request_builder_concurrency_limit(), request_builder)
            .filter_map(|built| async move {
                // Report build failures and keep only the successfully built requests.
                built
                    .map_err(|error| {
                        emit!(SinkRequestBuildError { error });
                    })
                    .ok()
            });

        requests
            .into_driver(service)
            .protocol("https")
            .run()
            .await
    }
}
// `StreamSink` implementation: the trait method is a thin adapter over the
// inherent `run_inner`, so the bounds here mirror the ones `run_inner` needs.
#[async_trait]
impl<Svc, RB> StreamSink<Event> for AzureBlobSink<Svc, RB>
where
    Svc: Service<RB::Request> + Send + 'static,
    Svc::Future: Send + 'static,
    Svc::Response: DriverResponse + Send + 'static,
    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
    RB: RequestBuilder<(String, Vec<Event>)> + Send + Sync + 'static,
    RB::Error: fmt::Display + Send,
    RB::Request: Finalizable + MetaDescriptive + Send,
{
    /// Runs the sink over `input` by delegating to `run_inner`.
    ///
    /// The boxed sink is consumed by the call. The returned `Result` is
    /// exactly the one produced by `run_inner`.
    // `self` is only moved into `run_inner`, never mutated, so the binding
    // does not need to be `mut`.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}