diff --git a/CHANGELOG.md b/CHANGELOG.md index 5988441c..726e24a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,17 @@ The minor version will be incremented upon a breaking change and the patch versi ### Breaking -- kafka: rename to tools ([#209](https://github.com/rpcpool/yellowstone-grpc/pull/209)). +## 2023-10-19 + +- yellowstone-grpc-tools-1.0.0-rc.5+solana.1.16.17 + +### Features + +- tools: add Google Pub/Sub ([#211](https://github.com/rpcpool/yellowstone-grpc/pull/211)). + +### Breaking + +- kafka: rename to tools ([#203](https://github.com/rpcpool/yellowstone-grpc/pull/203)). ## 2023-10-14 diff --git a/Cargo.lock b/Cargo.lock index f87e251c..25702968 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -327,6 +327,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-compression" version = "0.4.3" @@ -480,6 +491,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.4" @@ -833,6 +850,15 @@ dependencies = [ "unreachable", ] +[[package]] +name = "concurrent-queue" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -1216,6 +1242,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fastrand" version = "2.0.1" @@ -1250,6 +1282,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.0" @@ -1434,6 +1481,94 @@ dependencies = [ "scroll", ] +[[package]] +name = "google-cloud-auth" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af1087f1fbd2dd3f58c17c7574ddd99cd61cbbbc2c4dc81114b8687209b196cb" +dependencies = [ + "async-trait", + "base64 0.21.4", + "google-cloud-metadata", + "google-cloud-token", + "home", + "jsonwebtoken", + "reqwest", + "serde", + "serde_json", + "thiserror", + "time", + "tokio", + "tracing", + "urlencoding", +] + +[[package]] +name = "google-cloud-gax" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7324ffbab9043aecca2d0e9b9be04944d31565e15c06297fb8ff151b01a7705d" +dependencies = [ + "google-cloud-token", + "http", + "thiserror", + "tokio", + "tokio-retry", + "tonic 0.9.2", + "tower", + "tracing", +] + +[[package]] +name = "google-cloud-googleapis" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5453af21ac0cc1f3b2cfb5b687c174e701c10ec2d5c286aff7ca8cbbf08d31b4" +dependencies = [ + "prost 0.11.9", + "prost-types 0.11.9", + "tonic 0.9.2", +] + +[[package]] +name = "google-cloud-metadata" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc279bfb50487d7bcd900e8688406475fc750fe474a835b2ab9ade9eb1fc90e2" +dependencies = [ + "reqwest", + "thiserror", + "tokio", +] + +[[package]] +name = "google-cloud-pubsub" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a501b632b606f8fdb36661d461cdd798db214117d1d433baa11f51dda4983870" +dependencies = [ + "async-channel", + "async-stream", + "google-cloud-auth", + "google-cloud-gax", + "google-cloud-googleapis", + "google-cloud-token", + "prost-types 0.11.9", + "thiserror", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "google-cloud-token" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fcd62eb34e3de2f085bcc33a09c3e17c4f65650f36d53eb328b00d63bcb536a" +dependencies = [ + "async-trait", +] + [[package]] name = "h2" version = "0.3.21" @@ -1648,6 +1783,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iana-time-zone" version = "0.1.57" @@ -1802,6 +1950,20 @@ dependencies = [ "serde", ] +[[package]] +name = "jsonwebtoken" +version = "8.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" +dependencies = [ + "base64 0.21.4", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "keccak" version = "0.1.4" @@ -1994,6 +2156,24 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2217,6 +2397,32 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl" +version = "0.10.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" +dependencies = [ + "bitflags 2.4.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "openssl-probe" version = "0.1.5" @@ -2298,6 +2504,15 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "pem" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8835c273a76a90455d7344889b0964598e3316e2a79ede8e36f16bdcf2228b8" +dependencies = [ + "base64 0.13.1", +] + [[package]] name = "percent-encoding" version = "2.3.0" @@ -2489,6 +2704,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive 0.11.9", +] + [[package]] name = "prost" version = "0.12.1" @@ -2496,7 +2721,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.12.1", ] [[package]] @@ -2513,14 +2738,27 @@ dependencies = [ "once_cell", "petgraph", "prettyplease", - "prost", - "prost-types", + "prost 0.12.1", + "prost-types 0.12.1", "regex", "syn 2.0.37", "tempfile", "which", ] +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "prost-derive" version = "0.12.1" @@ -2534,13 +2772,22 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "prost-types" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +dependencies = [ + "prost 0.11.9", +] + [[package]] name = "prost-types" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" dependencies = [ - "prost", + "prost 0.12.1", ] [[package]] @@ -2778,10 +3025,12 @@ dependencies = [ "http-body", "hyper", "hyper-rustls", + "hyper-tls", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -2792,6 +3041,7 @@ dependencies = [ "serde_urlencoded", "system-configuration", "tokio", + "tokio-native-tls", "tokio-rustls", "tokio-util", "tower-service", @@ -2799,7 +3049,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 0.25.2", "winreg", ] @@ -2860,7 +3110,7 @@ checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", "ring", - "rustls-webpki", + "rustls-webpki 0.101.6", "sct", ] @@ -2885,6 +3135,16 @@ dependencies = [ "base64 0.21.4", ] +[[package]] +name = "rustls-webpki" +version = "0.100.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.101.6" @@ -3172,6 +3432,18 @@ version = "1.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +[[package]] +name = "simple_asn1" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085" +dependencies = [ + "num-bigint 0.4.4", + "num-traits", + "thiserror", + "time", +] + [[package]] name = "sized-chunks" version = "0.6.5" @@ -3964,6 +4236,7 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.5.4", @@ -3992,6 +4265,27 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -4070,6 +4364,39 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.4", + "bytes", + "flate2", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost 0.11.9", + "rustls-pemfile", + "tokio", + "tokio-rustls", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", + "webpki-roots 0.23.1", +] + [[package]] name = "tonic" version = "0.10.2" @@ -4089,7 +4416,7 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", + "prost 0.12.1", "rustls", "rustls-native-certs", "rustls-pemfile", @@ -4122,10 +4449,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f80db390246dfb46553481f6024f0082ba00178ea495dbb99e70ba9a4fafb5e1" dependencies = [ "async-stream", - "prost", + "prost 0.12.1", "tokio", "tokio-stream", - "tonic", + "tonic 0.10.2", ] [[package]] @@ -4167,6 +4494,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -4313,6 +4641,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8parse" version = "0.2.1" @@ -4452,6 +4786,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338" +dependencies = [ + "rustls-webpki 0.100.3", +] + [[package]] name = "webpki-roots" version = "0.25.2" @@ -4604,7 +4947,7 @@ dependencies = [ "http", "thiserror", "tokio", - "tonic", + "tonic 0.10.2", "tonic-health", "yellowstone-grpc-proto", ] @@ -4658,7 +5001,7 @@ dependencies = [ "spl-token-2022", "tokio", "tokio-stream", - "tonic", + "tonic 0.10.2", "tonic-health", "vergen", "yellowstone-grpc-proto", @@ -4670,18 +5013,18 @@ version = "1.10.0+solana.1.16.17" dependencies = [ "anyhow", "bincode", - "prost", + "prost 0.12.1", "protobuf-src", "solana-account-decoder", "solana-sdk", "solana-transaction-status", - "tonic", + "tonic 0.10.2", "tonic-build", ] [[package]] name = "yellowstone-grpc-tools" -version = "1.0.0-rc.4+solana.1.16.17" +version = "1.0.0-rc.5+solana.1.16.17" dependencies = [ "anyhow", "async-trait", @@ -4691,6 +5034,8 @@ dependencies = [ "const-hex", "futures", "git-version", + "google-cloud-googleapis", + "google-cloud-pubsub", "hyper", "json5", "lazy_static", @@ -4702,7 +5047,7 @@ dependencies = [ "sha2 0.10.8", "tokio", "tokio-stream", - "tonic", + "tonic 0.10.2", "tonic-health", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 2bca263f..2e8010ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [ "yellowstone-grpc-client", # 1.11.1+solana.1.6.17 "yellowstone-grpc-geyser", # 1.10.0+solana.1.6.17 "yellowstone-grpc-proto", # 1.10.0+solana.1.16.17 - "yellowstone-grpc-tools", # 1.0.0-rc.4+solana.1.16.17 + "yellowstone-grpc-tools", # 1.0.0-rc.5+solana.1.16.17 ] [profile.release] diff --git a/README.md b/README.md index c8dd0094..2bbc5ffe 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,51 @@ It's possible to add limits for filters in config. If `filters` field is omitted ### gRPC Tools +#### Google Pub/Sub + +```bash +$ cargo run --bin grpc-google-pubsub -- --help +Yellowstone gRPC Google Pub/Sub Tool + +Usage: grpc-google-pubsub [OPTIONS] --config + +Commands: + grpc2pubsub Receive data from gRPC and send them to the Pub/Sub + pubsub2stdout Dev: subscribe to message from Pub/Sub and print them to Stdout + pubsubTopicCreate Dev: create Pub/Sub topic + pubsubTopicDelete Dev: delete Pub/Sub topic + help Print this message or the help of the given subcommand(s) + +Options: + -c, --config Path to config file + --prometheus Prometheus listen address + -h, --help Print help + -V, --version Print version +``` + +##### Development + +```bash +# export creds +export GOOGLE_APPLICATION_CREDENTIALS=/path/to/google/project/creds.json +# send messages from gRPC to Google Pub/Sub +cargo run --bin grpc-google-pubsub -- --config yellowstone-grpc-tools/config-google-pubsub.json grpc2pubsub +``` + +with emulator: + +```bash +# retrive `USER_CONFIG_DIR` +$ gcloud info --format='get(config.paths.global_config_dir)' +# run emulator, data dir by default: `/emulators/pubsub` +$ gcloud beta emulators pubsub start +... +# send serialized gRPC messages to Google Pub/Sub with PUBSUB_EMULATOR_HOST +$ PUBSUB_EMULATOR_HOST=localhost:8085 cargo run --bin grpc-google-pubsub -- --config yellowstone-grpc-tools/config-google-pubsub.json grpc2pubsub +# print type of messages from Google Pub/Sub with PUBSUB_EMULATOR_HOST +$ PUBSUB_EMULATOR_HOST=localhost:8085 cargo run --bin grpc-google-pubsub -- --config yellowstone-grpc-tools/config-google-pubsub.json --prometheus 1 pubsub2stdout +``` + #### Kafka In addition to gRPC Geyser Plugin we provide Kafka tool. This tool can works in 3 modes: @@ -128,11 +173,12 @@ Commands: Options: -c, --config Path to config file + --prometheus Prometheus listen address -h, --help Print help -V, --version Print version ``` -#### Development +##### Development ```bash # run kafka locally diff --git a/yellowstone-grpc-tools/Cargo.toml b/yellowstone-grpc-tools/Cargo.toml index 5fee045f..8c6bcb98 100644 --- a/yellowstone-grpc-tools/Cargo.toml +++ b/yellowstone-grpc-tools/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-tools" -version = "1.0.0-rc.4+solana.1.16.17" +version = "1.0.0-rc.5+solana.1.16.17" authors = ["Triton One"] edition = "2021" description = "Yellowstone gRPC Tools" @@ -13,6 +13,8 @@ atty = "0.2.14" clap = { version = "4.3.0", features = ["cargo", "derive"] } const-hex = "1.6.2" futures = "0.3.24" +google-cloud-googleapis = "0.11.0" +google-cloud-pubsub = "0.21.0" hyper = { version = "0.14.27", features = ["server"] } json5 = "0.4.1" lazy_static = "1.4.0" diff --git a/yellowstone-grpc-tools/config-google-pubsub.json b/yellowstone-grpc-tools/config-google-pubsub.json new file mode 100644 index 00000000..8ffc9b25 --- /dev/null +++ b/yellowstone-grpc-tools/config-google-pubsub.json @@ -0,0 +1,25 @@ +{ + "prometheus": "127.0.0.1:8873", + "grpc2pubsub": { + "endpoint": "http://127.0.0.1:10000", + "x_token": null, + "request": { + "slots": ["client"], + "blocks": { + "client": { + "account_include": [], + "include_transactions": false, + "include_accounts": false, + "include_entries": false + } + } + }, + "topic": "grpc", + "create_if_not_exists": true, + "workers": 3, + "flush_interval_ms": 100, + "bundle_size": 3, + "bulk_max_size": 10, + "bulk_max_wait_ms": 100 + } +} diff --git a/yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs b/yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs new file mode 100644 index 00000000..71df3b6a --- /dev/null +++ b/yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs @@ -0,0 +1,281 @@ +use { + clap::{Parser, Subcommand}, + futures::{future::BoxFuture, stream::StreamExt}, + google_cloud_googleapis::pubsub::v1::PubsubMessage, + google_cloud_pubsub::{client::Client, subscription::SubscriptionConfig}, + std::{net::SocketAddr, time::Duration}, + tokio::{task::JoinSet, time::sleep}, + tracing::{info, warn}, + tracing_subscriber::{ + filter::{EnvFilter, LevelFilter}, + layer::SubscriberExt, + util::SubscriberInitExt, + }, + yellowstone_grpc_client::GeyserGrpcClient, + yellowstone_grpc_proto::{ + prelude::{subscribe_update::UpdateOneof, SubscribeUpdate}, + prost::Message as _, + }, + yellowstone_grpc_tools::{ + config::{load as config_load, GrpcRequestToProto}, + create_shutdown, + google_pubsub::{ + config::{Config, ConfigGrpc2PubSub}, + prom, + }, + prom::{run_server as prometheus_run_server, GprcMessageKind}, + }, +}; + +#[derive(Debug, Clone, Parser)] +#[clap(author, version, about = "Yellowstone gRPC Google Pub/Sub Tool")] +struct Args { + /// Path to config file + #[clap(short, long)] + config: String, + + /// Prometheus listen address + #[clap(long)] + prometheus: Option, + + #[command(subcommand)] + action: ArgsAction, +} + +#[derive(Debug, Clone, Subcommand)] +enum ArgsAction { + /// Receive data from gRPC and send them to the Pub/Sub + #[command(name = "grpc2pubsub")] + Grpc2PubSub, + /// Dev: subscribe to message from Pub/Sub and print them to Stdout + #[command(name = "pubsub2stdout")] + PubSub2Stdout { + #[clap(long)] + topic: String, + #[clap(long)] + subscription: String, + }, + /// Dev: create Pub/Sub topic + #[command(name = "pubsubTopicCreate")] + PubSubTopicCreate { topic: String }, + /// Dev: delete Pub/Sub topic + #[command(name = "pubsubTopicDelete")] + PubSubTopicDelete { topic: String }, +} + +impl ArgsAction { + async fn run(self, config: Config) -> anyhow::Result<()> { + let shutdown = create_shutdown()?; + let client = config.create_client().await?; + + match self { + ArgsAction::Grpc2PubSub => { + let config = config.grpc2pubsub.ok_or_else(|| { + anyhow::anyhow!("`grpc2pubsub` section in config should be defined") + })?; + Self::grpc2pubsub(client, config, shutdown).await + } + ArgsAction::PubSub2Stdout { + topic, + subscription, + } => Self::pubsub2stdout(client, topic, subscription, shutdown).await, + ArgsAction::PubSubTopicCreate { topic } => { + let topic = client.topic(&topic); + if !topic.exists(None).await? { + topic.create(None, None).await?; + } + Ok(()) + } + ArgsAction::PubSubTopicDelete { topic } => { + client.topic(&topic).delete(None).await.map_err(Into::into) + } + } + } + + async fn grpc2pubsub( + client: Client, + config: ConfigGrpc2PubSub, + mut shutdown: BoxFuture<'static, ()>, + ) -> anyhow::Result<()> { + // Connect to Pub/Sub and create topic if not exists + let topic = client.topic(&config.topic); + if !topic.exists(None).await? { + anyhow::ensure!( + config.create_if_not_exists, + "topic {} doesn't exists", + config.topic + ); + topic.create(None, None).await?; + } + let publisher = topic.new_publisher(Some(config.get_publisher_config())); + + // Create gRPC client & subscribe + let mut client = GeyserGrpcClient::connect_with_timeout( + config.endpoint, + config.x_token, + None, + Some(Duration::from_secs(10)), + Some(Duration::from_secs(5)), + false, + ) + .await?; + let mut geyser = client.subscribe_once2(config.request.to_proto()).await?; + + // Receive-send loop + let mut send_tasks = JoinSet::new(); + let mut msg_slot = 0; + let mut msg_id = 0; + 'outer: loop { + let sleep = sleep(Duration::from_millis(config.bulk_max_wait_ms as u64)); + tokio::pin!(sleep); + let mut messages = vec![]; + let mut prom_kind = vec![]; + while messages.len() < config.bulk_max_size { + let message = tokio::select! { + _ = &mut shutdown => break 'outer, + _ = &mut sleep => break, + maybe_result = send_tasks.join_next() => match maybe_result { + Some(result) => { + result??; + continue; + } + None => tokio::select! { + _ = &mut shutdown => break 'outer, + _ = &mut sleep => break, + message = geyser.next() => message, + } + }, + message = geyser.next() => message, + } + .transpose()?; + let message = match message { + Some(message) => message, + None => break 'outer, + }; + + let payload = message.encode_to_vec(); + let message = match &message.update_oneof { + Some(value) => value, + None => unreachable!("Expect valid message"), + }; + let slot = match message { + UpdateOneof::Account(msg) => msg.slot, + UpdateOneof::Slot(msg) => msg.slot, + UpdateOneof::Transaction(msg) => msg.slot, + UpdateOneof::Block(msg) => msg.slot, + UpdateOneof::Ping(_) => continue, + UpdateOneof::BlockMeta(msg) => msg.slot, + UpdateOneof::Entry(msg) => msg.slot, + }; + if msg_slot != slot { + msg_slot = slot; + msg_id = 0; + } + msg_id += 1; + + messages.push(PubsubMessage { + data: payload, + ordering_key: format!("{msg_slot}-{msg_id}"), + ..Default::default() + }); + prom_kind.push(GprcMessageKind::from(message)); + } + if messages.is_empty() { + continue; + } + + for (awaiter, prom_kind) in publisher + .publish_bulk(messages) + .await + .into_iter() + .zip(prom_kind.into_iter()) + { + send_tasks.spawn(async move { + awaiter.get().await?; + prom::sent_inc(prom_kind); + Ok::<(), anyhow::Error>(()) + }); + } + } + warn!("shutdown received..."); + while let Some(result) = send_tasks.join_next().await { + result??; + } + Ok(()) + } + + async fn pubsub2stdout( + client: Client, + topic: String, + subscription: String, + mut shutdown: BoxFuture<'static, ()>, + ) -> anyhow::Result<()> { + let topic = client.topic(&topic); + if !topic.exists(None).await? { + topic.create(None, None).await?; + } + let subscription = client.subscription(&subscription); + if !subscription.exists(None).await? { + let fqtn = topic.fully_qualified_name(); + let config = SubscriptionConfig::default(); + subscription.create(fqtn, config, None).await?; + } + + let mut stream = subscription.subscribe(None).await?; + loop { + let msg = tokio::select! { + _ = &mut shutdown => break, + msg = stream.next() => match msg { + Some(msg) => msg, + None => break, + } + }; + + msg.ack().await?; + match SubscribeUpdate::decode(msg.message.data.as_ref()) { + Ok(msg) => match msg.update_oneof { + Some(UpdateOneof::Account(msg)) => info!("#{}, account", msg.slot), + Some(UpdateOneof::Slot(msg)) => info!("#{}, slot", msg.slot), + Some(UpdateOneof::Transaction(msg)) => { + info!("#{}, transaction", msg.slot) + } + Some(UpdateOneof::Block(msg)) => info!("#{}, block", msg.slot), + Some(UpdateOneof::Ping(_)) => {} + Some(UpdateOneof::BlockMeta(msg)) => info!("#{}, blockmeta", msg.slot), + Some(UpdateOneof::Entry(msg)) => info!("#{}, entry", msg.slot), + None => {} + }, + Err(error) => { + warn!("failed to decode message: {error}"); + } + } + } + + Ok(()) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Setup tracing + let is_atty = atty::is(atty::Stream::Stdout) && atty::is(atty::Stream::Stderr); + let io_layer = tracing_subscriber::fmt::layer().with_ansi(is_atty); + let level_layer = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + tracing_subscriber::registry() + .with(io_layer) + .with(level_layer) + .try_init()?; + + // Parse args + let args = Args::parse(); + let config = config_load::(&args.config).await?; + + // Run prometheus server + if let Some(address) = args.prometheus.or(config.prometheus) { + prometheus_run_server(address)?; + } + + args.action.run(config).await +} diff --git a/yellowstone-grpc-tools/src/bin/grpc-kafka.rs b/yellowstone-grpc-tools/src/bin/grpc-kafka.rs index 1a8e4af2..34f7ef9a 100644 --- a/yellowstone-grpc-tools/src/bin/grpc-kafka.rs +++ b/yellowstone-grpc-tools/src/bin/grpc-kafka.rs @@ -1,17 +1,11 @@ use { anyhow::Context, clap::{Parser, Subcommand}, - futures::{ - future::{BoxFuture, FutureExt}, - stream::StreamExt, - }, + futures::{future::BoxFuture, stream::StreamExt}, rdkafka::{config::ClientConfig, consumer::Consumer, message::Message, producer::FutureRecord}, sha2::{Digest, Sha256}, - std::{sync::Arc, time::Duration}, - tokio::{ - signal::unix::{signal, SignalKind}, - task::JoinSet, - }, + std::{net::SocketAddr, sync::Arc, time::Duration}, + tokio::task::JoinSet, tracing::{debug, trace, warn}, tracing_subscriber::{ filter::{EnvFilter, LevelFilter}, @@ -24,13 +18,15 @@ use { prost::Message as _, }, yellowstone_grpc_tools::{ + config::{load as config_load, GrpcRequestToProto}, + create_shutdown, kafka::{ - config::{Config, ConfigDedup, ConfigGrpc2Kafka, ConfigKafka2Grpc, GrpcRequestToProto}, + config::{Config, ConfigDedup, ConfigGrpc2Kafka, ConfigKafka2Grpc}, dedup::KafkaDedup, grpc::GrpcService, prom, }, - prom::run_server as prometheus_run_server, + prom::{run_server as prometheus_run_server, GprcMessageKind}, }, }; @@ -41,6 +37,10 @@ struct Args { #[clap(short, long)] config: String, + /// Prometheus listen address + #[clap(long)] + prometheus: Option, + #[command(subcommand)] action: ArgsAction, } @@ -58,12 +58,8 @@ enum ArgsAction { } impl ArgsAction { - async fn run( - self, - config: Config, - kafka_config: ClientConfig, - shutdown: BoxFuture<'static, ()>, - ) -> anyhow::Result<()> { + async fn run(self, config: Config, kafka_config: ClientConfig) -> anyhow::Result<()> { + let shutdown = create_shutdown()?; match self { ArgsAction::Dedup => { let config = config.dedup.ok_or_else(|| { @@ -166,7 +162,7 @@ impl ArgsAction { debug!("kafka send message with key: {key}, result: {result:?}"); result?.map_err(|(error, _message)| error)?; - prom::sent_inc(prom::GprcMessageKind::Unknown); + prom::sent_inc(GprcMessageKind::Unknown); Ok::<(), anyhow::Error>(()) } Err(error) => Err(error.0.into()), @@ -256,7 +252,7 @@ impl ArgsAction { }; let hash = Sha256::digest(&payload); let key = format!("{slot}_{}", const_hex::encode(hash)); - let prom_kind = prom::GprcMessageKind::from(message); + let prom_kind = GprcMessageKind::from(message); let record = FutureRecord::to(&config.kafka_topic) .key(&key) @@ -354,10 +350,10 @@ async fn main() -> anyhow::Result<()> { // Parse args let args = Args::parse(); - let config = Config::load(&args.config).await?; + let config = config_load::(&args.config).await?; // Run prometheus server - if let Some(address) = config.prometheus { + if let Some(address) = args.prometheus.or(config.prometheus) { prometheus_run_server(address)?; } @@ -367,16 +363,5 @@ async fn main() -> anyhow::Result<()> { kafka_config.set(key, value); } - // Create shutdown signal - let mut sigint = signal(SignalKind::interrupt())?; - let mut sigterm = signal(SignalKind::terminate())?; - let shutdown = async move { - tokio::select! { - _ = sigint.recv() => {}, - _ = sigterm.recv() => {} - }; - } - .boxed(); - - args.action.run(config, kafka_config, shutdown).await + args.action.run(config, kafka_config).await } diff --git a/yellowstone-grpc-tools/src/config.rs b/yellowstone-grpc-tools/src/config.rs new file mode 100644 index 00000000..2edd40f5 --- /dev/null +++ b/yellowstone-grpc-tools/src/config.rs @@ -0,0 +1,261 @@ +use { + anyhow::Context, + serde::{de, Deserialize, Serialize}, + std::{ + collections::{HashMap, HashSet}, + path::Path, + }, + tokio::fs, + yellowstone_grpc_proto::prelude::{ + subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, + subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, + CommitmentLevel, SubscribeRequest, SubscribeRequestAccountsDataSlice, + SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, + SubscribeRequestFilterAccountsFilterMemcmp, SubscribeRequestFilterBlocks, + SubscribeRequestFilterTransactions, + }, +}; + +pub async fn load(path: impl AsRef + Copy) -> anyhow::Result +where + T: de::DeserializeOwned, +{ + let text = fs::read_to_string(path) + .await + .context("failed to read config from file")?; + + match path.as_ref().extension().and_then(|e| e.to_str()) { + Some("yaml") | Some("yml") => { + serde_yaml::from_str(&text).context("failed to parse config from file") + } + Some("json") => serde_yaml::from_str(&text).context("failed to parse config from file"), + value => anyhow::bail!("unknown config extension: {value:?}"), + } +} + +pub trait GrpcRequestToProto { + fn to_proto(self) -> T; +} + +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +#[serde(default)] +pub struct ConfigGrpcRequest { + pub slots: HashSet, + pub accounts: HashMap, + pub transactions: HashMap, + pub entries: HashSet, + pub blocks: HashMap, + pub blocks_meta: HashSet, + pub commitment: Option, + pub accounts_data_slice: Vec, +} + +impl ConfigGrpcRequest { + fn map_to_proto(map: HashMap>) -> HashMap { + map.into_iter().map(|(k, v)| (k, v.to_proto())).collect() + } + + fn set_to_proto(set: HashSet) -> HashMap { + set.into_iter().map(|v| (v, T::default())).collect() + } + + fn vec_to_proto(vec: Vec>) -> Vec { + vec.into_iter().map(|v| v.to_proto()).collect() + } +} + +impl GrpcRequestToProto for ConfigGrpcRequest { + fn to_proto(self) -> SubscribeRequest { + SubscribeRequest { + slots: ConfigGrpcRequest::set_to_proto(self.slots), + accounts: ConfigGrpcRequest::map_to_proto(self.accounts), + transactions: ConfigGrpcRequest::map_to_proto(self.transactions), + entry: ConfigGrpcRequest::set_to_proto(self.entries), + blocks: ConfigGrpcRequest::map_to_proto(self.blocks), + blocks_meta: ConfigGrpcRequest::set_to_proto(self.blocks_meta), + commitment: self.commitment.map(|v| v.to_proto() as i32), + accounts_data_slice: ConfigGrpcRequest::vec_to_proto(self.accounts_data_slice), + } + } +} + +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +#[serde(default)] +pub struct ConfigGrpcRequestAccounts { + account: Vec, + owner: Vec, + filters: Vec, +} + +impl GrpcRequestToProto for ConfigGrpcRequestAccounts { + fn to_proto(self) -> SubscribeRequestFilterAccounts { + SubscribeRequestFilterAccounts { + account: self.account, + owner: self.owner, + filters: self.filters.into_iter().map(|f| f.to_proto()).collect(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub enum ConfigGrpcRequestAccountsFilter { + Memcmp { offset: u64, base58: String }, + DataSize(u64), + TokenAccountState, +} + +impl GrpcRequestToProto for ConfigGrpcRequestAccountsFilter { + fn to_proto(self) -> SubscribeRequestFilterAccountsFilter { + SubscribeRequestFilterAccountsFilter { + filter: Some(match self { + ConfigGrpcRequestAccountsFilter::Memcmp { offset, base58 } => { + AccountsFilterDataOneof::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp { + offset, + data: Some(AccountsFilterMemcmpOneof::Base58(base58)), + }) + } + ConfigGrpcRequestAccountsFilter::DataSize(size) => { + AccountsFilterDataOneof::Datasize(size) + } + ConfigGrpcRequestAccountsFilter::TokenAccountState => { + AccountsFilterDataOneof::TokenAccountState(true) + } + }), + } + } +} + +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +#[serde(default)] +pub struct ConfigGrpcRequestTransactions { + pub vote: Option, + pub failed: Option, + pub signature: Option, + pub account_include: Vec, + pub account_exclude: Vec, + pub account_required: Vec, +} + +impl GrpcRequestToProto for ConfigGrpcRequestTransactions { + fn to_proto(self) -> SubscribeRequestFilterTransactions { + SubscribeRequestFilterTransactions { + vote: self.vote, + failed: self.failed, + signature: self.signature, + account_include: self.account_include, + account_exclude: self.account_exclude, + account_required: self.account_required, + } + } +} + +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +#[serde(default)] +pub struct ConfigGrpcRequestBlocks { + pub account_include: Vec, + pub include_transactions: Option, + pub include_accounts: Option, + pub include_entries: Option, +} + +impl GrpcRequestToProto for ConfigGrpcRequestBlocks { + fn to_proto(self) -> SubscribeRequestFilterBlocks { + SubscribeRequestFilterBlocks { + account_include: self.account_include, + include_transactions: self.include_transactions, + include_accounts: self.include_accounts, + include_entries: self.include_entries, + } + } +} + +#[derive(Debug, Clone, Copy, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum ConfigGrpcRequestCommitment { + Processed, + Confirmed, + Finalized, +} + +impl GrpcRequestToProto for ConfigGrpcRequestCommitment { + fn to_proto(self) -> CommitmentLevel { + match self { + Self::Processed => CommitmentLevel::Processed, + Self::Confirmed => CommitmentLevel::Confirmed, + Self::Finalized => CommitmentLevel::Finalized, + } + } +} + +#[derive(Debug, Clone, Copy, Deserialize, Serialize)] +pub struct ConfigGrpcRequestAccountsDataSlice { + pub offset: u64, + pub length: u64, +} + +impl GrpcRequestToProto for ConfigGrpcRequestAccountsDataSlice { + fn to_proto(self) -> SubscribeRequestAccountsDataSlice { + SubscribeRequestAccountsDataSlice { + offset: self.offset, + length: self.length, + } + } +} + +pub fn deserialize_usize_str<'de, D>(deserializer: D) -> Result +where + D: de::Deserializer<'de>, +{ + #[derive(Deserialize)] + #[serde(untagged)] + enum Value { + Integer(usize), + String(String), + } + + match Value::deserialize(deserializer)? { + Value::Integer(value) => Ok(value), + Value::String(value) => value + .replace('_', "") + .parse::() + .map_err(de::Error::custom), + } +} + +#[cfg(test)] +mod tests { + use super::ConfigGrpcRequestAccountsFilter; + + #[test] + fn grpc_config_accounts_filter_memcmp() { + let filter = ConfigGrpcRequestAccountsFilter::Memcmp { + offset: 42, + base58: "123".to_owned(), + }; + let text = serde_json::to_string(&filter).unwrap(); + assert_eq!( + serde_json::from_str::(&text).unwrap(), + filter + ); + } + + #[test] + fn grpc_config_accounts_filter_datasize() { + let filter = ConfigGrpcRequestAccountsFilter::DataSize(42); + let text = serde_json::to_string(&filter).unwrap(); + assert_eq!( + serde_json::from_str::(&text).unwrap(), + filter + ); + } + + #[test] + fn grpc_config_accounts_filter_token() { + let filter = ConfigGrpcRequestAccountsFilter::TokenAccountState; + let text = serde_json::to_string(&filter).unwrap(); + assert_eq!( + serde_json::from_str::(&text).unwrap(), + filter + ); + } +} diff --git a/yellowstone-grpc-tools/src/google_pubsub/config.rs b/yellowstone-grpc-tools/src/google_pubsub/config.rs new file mode 100644 index 00000000..5b85a2b8 --- /dev/null +++ b/yellowstone-grpc-tools/src/google_pubsub/config.rs @@ -0,0 +1,104 @@ +use { + crate::config::{deserialize_usize_str, ConfigGrpcRequest}, + google_cloud_pubsub::{ + client::{google_cloud_auth::credentials::CredentialsFile, Client, ClientConfig}, + publisher::PublisherConfig, + }, + serde::Deserialize, + std::{net::SocketAddr, time::Duration}, +}; + +#[derive(Debug, Default, Deserialize)] +#[serde(default)] +pub struct Config { + pub prometheus: Option, + pub auth: Option, + pub grpc2pubsub: Option, +} + +impl Config { + pub async fn create_client(&self) -> anyhow::Result { + let mut config = ClientConfig::default(); + if let Some(creds) = match self.auth.clone() { + Some(filepath) => CredentialsFile::new_from_file(filepath).await.map(Some), + None => { + if std::env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").is_ok() + || std::env::var("GOOGLE_APPLICATION_CREDENTIALS").is_ok() + { + CredentialsFile::new().await.map(Some) + } else { + Ok(None) + } + } + }? { + config = config.with_credentials(creds).await?; + } + Client::new(config).await.map_err(Into::into) + } +} + +#[derive(Debug, Default, Deserialize)] +#[serde(default)] +pub struct ConfigGrpc2PubSub { + pub endpoint: String, + pub x_token: Option, + pub request: ConfigGrpcRequest, + + pub topic: String, + // Create `topic` with default config if not exists + pub create_if_not_exists: bool, + + // Publisher config + #[serde(default = "ConfigGrpc2PubSub::default_workers")] + pub workers: usize, + #[serde( + default = "ConfigGrpc2PubSub::default_flush_interval_ms", + deserialize_with = "deserialize_usize_str" + )] + pub flush_interval_ms: usize, + #[serde(default = "ConfigGrpc2PubSub::default_bundle_size")] + pub bundle_size: usize, + + // Publisher bulk config + #[serde( + default = "ConfigGrpc2PubSub::default_bulk_max_size", + deserialize_with = "deserialize_usize_str" + )] + pub bulk_max_size: usize, + #[serde( + default = "ConfigGrpc2PubSub::default_bulk_max_wait_ms", + deserialize_with = "deserialize_usize_str" + )] + pub bulk_max_wait_ms: usize, +} + +impl ConfigGrpc2PubSub { + fn default_workers() -> usize { + PublisherConfig::default().workers + } + + fn default_flush_interval_ms() -> usize { + PublisherConfig::default().flush_interval.as_millis() as usize + } + + fn default_bundle_size() -> usize { + PublisherConfig::default().bundle_size + } + + const fn default_bulk_max_size() -> usize { + 10 + } + + const fn default_bulk_max_wait_ms() -> usize { + 100 + } + + pub const fn get_publisher_config(&self) -> PublisherConfig { + PublisherConfig { + workers: self.workers, + flush_interval: Duration::from_millis(self.flush_interval_ms as u64), + bundle_size: self.bundle_size, + retry_setting: None, + } + } +} diff --git a/yellowstone-grpc-tools/src/google_pubsub/mod.rs b/yellowstone-grpc-tools/src/google_pubsub/mod.rs new file mode 100644 index 00000000..b43365b5 --- /dev/null +++ b/yellowstone-grpc-tools/src/google_pubsub/mod.rs @@ -0,0 +1,2 @@ +pub mod config; +pub mod prom; diff --git a/yellowstone-grpc-tools/src/google_pubsub/prom.rs b/yellowstone-grpc-tools/src/google_pubsub/prom.rs new file mode 100644 index 00000000..00eba5ad --- /dev/null +++ b/yellowstone-grpc-tools/src/google_pubsub/prom.rs @@ -0,0 +1,17 @@ +use { + crate::prom::GprcMessageKind, + prometheus::{IntCounterVec, Opts}, +}; + +lazy_static::lazy_static! { + pub(crate) static ref GOOGLE_PUBSUB_SENT_TOTAL: IntCounterVec = IntCounterVec::new( + Opts::new("google_pubsub_sent_total", "Total number of uploaded messages by type"), + &["kind"] + ).unwrap(); +} + +pub fn sent_inc(kind: GprcMessageKind) { + GOOGLE_PUBSUB_SENT_TOTAL + .with_label_values(&[kind.as_str()]) + .inc() +} diff --git a/yellowstone-grpc-tools/src/kafka/config.rs b/yellowstone-grpc-tools/src/kafka/config.rs index f01d754d..6cda8aba 100644 --- a/yellowstone-grpc-tools/src/kafka/config.rs +++ b/yellowstone-grpc-tools/src/kafka/config.rs @@ -1,30 +1,10 @@ use { super::dedup::{KafkaDedup, KafkaDedupMemory}, - anyhow::Context, - serde::{ - de::{self, Deserializer}, - Deserialize, Serialize, - }, - std::{ - collections::{HashMap, HashSet}, - net::SocketAddr, - path::Path, - }, - tokio::fs, - yellowstone_grpc_proto::prelude::{ - subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, - subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, - CommitmentLevel, SubscribeRequest, SubscribeRequestAccountsDataSlice, - SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, - SubscribeRequestFilterAccountsFilterMemcmp, SubscribeRequestFilterBlocks, - SubscribeRequestFilterTransactions, - }, + crate::config::{deserialize_usize_str, ConfigGrpcRequest}, + serde::Deserialize, + std::{collections::HashMap, net::SocketAddr}, }; -pub trait GrpcRequestToProto { - fn to_proto(self) -> T; -} - #[derive(Debug, Default, Deserialize)] #[serde(default)] pub struct Config { @@ -35,22 +15,6 @@ pub struct Config { pub kafka2grpc: Option, } -impl Config { - pub async fn load(path: impl AsRef + Copy) -> anyhow::Result { - let text = fs::read_to_string(path) - .await - .context("failed to read config from file")?; - - match path.as_ref().extension().and_then(|e| e.to_str()) { - Some("yaml") | Some("yml") => { - serde_yaml::from_str(&text).context("failed to parse config from file") - } - Some("json") => serde_yaml::from_str(&text).context("failed to parse config from file"), - value => anyhow::bail!("unknown config extension: {value:?}"), - } - } -} - #[derive(Debug, Deserialize)] pub struct ConfigDedup { #[serde(default)] @@ -59,7 +23,7 @@ pub struct ConfigDedup { pub kafka_output: String, #[serde( default = "ConfigGrpc2Kafka::default_kafka_queue_size", - deserialize_with = "ConfigGrpc2Kafka::deserialize_usize_str" + deserialize_with = "deserialize_usize_str" )] pub kafka_queue_size: usize, pub backend: ConfigDedupBackend, @@ -83,13 +47,13 @@ impl ConfigDedupBackend { pub struct ConfigGrpc2Kafka { pub endpoint: String, pub x_token: Option, - pub request: ConfigGrpc2KafkaRequest, + pub request: ConfigGrpcRequest, #[serde(default)] pub kafka: HashMap, pub kafka_topic: String, #[serde( default = "ConfigGrpc2Kafka::default_kafka_queue_size", - deserialize_with = "ConfigGrpc2Kafka::deserialize_usize_str" + deserialize_with = "deserialize_usize_str" )] pub kafka_queue_size: usize, } @@ -98,197 +62,6 @@ impl ConfigGrpc2Kafka { const fn default_kafka_queue_size() -> usize { 10_000 } - - fn deserialize_usize_str<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - #[derive(Deserialize)] - #[serde(untagged)] - enum Value { - Integer(usize), - String(String), - } - - match Value::deserialize(deserializer)? { - Value::Integer(value) => Ok(value), - Value::String(value) => value - .replace('_', "") - .parse::() - .map_err(de::Error::custom), - } - } -} - -#[derive(Debug, Default, Deserialize, Serialize)] -#[serde(default)] -pub struct ConfigGrpc2KafkaRequest { - pub slots: HashSet, - pub accounts: HashMap, - pub transactions: HashMap, - pub entries: HashSet, - pub blocks: HashMap, - pub blocks_meta: HashSet, - pub commitment: Option, - pub accounts_data_slice: Vec, -} - -impl ConfigGrpc2KafkaRequest { - fn map_to_proto(map: HashMap>) -> HashMap { - map.into_iter().map(|(k, v)| (k, v.to_proto())).collect() - } - - fn set_to_proto(set: HashSet) -> HashMap { - set.into_iter().map(|v| (v, T::default())).collect() - } - - fn vec_to_proto(vec: Vec>) -> Vec { - vec.into_iter().map(|v| v.to_proto()).collect() - } -} - -impl GrpcRequestToProto for ConfigGrpc2KafkaRequest { - fn to_proto(self) -> SubscribeRequest { - SubscribeRequest { - slots: ConfigGrpc2KafkaRequest::set_to_proto(self.slots), - accounts: ConfigGrpc2KafkaRequest::map_to_proto(self.accounts), - transactions: ConfigGrpc2KafkaRequest::map_to_proto(self.transactions), - entry: ConfigGrpc2KafkaRequest::set_to_proto(self.entries), - blocks: ConfigGrpc2KafkaRequest::map_to_proto(self.blocks), - blocks_meta: ConfigGrpc2KafkaRequest::set_to_proto(self.blocks_meta), - commitment: self.commitment.map(|v| v.to_proto() as i32), - accounts_data_slice: ConfigGrpc2KafkaRequest::vec_to_proto(self.accounts_data_slice), - } - } -} - -#[derive(Debug, Default, Deserialize, Serialize)] -#[serde(default)] -pub struct ConfigGrpc2KafkaRequestAccounts { - account: Vec, - owner: Vec, - filters: Vec, -} - -impl GrpcRequestToProto for ConfigGrpc2KafkaRequestAccounts { - fn to_proto(self) -> SubscribeRequestFilterAccounts { - SubscribeRequestFilterAccounts { - account: self.account, - owner: self.owner, - filters: self.filters.into_iter().map(|f| f.to_proto()).collect(), - } - } -} - -#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)] -pub enum ConfigGrpc2KafkaRequestAccountsFilter { - Memcmp { offset: u64, base58: String }, - DataSize(u64), - TokenAccountState, -} - -impl GrpcRequestToProto - for ConfigGrpc2KafkaRequestAccountsFilter -{ - fn to_proto(self) -> SubscribeRequestFilterAccountsFilter { - SubscribeRequestFilterAccountsFilter { - filter: Some(match self { - ConfigGrpc2KafkaRequestAccountsFilter::Memcmp { offset, base58 } => { - AccountsFilterDataOneof::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp { - offset, - data: Some(AccountsFilterMemcmpOneof::Base58(base58)), - }) - } - ConfigGrpc2KafkaRequestAccountsFilter::DataSize(size) => { - AccountsFilterDataOneof::Datasize(size) - } - ConfigGrpc2KafkaRequestAccountsFilter::TokenAccountState => { - AccountsFilterDataOneof::TokenAccountState(true) - } - }), - } - } -} - -#[derive(Debug, Default, Deserialize, Serialize)] -#[serde(default)] -pub struct ConfigGrpc2KafkaRequestTransactions { - pub vote: Option, - pub failed: Option, - pub signature: Option, - pub account_include: Vec, - pub account_exclude: Vec, - pub account_required: Vec, -} - -impl GrpcRequestToProto - for ConfigGrpc2KafkaRequestTransactions -{ - fn to_proto(self) -> SubscribeRequestFilterTransactions { - SubscribeRequestFilterTransactions { - vote: self.vote, - failed: self.failed, - signature: self.signature, - account_include: self.account_include, - account_exclude: self.account_exclude, - account_required: self.account_required, - } - } -} - -#[derive(Debug, Default, Deserialize, Serialize)] -#[serde(default)] -pub struct ConfigGrpc2KafkaRequestBlocks { - pub account_include: Vec, - pub include_transactions: Option, - pub include_accounts: Option, - pub include_entries: Option, -} - -impl GrpcRequestToProto for ConfigGrpc2KafkaRequestBlocks { - fn to_proto(self) -> SubscribeRequestFilterBlocks { - SubscribeRequestFilterBlocks { - account_include: self.account_include, - include_transactions: self.include_transactions, - include_accounts: self.include_accounts, - include_entries: self.include_entries, - } - } -} - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "lowercase")] -pub enum ConfigGrpc2KafkaRequestCommitment { - Processed, - Confirmed, - Finalized, -} - -impl GrpcRequestToProto for ConfigGrpc2KafkaRequestCommitment { - fn to_proto(self) -> CommitmentLevel { - match self { - Self::Processed => CommitmentLevel::Processed, - Self::Confirmed => CommitmentLevel::Confirmed, - Self::Finalized => CommitmentLevel::Finalized, - } - } -} - -#[derive(Debug, Deserialize, Serialize)] -pub struct ConfigGrpc2KafkaRequestAccountsDataSlice { - pub offset: u64, - pub length: u64, -} - -impl GrpcRequestToProto - for ConfigGrpc2KafkaRequestAccountsDataSlice -{ - fn to_proto(self) -> SubscribeRequestAccountsDataSlice { - SubscribeRequestAccountsDataSlice { - offset: self.offset, - length: self.length, - } - } } #[derive(Debug, Deserialize)] @@ -306,41 +79,3 @@ impl ConfigKafka2Grpc { 250_000 } } - -#[cfg(test)] -mod tests { - use super::ConfigGrpc2KafkaRequestAccountsFilter; - - #[test] - fn grpc_config_accounts_filter_memcmp() { - let filter = ConfigGrpc2KafkaRequestAccountsFilter::Memcmp { - offset: 42, - base58: "123".to_owned(), - }; - let text = serde_json::to_string(&filter).unwrap(); - assert_eq!( - serde_json::from_str::(&text).unwrap(), - filter - ); - } - - #[test] - fn grpc_config_accounts_filter_datasize() { - let filter = ConfigGrpc2KafkaRequestAccountsFilter::DataSize(42); - let text = serde_json::to_string(&filter).unwrap(); - assert_eq!( - serde_json::from_str::(&text).unwrap(), - filter - ); - } - - #[test] - fn grpc_config_accounts_filter_token() { - let filter = ConfigGrpc2KafkaRequestAccountsFilter::TokenAccountState; - let text = serde_json::to_string(&filter).unwrap(); - assert_eq!( - serde_json::from_str::(&text).unwrap(), - filter - ); - } -} diff --git a/yellowstone-grpc-tools/src/kafka/prom.rs b/yellowstone-grpc-tools/src/kafka/prom.rs index 939d98b5..9fa48cd7 100644 --- a/yellowstone-grpc-tools/src/kafka/prom.rs +++ b/yellowstone-grpc-tools/src/kafka/prom.rs @@ -1,4 +1,5 @@ use { + crate::prom::GprcMessageKind, prometheus::{GaugeVec, IntCounter, IntCounterVec, Opts}, rdkafka::{ client::ClientContext, @@ -8,7 +9,6 @@ use { producer::FutureProducer, statistics::Statistics, }, - yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof, }; lazy_static::lazy_static! { @@ -103,45 +103,6 @@ impl StatsContext { } } -#[derive(Debug, Clone, Copy)] -pub enum GprcMessageKind { - Account, - Slot, - Transaction, - Block, - BlockMeta, - Entry, - Unknown, -} - -impl From<&UpdateOneof> for GprcMessageKind { - fn from(msg: &UpdateOneof) -> Self { - match msg { - UpdateOneof::Account(_) => Self::Account, - UpdateOneof::Slot(_) => Self::Slot, - UpdateOneof::Transaction(_) => Self::Transaction, - UpdateOneof::Block(_) => Self::Block, - UpdateOneof::Ping(_) => unreachable!(), - UpdateOneof::BlockMeta(_) => Self::BlockMeta, - UpdateOneof::Entry(_) => Self::Entry, - } - } -} - -impl GprcMessageKind { - const fn as_str(self) -> &'static str { - match self { - GprcMessageKind::Account => "account", - GprcMessageKind::Slot => "slot", - GprcMessageKind::Transaction => "transaction", - GprcMessageKind::Block => "block", - GprcMessageKind::BlockMeta => "blockmeta", - GprcMessageKind::Entry => "entry", - GprcMessageKind::Unknown => "unknown", - } - } -} - pub fn dedup_inc() { KAFKA_DEDUP_TOTAL.inc(); } diff --git a/yellowstone-grpc-tools/src/lib.rs b/yellowstone-grpc-tools/src/lib.rs index 2d1092c3..159635d8 100644 --- a/yellowstone-grpc-tools/src/lib.rs +++ b/yellowstone-grpc-tools/src/lib.rs @@ -2,6 +2,25 @@ #![deny(clippy::missing_const_for_fn)] #![deny(clippy::trivially_copy_pass_by_ref)] +pub mod config; +pub mod google_pubsub; pub mod kafka; pub mod prom; pub mod version; + +use { + futures::future::{BoxFuture, FutureExt}, + tokio::signal::unix::{signal, SignalKind}, +}; + +pub fn create_shutdown() -> anyhow::Result> { + let mut sigint = signal(SignalKind::interrupt())?; + let mut sigterm = signal(SignalKind::terminate())?; + Ok(async move { + tokio::select! { + _ = sigint.recv() => {}, + _ = sigterm.recv() => {} + }; + } + .boxed()) +} diff --git a/yellowstone-grpc-tools/src/prom.rs b/yellowstone-grpc-tools/src/prom.rs index af93f782..a77a7084 100644 --- a/yellowstone-grpc-tools/src/prom.rs +++ b/yellowstone-grpc-tools/src/prom.rs @@ -1,5 +1,6 @@ use { crate::{ + google_pubsub::prom::GOOGLE_PUBSUB_SENT_TOTAL, kafka::prom::{KAFKA_DEDUP_TOTAL, KAFKA_RECV_TOTAL, KAFKA_SENT_TOTAL, KAFKA_STATS}, version::VERSION as VERSION_INFO, }, @@ -11,6 +12,7 @@ use { prometheus::{IntCounterVec, Opts, Registry, TextEncoder}, std::{net::SocketAddr, sync::Once}, tracing::{error, info}, + yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof, }; lazy_static::lazy_static! { @@ -33,6 +35,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { }; } register!(VERSION); + register!(GOOGLE_PUBSUB_SENT_TOTAL); register!(KAFKA_STATS); register!(KAFKA_DEDUP_TOTAL); register!(KAFKA_RECV_TOTAL); @@ -87,3 +90,42 @@ fn not_found_handler() -> Response { .body(Body::empty()) .unwrap() } + +#[derive(Debug, Clone, Copy)] +pub enum GprcMessageKind { + Account, + Slot, + Transaction, + Block, + BlockMeta, + Entry, + Unknown, +} + +impl From<&UpdateOneof> for GprcMessageKind { + fn from(msg: &UpdateOneof) -> Self { + match msg { + UpdateOneof::Account(_) => Self::Account, + UpdateOneof::Slot(_) => Self::Slot, + UpdateOneof::Transaction(_) => Self::Transaction, + UpdateOneof::Block(_) => Self::Block, + UpdateOneof::Ping(_) => unreachable!(), + UpdateOneof::BlockMeta(_) => Self::BlockMeta, + UpdateOneof::Entry(_) => Self::Entry, + } + } +} + +impl GprcMessageKind { + pub const fn as_str(self) -> &'static str { + match self { + GprcMessageKind::Account => "account", + GprcMessageKind::Slot => "slot", + GprcMessageKind::Transaction => "transaction", + GprcMessageKind::Block => "block", + GprcMessageKind::BlockMeta => "blockmeta", + GprcMessageKind::Entry => "entry", + GprcMessageKind::Unknown => "unknown", + } + } +}