Skip to content

Commit

Permalink
kafka: support prometheus address in config
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Oct 9, 2023
1 parent 29b8cd5 commit dd24142
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 29 deletions.
16 changes: 11 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,23 @@ The minor version will be incremented upon a breaking change and the patch versi

## [Unreleased]

### Features
### Fixes

### Features

- client: add `GeyserGrpcClient::subscribe_once2` ([#195](https://github.com/rpcpool/yellowstone-grpc/pull/195)).

### Breaking

## 2023-10-09

- yellowstone-grpc-kafka-1.0.0-rc.3+solana.1.16.15

### Features

- kafka: add metrics (stats, sent, recv) ([#196](https://github.com/rpcpool/yellowstone-grpc/pull/196)).
- kafka: support YAML config ([#197](https://github.com/rpcpool/yellowstone-grpc/pull/197)).

### Fixes
- kafka: support prometheus address in config ([#198](https://github.com/rpcpool/yellowstone-grpc/pull/198)).

## 2023-10-06

Expand All @@ -28,8 +36,6 @@ The minor version will be incremented upon a breaking change and the patch versi

- kafka: fix message size for gRPC client ([#195](https://github.com/rpcpool/yellowstone-grpc/pull/195)).

### Breaking

## 2023-10-05

- yellowstone-grpc-client-1.11.0+solana.1.16.15
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion yellowstone-grpc-kafka/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-kafka"
version = "1.0.0-rc.2+solana.1.16.15"
version = "1.0.0-rc.3+solana.1.16.15"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Kafka Producer/Dedup/Consumer"
Expand Down
4 changes: 3 additions & 1 deletion yellowstone-grpc-kafka/config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"prometheus": "127.0.0.1:8873",
"kafka": {
"bootstrap.servers": "localhost:29092"
"bootstrap.servers": "localhost:29092",
"statistics.interval.ms": "1000"
},
"dedup": {
"kafka": {
Expand Down
6 changes: 4 additions & 2 deletions yellowstone-grpc-kafka/src/bin/grpc-kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct Args {
#[clap(short, long)]
config: String,

/// Prometheus listen address
/// [DEPRECATED: use config] Prometheus listen address
#[clap(long)]
prometheus: Option<SocketAddr>,

Expand Down Expand Up @@ -358,7 +358,9 @@ async fn main() -> anyhow::Result<()> {
let config = Config::load(&args.config).await?;

// Run prometheus server
prom::run_server(args.prometheus)?;
if let Some(address) = config.prometheus.or(args.prometheus) {
prom::run_server(address)?;
}

// Create kafka config
let mut kafka_config = ClientConfig::new();
Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-kafka/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub trait GrpcRequestToProto<T> {
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
pub struct Config {
pub prometheus: Option<SocketAddr>,
pub kafka: HashMap<String, String>,
pub dedup: Option<ConfigDedup>,
pub grpc2kafka: Option<ConfigGrpc2Kafka>,
Expand Down
36 changes: 17 additions & 19 deletions yellowstone-grpc-kafka/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ lazy_static::lazy_static! {
).unwrap();
}

pub fn run_server(address: Option<SocketAddr>) -> anyhow::Result<()> {
pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
static REGISTER: Once = Once::new();
REGISTER.call_once(|| {
macro_rules! register {
Expand Down Expand Up @@ -66,24 +66,22 @@ pub fn run_server(address: Option<SocketAddr>) -> anyhow::Result<()> {
.inc();
});

if let Some(address) = address {
let make_service = make_service_fn(move |_: &AddrStream| async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
let response = match req.uri().path() {
"/metrics" => metrics_handler(),
_ => not_found_handler(),
};
Ok::<_, hyper::Error>(response)
}))
});
let server = Server::try_bind(&address)?.serve(make_service);
info!("prometheus server started: {address:?}");
tokio::spawn(async move {
if let Err(error) = server.await {
error!("prometheus server failed: {error:?}");
}
});
}
let make_service = make_service_fn(move |_: &AddrStream| async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
let response = match req.uri().path() {
"/metrics" => metrics_handler(),
_ => not_found_handler(),
};
Ok::<_, hyper::Error>(response)
}))
});
let server = Server::try_bind(&address)?.serve(make_service);
info!("prometheus server started: {address:?}");
tokio::spawn(async move {
if let Err(error) = server.await {
error!("prometheus server failed: {error:?}");
}
});

Ok(())
}
Expand Down

0 comments on commit dd24142

Please sign in to comment.