Transform failed (using Azure Event Hubs & ADLS Gen 2 storage account) #151

Open · Jaegin opened this issue Sep 11, 2023 · 1 comment
Labels: azure, bug

Jaegin commented Sep 11, 2023

Hi,

I am getting the following Transform failed warning:

2023-09-11T10:29:18 [WARN] - web_requests: Transform failed - partition 0, offset 22
2023-09-11T10:29:18 [DEBUG] - web_requests: Could not send to statsd Connection refused (os error 111)
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete record batch - latency test: 1740 >= 5000
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete record batch - buffer length test: 0 >= 5000
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete file - latency test: 1 >= 5
2023-09-11T10:29:18 [DEBUG] - web_requests: Should complete file - num bytes test: 0 >= 134217728
2023-09-11T10:29:18 [DEBUG] - web_requests: Could not send to statsd Connection refused (os error 111)
2023-09-11T10:29:18 [DEBUG] - web_requests: Could not send to statsd Connection refused (os error 111)

Here is my docker run command:

docker run -it --network=host ^
-e RUST_LOG="debug" ^
-e SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt ^
-e AZURE_STORAGE_ACCOUNT_NAME=XXX ^
-e "AZURE_STORAGE_ACCOUNT_KEY=XXX" ^
-e RUST_BACKTRACE=full ^
kdi:0.1 ^
ingest web_requests abfss://XXX@XXX.dfs.core.windows.net/web_requests ^
--allowed_latency 5 ^
--kafka XXX.servicebus.windows.net:9093 ^
--kafka_setting security.protocol=SASL_SSL ^
--kafka_setting sasl.mechanism=PLAIN ^
--kafka_setting sasl.username=$ConnectionString ^
--kafka_setting sasl.password=Endpoint=sb://XXX.servicebus.windows.net/;SharedAccessKeyName=XXX;SharedAccessKey=XXX ^
--kafka_setting socket.keepalive.enable=true ^
--kafka_setting metadata.max.age.ms=180000 ^
--kafka_setting heartbeat.interval.ms=3000 ^
--kafka_setting session.timeout.ms=30000 ^
--kafka_setting debug=broker,security,protocol ^
--app_id web_requests ^
--transform "date: substr(meta.producer.timestamp, '0', '10')" ^
--transform "meta.kafka.offset: kafka.offset" ^
--transform "meta.kafka.partition: kafka.partition" ^
--transform "meta.kafka.topic: kafka.topic" ^
--auto_offset_reset earliest
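
For what it's worth, my understanding is that the --transform values are JMESPath expressions (via the jmespatch crate in the Cargo.toml below) and that, as far as I can tell, substr is a helper registered by kafka-delta-ingest rather than a JMESPath built-in. Here is a rough, hypothetical sketch I can run locally to confirm the path part of the failing expression resolves against the sample payload shown further down (it doesn't exercise substr itself). One thing I'm unsure about: in JMESPath, '0'/'10' are raw string literals while backtick-quoted numbers would be JSON number literals, and I don't know whether that matters to substr.

// Minimal sketch, assuming the jmespatch crate behaves like the upstream jmespath crate.
// This only checks the path lookup, not the substr helper.
fn main() {
    let payload = r#"{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.321710+00:00"}}}"#;

    let mut runtime = jmespatch::Runtime::new();
    runtime.register_builtin_functions();

    let expr = runtime.compile("meta.producer.timestamp").expect("compile expression");
    let data = jmespatch::Variable::from_json(payload).expect("parse payload");
    let result = expr.search(data).expect("evaluate expression");

    // The date transform should keep the first 10 characters of this value: 2021-03-24
    println!("timestamp = {}", result.as_string().expect("expected a string"));
}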

I submitted the standard payload (see below) using the Event Hubs "Generate data" feature in the Azure Portal, and it was sent successfully (confirmed with kcat).

I also successfully uploaded the first Delta transaction, containing the schema (https://github.com/delta-io/kafka-delta-ingest/blob/main/tests/data/web_requests/_delta_log/00000000000000000000.json), to the relevant directory:

{"commitInfo":{"timestamp":1564524295023,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{}"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"22ef18ba-191c-4c36-a606-3dad5cdf3830","format":{"provider":"parquet","options":{}},"schemaString":"{"type":"struct","fields":[{"name":"meta","type":{"type":"struct","fields":[{"name":"producer","type":{"type":"struct","fields":[{"name":"timestamp","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"kafka","type":{"type":"struct","fields":[{"name":"offset","type":"long","nullable":true,"metadata":{}},{"name":"topic","type":"string","nullable":true,"metadata":{}},{"name":"partition","type":"integer","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"method","type":"string","nullable":true,"metadata":{}},{"name":"session_id","type":"string","nullable":true,"metadata":{}},{"name":"status","type":"integer","nullable":true,"metadata":{}},{"name":"url","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"string","nullable":true,"metadata":{}},{"name":"date","type":"string","nullable":true,"metadata":{}}]}","partitionColumns":["date"],"configuration":{},"createdTime":1564524294376}}

To keep using Rust version 1.67, I tweaked the Cargo.lock and Cargo.toml files (see below for Cargo.toml), but I don't expect that to be the cause of any errors.

Really appreciate any help you can offer.


Standard Payload:

{
  "status": 200,
  "session_id": "7c28bcf9-be26-4d0b-931a-3374ab4bb458",
  "method": "GET",
  "meta": {
    "producer": {
      "timestamp": "2021-03-24T15:06:17.321710+00:00"
    }
  },
  "uuid": "831c6afa-375c-4988-b248-096f9ed101f8",
  "url": "http://www.example.com"
}
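
For reference, based only on the four --transform expressions in the docker run command above and the Delta schema, I'd expect a record that clears the transform step to end up roughly shaped like the output of this sketch (the offset/partition come from the log line above, the topic from the ingest argument; all of this is just my reading of the transforms, not output from the tool):

// Rough sketch (serde_json only) of the post-transform record shape implied by the
// --transform arguments; meta.kafka.* values are illustrative.
use serde_json::json;

fn main() {
    let enriched = json!({
        "status": 200,
        "session_id": "7c28bcf9-be26-4d0b-931a-3374ab4bb458",
        "method": "GET",
        "meta": {
            "producer": { "timestamp": "2021-03-24T15:06:17.321710+00:00" },
            "kafka": { "offset": 22, "partition": 0, "topic": "web_requests" }
        },
        "uuid": "831c6afa-375c-4988-b248-096f9ed101f8",
        "url": "http://www.example.com",
        "date": "2021-03-24" // first 10 characters of meta.producer.timestamp
    });
    println!("{}", serde_json::to_string_pretty(&enriched).unwrap());
}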


Cargo.toml:

[package]
name = "kafka-delta-ingest"
version = "0.2.0"
authors = ["R. Tyler Croy <rtyler@brokenco.de>", "Christian Williams <christianw@scribd.com>"]
edition = "2018"

[dependencies]
anstream = "=0.3.2"
anyhow = "=1.0.26"
async-trait = "=0.1.53"
apache-avro = "=0.14.0"
base64 = "=0.13.0"
bytes = "=1.1.0"
chrono = "=0.4.24"
clap = { version = "=4.3.0", features = ["color"] }
clap_builder = "=4.3.0"
clap_lex = "=0.5.0"
dipstick = "=0.9.0"
dotenv = "=0.15"
env_logger = "=0.10.0"
futures = "=0.3.15"
half = "=2.2.1"
jmespatch = { version = "=0.3.0", features = ["sync"] }
lazy_static = "=1.4.0"
log = "=0.4.17"
maplit = "=1.0.2"
rdkafka = { version = "=0.28.0", features = ["ssl-vendored"] }
rusoto_core = { version = "=0.46.0" }
rusoto_credential = { version = "=0.46.0" }
rusoto_s3 = { version = "=0.46.0" }
schema_registry_converter = { version = "=3.1.0", features = ["easy", "json", "avro"] }
serde = { version = "=1.0.140", features = ["derive"] }
serde_json = "=1.0.82"
strum = "=0.20.0"
strum_macros = "=0.20.0"
thiserror = "=1.0.31"
tokio = { version = "=1.25.0", features = ["full"] }
tokio-stream = { version = "=0.1.8", features = ["fs"] }
tokio-util = "=0.6.3"
uuid = { version = "=0.8.2", features = ["serde", "v4"] }

deltalake = { version = "=0.11.0", features = ["s3", "azure"] }
dynamodb_lock = "=0.4.3"

# sentry

sentry = { version = "=0.23.0", optional = true }
url = "2.3"

[features]
sentry-ext = ["sentry"]
dynamic-linking = [ "rdkafka/dynamic-linking" ]

[dev-dependencies]
deltalake = { version = "=0.11.0", features = ["s3", "azure", "json"] }
utime = "0.3"
serial_test = "=0.6.0"
tempfile = "=3.2.0"
azure_core = "=0.10.0"
azure_storage = "=0.10.0"
azure_storage_blobs = "=0.10.0"
time = "0.3"

[profile.release]
lto = true


Jaegin commented Sep 14, 2023

Hi @rtyler - are you able to help with my issue above?

If you need any further information, please just shout, and thanks in advance :-)

Cheers,
Steve
