From 695752b3d98e02925b790297263ea04b7e08a4d9 Mon Sep 17 00:00:00 2001 From: Zhewei Hu Date: Sun, 3 Dec 2023 09:26:48 -0800 Subject: [PATCH] [ZK filter] Add per-opcode decoder error metrics (#31138) Signed-off-by: Zhewei Hu --- .../zookeeper_proxy/v3/zookeeper_proxy.proto | 5 +- changelogs/current.yaml | 4 + .../_include/zookeeper-filter-proxy.yaml | 1 + .../zookeeper_proxy_filter.rst | 30 ++ .../filters/network/zookeeper_proxy/config.cc | 7 +- .../network/zookeeper_proxy/decoder.cc | 239 ++++++++------- .../filters/network/zookeeper_proxy/decoder.h | 11 +- .../filters/network/zookeeper_proxy/filter.cc | 107 ++++--- .../filters/network/zookeeper_proxy/filter.h | 45 ++- .../filters/network/zookeeper_proxy/utils.h | 8 +- .../network/zookeeper_proxy/config_test.cc | 5 + .../network/zookeeper_proxy/filter_test.cc | 289 +++++++++++------- 12 files changed, 493 insertions(+), 258 deletions(-) diff --git a/api/envoy/extensions/filters/network/zookeeper_proxy/v3/zookeeper_proxy.proto b/api/envoy/extensions/filters/network/zookeeper_proxy/v3/zookeeper_proxy.proto index 49c9b6005dba..bb19c752e0c8 100644 --- a/api/envoy/extensions/filters/network/zookeeper_proxy/v3/zookeeper_proxy.proto +++ b/api/envoy/extensions/filters/network/zookeeper_proxy/v3/zookeeper_proxy.proto @@ -19,7 +19,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // ZooKeeper Proxy :ref:`configuration overview `. // [#extension: envoy.filters.network.zookeeper_proxy] -// [#next-free-field: 9] +// [#next-free-field: 10] message ZooKeeperProxy { option (udpa.annotations.versioning).previous_message_type = "envoy.config.filter.network.zookeeper_proxy.v1alpha1.ZooKeeperProxy"; @@ -65,6 +65,9 @@ message ZooKeeperProxy { // Whether to emit per opcode response bytes metrics. If not set, it defaults to false. bool enable_per_opcode_response_bytes_metrics = 8; + + // Whether to emit per opcode decoder error metrics. If not set, it defaults to false. + bool enable_per_opcode_decoder_error_metrics = 9; } message LatencyThresholdOverride { diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 571fc8cf3b48..3d399190bcf3 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -221,5 +221,9 @@ new_features: ` to allow recording an access log entry periodically for the UDP session, and allow recording an access log entry on the connection tunnel created successfully to upstream when UDP tunneling is configured. +- area: zookeeper + change: | + Added support for emitting per opcode decoder error metrics via :ref:`enable_per_opcode_decoder_error_metrics + `. deprecated: diff --git a/docs/root/configuration/listeners/network_filters/_include/zookeeper-filter-proxy.yaml b/docs/root/configuration/listeners/network_filters/_include/zookeeper-filter-proxy.yaml index 4b079ef6362d..25a534fc79d1 100644 --- a/docs/root/configuration/listeners/network_filters/_include/zookeeper-filter-proxy.yaml +++ b/docs/root/configuration/listeners/network_filters/_include/zookeeper-filter-proxy.yaml @@ -13,6 +13,7 @@ static_resources: stat_prefix: zookeeper enable_per_opcode_request_bytes_metrics: true enable_per_opcode_response_bytes_metrics: true + enable_per_opcode_decoder_error_metrics: true enable_latency_threshold_metrics: true default_latency_threshold: "0.1s" latency_threshold_overrides: diff --git a/docs/root/configuration/listeners/network_filters/zookeeper_proxy_filter.rst b/docs/root/configuration/listeners/network_filters/zookeeper_proxy_filter.rst index e98f00795008..bde21b88127f 100644 --- a/docs/root/configuration/listeners/network_filters/zookeeper_proxy_filter.rst +++ b/docs/root/configuration/listeners/network_filters/zookeeper_proxy_filter.rst @@ -41,6 +41,8 @@ Every configured ZooKeeper proxy filter has statistics rooted at *. *_resp_bytes* are per-opcode metrics and will only be emitted when :ref:`enable_per_opcode_response_bytes_metrics ` is set to ``true``. +*_decoder_error* are per-opcode metrics and will only be emitted when :ref:`enable_per_opcode_decoder_error_metrics ` is set to ``true``. + The following counters are available: .. csv-table:: @@ -48,6 +50,34 @@ The following counters are available: :widths: 1, 1, 2 decoder_error, Counter, Number of times a message wasn't decoded + connect_decoder_error, Counter, Number of times a connect request message wasn't decoded + ping_decoder_error, Counter, Number of times a ping request message wasn't decoded + auth_decoder_error, Counter, Number of times a auth request message wasn't decoded + getdata_decoder_error, Counter, Number of times a getdata request message wasn't decoded + create_decoder_error, Counter, Number of times a create request message wasn't decoded + create2_decoder_error, Counter, Number of times a create2 request message wasn't decoded + createcontainer_decoder_error, Counter, Number of times a createcontainer request message wasn't decoded + createttl_decoder_error, Counter, Number of times a createttl request message wasn't decoded + setdata_decoder_error, Counter, Number of times a setdata request message wasn't decoded + getchildren_decoder_error, Counter, Number of times a getchildren request message wasn't decoded + getchildren2_decoder_error, Counter, Number of times a getchildren2 request message wasn't decoded + getephemerals_decoder_error, Counter, Number of times a getephemerals request message wasn't decoded + getallchildrennumber_decoder_error, Counter, Number of times a getallchildrennumber request message wasn't decoded + delete_decoder_error, Counter, Number of times a delete request message wasn't decoded + exists_decoder_error, Counter, Number of times a exists request message wasn't decoded + getacl_decoder_error, Counter, Number of times a getacl request message wasn't decoded + setacl_decoder_error, Counter, Number of times a setacl request message wasn't decoded + sync_decoder_error, Counter, Number of times a sync request message wasn't decoded + multi_decoder_error, Counter, Number of times a multi request message wasn't decoded + reconfig_decoder_error, Counter, Number of times a reconfig request message wasn't decoded + close_decoder_error, Counter, Number of times a close request message wasn't decoded + setauth_decoder_error, Counter, Number of times a setauth request message wasn't decoded + setwatches_decoder_error, Counter, Number of times a setwatches request message wasn't decoded + setwatches2_decoder_error, Counter, Number of times a setwatches2 request message wasn't decoded + addwatch_decoder_error, Counter, Number of times a addwatch request message wasn't decoded + checkwatches_decoder_error, Counter, Number of times a checkwatches request message wasn't decoded + removewatches_decoder_error, Counter, Number of times a removewatches request message wasn't decoded + check_decoder_error, Counter, Number of times a check request message wasn't decoded request_bytes, Counter, Number of bytes in decoded request messages connect_rq_bytes, Counter, Number of bytes in decoded connect request messages ping_rq_bytes, Counter, Number of bytes in decoded ping request messages diff --git a/source/extensions/filters/network/zookeeper_proxy/config.cc b/source/extensions/filters/network/zookeeper_proxy/config.cc index 0e0cbcd2bcf6..cff429ddec65 100644 --- a/source/extensions/filters/network/zookeeper_proxy/config.cc +++ b/source/extensions/filters/network/zookeeper_proxy/config.cc @@ -31,6 +31,8 @@ Network::FilterFactoryCb ZooKeeperConfigFactory::createFilterFactoryFromProtoTyp proto_config.enable_per_opcode_request_bytes_metrics(); const bool enable_per_opcode_response_bytes_metrics = proto_config.enable_per_opcode_response_bytes_metrics(); + const bool enable_per_opcode_decoder_error_metrics = + proto_config.enable_per_opcode_decoder_error_metrics(); const bool enable_latency_threshold_metrics = proto_config.enable_latency_threshold_metrics(); const std::chrono::milliseconds default_latency_threshold( PROTOBUF_GET_MS_OR_DEFAULT(proto_config, default_latency_threshold, 100)); @@ -49,8 +51,9 @@ Network::FilterFactoryCb ZooKeeperConfigFactory::createFilterFactoryFromProtoTyp ZooKeeperFilterConfigSharedPtr filter_config(std::make_shared( stat_prefix, max_packet_bytes, enable_per_opcode_request_bytes_metrics, - enable_per_opcode_response_bytes_metrics, enable_latency_threshold_metrics, - default_latency_threshold, latency_threshold_overrides, context.scope())); + enable_per_opcode_response_bytes_metrics, enable_per_opcode_decoder_error_metrics, + enable_latency_threshold_metrics, default_latency_threshold, latency_threshold_overrides, + context.scope())); auto& time_source = context.getServerFactoryContext().mainThreadDispatcher().timeSource(); return [filter_config, &time_source](Network::FilterManager& filter_manager) -> void { diff --git a/source/extensions/filters/network/zookeeper_proxy/decoder.cc b/source/extensions/filters/network/zookeeper_proxy/decoder.cc index 92367e55063a..0fc676fd6ac9 100644 --- a/source/extensions/filters/network/zookeeper_proxy/decoder.cc +++ b/source/extensions/filters/network/zookeeper_proxy/decoder.cc @@ -50,18 +50,18 @@ absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instan // Check message length. const absl::StatusOr len = helper_.peekInt32(data, offset); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - len, fmt::format("peekInt32 for len: {}", len.status().message())); + len, absl::nullopt, fmt::format("peekInt32 for len: {}", len.status().message())); ENVOY_LOG(trace, "zookeeper_proxy: decoding request with len {} at offset {}", len.value(), offset); absl::Status status = ensureMinLength(len.value(), XID_LENGTH + INT_LENGTH); // xid + opcode EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("ensureMinLength: {}", status.message())); + status, absl::nullopt, fmt::format("ensureMinLength: {}", status.message())); status = ensureMaxLength(len.value()); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("ensureMaxLength: {}", status.message())); + status, absl::nullopt, fmt::format("ensureMaxLength: {}", status.message())); auto start_time = time_source_.monotonicTime(); @@ -77,7 +77,7 @@ absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instan // as a regular data request, so we support that as well. const absl::StatusOr xid = helper_.peekInt32(data, offset); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - xid, fmt::format("peerInt32 for xid: {}", xid.status().message())); + xid, absl::nullopt, fmt::format("peerInt32 for xid: {}", xid.status().message())); ENVOY_LOG(trace, "zookeeper_proxy: decoding request with xid {} at offset {}", xid.value(), offset); @@ -124,7 +124,7 @@ absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instan // the session alive. const absl::StatusOr oc = helper_.peekInt32(data, offset); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - oc, fmt::format("peekInt32 for opcode: {}", oc.status().message())); + oc, absl::nullopt, fmt::format("peekInt32 for opcode: {}", oc.status().message())); ENVOY_LOG(trace, "zookeeper_proxy: decoding request with opcode {} at offset {}", oc.value(), offset); @@ -140,7 +140,7 @@ absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instan case OpCodes::Create2: case OpCodes::CreateContainer: case OpCodes::CreateTtl: - status = parseCreateRequest(data, offset, len.value(), static_cast(opcode)); + status = parseCreateRequest(data, offset, len.value(), opcode); RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( status, fmt::format("parseCreateRequest: {}", status.message())); break; @@ -152,12 +152,12 @@ absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instan case OpCodes::GetChildren: status = parseGetChildrenRequest(data, offset, len.value(), false); RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseGetChildrenRequest: {}", status.message())); + status, fmt::format("parseGetChildrenRequest (get children): {}", status.message())); break; case OpCodes::GetChildren2: status = parseGetChildrenRequest(data, offset, len.value(), true); RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseGetChildrenRequest: {}", status.message())); + status, fmt::format("parseGetChildrenRequest (get children2): {}", status.message())); break; case OpCodes::Delete: status = parseDeleteRequest(data, offset, len.value()); @@ -180,7 +180,7 @@ absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instan status, fmt::format("parseSetAclRequest: {}", status.message())); break; case OpCodes::Sync: - status = callbacks_.onSyncRequest(pathOnlyRequest(data, offset, len.value())); + status = callbacks_.onSyncRequest(pathOnlyRequest(data, offset, len.value()), opcode); RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, fmt::format("onSyncRequest: {}", status.message())); break; @@ -225,12 +225,13 @@ absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instan status, fmt::format("parseXWatchesRequest (remove watches): {}", status.message())); break; case OpCodes::GetEphemerals: - status = callbacks_.onGetEphemeralsRequest(pathOnlyRequest(data, offset, len.value())); + status = callbacks_.onGetEphemeralsRequest(pathOnlyRequest(data, offset, len.value()), opcode); RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( status, fmt::format("onGetEphemeralsRequest: {}", status.message())); break; case OpCodes::GetAllChildrenNumber: - status = callbacks_.onGetAllChildrenNumberRequest(pathOnlyRequest(data, offset, len.value())); + status = callbacks_.onGetAllChildrenNumberRequest(pathOnlyRequest(data, offset, len.value()), + opcode); RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( status, fmt::format("onGetAllChildrenNumberRequest: {}", status.message())); break; @@ -240,7 +241,7 @@ absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instan default: ENVOY_LOG(debug, "zookeeper_proxy: decodeOnData exception: unknown opcode {}", enumToSignedInt(opcode)); - callbacks_.onDecodeError(); + callbacks_.onDecodeError(absl::nullopt); return absl::nullopt; } @@ -257,7 +258,7 @@ absl::StatusOr> DecoderImpl::decodeOnWrite(Buffer::Insta // Check message length. const absl::StatusOr len = helper_.peekInt32(data, offset); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - len, fmt::format("peekInt32 for len: {}", len.status().message())); + len, absl::nullopt, fmt::format("peekInt32 for len: {}", len.status().message())); ENVOY_LOG(trace, "zookeeper_proxy: decoding response with len.value() {} at offset {}", len.value(), offset); @@ -265,15 +266,15 @@ absl::StatusOr> DecoderImpl::decodeOnWrite(Buffer::Insta absl::Status status = ensureMinLength(len.value(), XID_LENGTH + ZXID_LENGTH + INT_LENGTH); // xid + zxid + err EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("ensureMinLength: {}", status.message())); + status, absl::nullopt, fmt::format("ensureMinLength: {}", status.message())); status = ensureMaxLength(len.value()); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("ensureMaxLength: {}", status.message())); + status, absl::nullopt, fmt::format("ensureMaxLength: {}", status.message())); const absl::StatusOr xid = helper_.peekInt32(data, offset); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - xid, fmt::format("peekInt32 for xid: {}", xid.status().message())); + xid, absl::nullopt, fmt::format("peekInt32 for xid: {}", xid.status().message())); ENVOY_LOG(trace, "zookeeper_proxy: decoding response with xid {} at offset {}", xid.value(), offset); @@ -292,7 +293,7 @@ absl::StatusOr> DecoderImpl::decodeOnWrite(Buffer::Insta case XidCodes::SetWatchesXid: latency = fetchControlRequestData(xid.value(), opcode); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - latency, fmt::format("fetchControlRequestData: {}", latency.status().message())); + latency, opcode, fmt::format("fetchControlRequestData: {}", latency.status().message())); break; case XidCodes::WatchXid: // WATCH_XID is generated by the server, no need to fetch opcode and latency here. @@ -300,7 +301,7 @@ absl::StatusOr> DecoderImpl::decodeOnWrite(Buffer::Insta default: latency = fetchDataRequestData(xid.value(), opcode); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - latency, fmt::format("fetchDataRequestData: {}", latency.status().message())); + latency, opcode, fmt::format("fetchDataRequestData: {}", latency.status().message())); } // Connect responses are special, they have no full reply header @@ -316,11 +317,11 @@ absl::StatusOr> DecoderImpl::decodeOnWrite(Buffer::Insta // Control responses that aren't connect, with XIDs <= 0. const absl::StatusOr zxid = helper_.peekInt64(data, offset); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - zxid, fmt::format("peekInt64 for zxid: {}", zxid.status().message())); + zxid, absl::nullopt, fmt::format("peekInt64 for zxid: {}", zxid.status().message())); const absl::StatusOr error = helper_.peekInt32(data, offset); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - error, fmt::format("peekInt32 for error: {}", error.status().message())); + error, absl::nullopt, fmt::format("peekInt32 for error: {}", error.status().message())); ENVOY_LOG(trace, "zookeeper_proxy: decoding response with zxid.value() {} and error {} at offset {}", @@ -371,17 +372,17 @@ absl::Status DecoderImpl::ensureMaxLength(const int32_t len) const { absl::Status DecoderImpl::parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Connect); // Skip zxid, timeout, and session id. offset += ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH; // Skip password. status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Connect); const absl::StatusOr readonly = maybeReadBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(readonly, + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(readonly, OpCodes::Connect, readonly.status().message()); callbacks_.onConnect(readonly.value()); @@ -392,16 +393,17 @@ absl::Status DecoderImpl::parseConnect(Buffer::Instance& data, uint64_t& offset, absl::Status DecoderImpl::parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + INT_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetAuth); // Skip opcode + type. offset += OPCODE_LENGTH + INT_LENGTH; const absl::StatusOr scheme = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(scheme, scheme.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(scheme, OpCodes::SetAuth, + scheme.status().message()); // Skip credential. status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetAuth); callbacks_.onAuthRequest(scheme.value()); @@ -411,13 +413,15 @@ absl::Status DecoderImpl::parseAuthRequest(Buffer::Instance& data, uint64_t& off absl::Status DecoderImpl::parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::GetData); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::GetData, + path.status().message()); const absl::StatusOr watch = helper_.peekBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, watch.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, OpCodes::GetData, + watch.status().message()); callbacks_.onGetDataRequest(path.value(), watch.value()); @@ -448,43 +452,46 @@ absl::Status DecoderImpl::skipAcls(Buffer::Instance& data, uint64_t& offset) { absl::Status DecoderImpl::parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, OpCodes opcode) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (4 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, opcode, + path.status().message()); // Skip data. status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode); status = skipAcls(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode); absl::StatusOr flag_data = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(flag_data, + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(flag_data, opcode, flag_data.status().message()); const CreateFlags flags = static_cast(flag_data.value()); status = callbacks_.onCreateRequest(path.value(), flags, opcode); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode); return absl::OkStatus(); } absl::Status DecoderImpl::parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetData); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::SetData, + path.status().message()); // Skip data. status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetData); // Ignore version. absl::StatusOr version = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, OpCodes::SetData, + version.status().message()); callbacks_.onSetRequest(path.value()); @@ -492,17 +499,23 @@ absl::Status DecoderImpl::parseSetRequest(Buffer::Instance& data, uint64_t& offs } absl::Status DecoderImpl::parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len, const bool two) { + uint32_t len, const bool v2) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + OpCodes opcode = OpCodes::GetChildren; + if (v2) { + opcode = OpCodes::GetChildren2; + } + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, opcode, + path.status().message()); const absl::StatusOr watch = helper_.peekBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, watch.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, opcode, + watch.status().message()); - callbacks_.onGetChildrenRequest(path.value(), watch.value(), two); + callbacks_.onGetChildrenRequest(path.value(), watch.value(), v2); return absl::OkStatus(); } @@ -510,13 +523,15 @@ absl::Status DecoderImpl::parseGetChildrenRequest(Buffer::Instance& data, uint64 absl::Status DecoderImpl::parseDeleteRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Delete); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::Delete, + path.status().message()); const absl::StatusOr version = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, OpCodes::Delete, + version.status().message()); callbacks_.onDeleteRequest(path.value(), version.value()); @@ -526,13 +541,15 @@ absl::Status DecoderImpl::parseDeleteRequest(Buffer::Instance& data, uint64_t& o absl::Status DecoderImpl::parseExistsRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Exists); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::Exists, + path.status().message()); const absl::StatusOr watch = helper_.peekBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, watch.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, OpCodes::Exists, + watch.status().message()); callbacks_.onExistsRequest(path.value(), watch.value()); @@ -542,10 +559,11 @@ absl::Status DecoderImpl::parseExistsRequest(Buffer::Instance& data, uint64_t& o absl::Status DecoderImpl::parseGetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::GetAcl); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::GetAcl, + path.status().message()); callbacks_.onGetAclRequest(path.value()); @@ -555,16 +573,18 @@ absl::Status DecoderImpl::parseGetAclRequest(Buffer::Instance& data, uint64_t& o absl::Status DecoderImpl::parseSetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetAcl); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::SetAcl, + path.status().message()); status = skipAcls(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetAcl); const absl::StatusOr version = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, OpCodes::SetAcl, + version.status().message()); callbacks_.onSetAclRequest(path.value(), version.value()); @@ -574,7 +594,8 @@ absl::Status DecoderImpl::parseSetAclRequest(Buffer::Instance& data, uint64_t& o absl::StatusOr DecoderImpl::pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( status, fmt::format("zookeeper_proxy: path only request decoding exception {}", status.message())); @@ -584,13 +605,15 @@ absl::StatusOr DecoderImpl::pathOnlyRequest(Buffer::Instance& data, absl::Status DecoderImpl::parseCheckRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Check); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::Check, + path.status().message()); const absl::StatusOr version = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, OpCodes::Check, + version.status().message()); callbacks_.onCheckRequest(path.value(), version.value()); @@ -601,18 +624,21 @@ absl::Status DecoderImpl::parseMultiRequest(Buffer::Instance& data, uint64_t& of uint32_t len) { // Treat empty transactions as a decoding error, there should be at least 1 header. absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + MULTI_HEADER_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Multi); while (true) { const absl::StatusOr opcode = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(opcode, opcode.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(opcode, OpCodes::Multi, + opcode.status().message()); const absl::StatusOr done = helper_.peekBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(done, done.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(done, OpCodes::Multi, + done.status().message()); // Ignore error field. const absl::StatusOr error = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(error, error.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(error, OpCodes::Multi, + error.status().message()); if (done.value()) { break; @@ -621,22 +647,22 @@ absl::Status DecoderImpl::parseMultiRequest(Buffer::Instance& data, uint64_t& of switch (static_cast(opcode.value())) { case OpCodes::Create: status = parseCreateRequest(data, offset, len, OpCodes::Create); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Create); break; case OpCodes::SetData: status = parseSetRequest(data, offset, len); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetData); break; case OpCodes::Check: status = parseCheckRequest(data, offset, len); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Check); break; case OpCodes::Delete: status = parseDeleteRequest(data, offset, len); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Delete); break; default: - callbacks_.onDecodeError(); + callbacks_.onDecodeError(absl::nullopt); return absl::InvalidArgumentError( fmt::format("unknown opcode within a transaction: {}", opcode.value())); } @@ -651,22 +677,22 @@ absl::Status DecoderImpl::parseReconfigRequest(Buffer::Instance& data, uint64_t& uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH) + LONG_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Reconfig); // Skip joining. status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Reconfig); // Skip leaving. status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Reconfig); // Skip new members. status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Reconfig); // Read config id. absl::StatusOr config_id = helper_.peekInt64(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(config_id, + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(config_id, OpCodes::Reconfig, config_id.status().message()); callbacks_.onReconfigRequest(); @@ -678,23 +704,24 @@ absl::Status DecoderImpl::parseSetWatchesRequest(Buffer::Instance& data, uint64_ uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (3 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches); // Ignore relative Zxid. absl::StatusOr zxid = helper_.peekInt64(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(zxid, zxid.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(zxid, OpCodes::SetWatches, + zxid.status().message()); // Data watches. status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches); // Exist watches. status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches); // Child watches. status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches); callbacks_.onSetWatchesRequest(); @@ -705,31 +732,32 @@ absl::Status DecoderImpl::parseSetWatches2Request(Buffer::Instance& data, uint64 uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (5 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2); // Ignore relative Zxid. absl::StatusOr zxid = helper_.peekInt64(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(zxid, zxid.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(zxid, OpCodes::SetWatches2, + zxid.status().message()); // Data watches. status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2); // Exist watches. status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2); // Child watches. status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2); // Persistent watches. status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2); // Persistent recursive watches. status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2); callbacks_.onSetWatches2Request(); @@ -739,13 +767,15 @@ absl::Status DecoderImpl::parseSetWatches2Request(Buffer::Instance& data, uint64 absl::Status DecoderImpl::parseAddWatchRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::AddWatch); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::AddWatch, + path.status().message()); const absl::StatusOr mode = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(mode, mode.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(mode, OpCodes::AddWatch, + mode.status().message()); callbacks_.onAddWatchRequest(path.value(), mode.value()); @@ -755,13 +785,14 @@ absl::Status DecoderImpl::parseAddWatchRequest(Buffer::Instance& data, uint64_t& absl::Status DecoderImpl::parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, OpCodes opcode) { absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, opcode, + path.status().message()); const absl::StatusOr watch_type = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch_type, + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch_type, opcode, watch_type.status().message()); if (opcode == OpCodes::CheckWatches) { @@ -856,15 +887,15 @@ absl::Status DecoderImpl::decodeAndBufferHelper(Buffer::Instance& data, DecodeTy // Peek packet length. len = helper_.peekInt32(data, offset); EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - len, fmt::format("peekInt32 for len: {}", len.status().message())); + len, absl::nullopt, fmt::format("peekInt32 for len: {}", len.status().message())); status = ensureMinLength(len.value(), dtype == DecodeType::READ ? XID_LENGTH + INT_LENGTH : XID_LENGTH + ZXID_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, absl::nullopt); status = ensureMaxLength(len.value()); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, absl::nullopt); offset += len.value(); if (offset <= data_len) { @@ -943,18 +974,19 @@ absl::Status DecoderImpl::parseConnectResponse(Buffer::Instance& data, uint64_t& const std::chrono::milliseconds latency) { absl::Status status = ensureMinLength(len, PROTOCOL_VERSION_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Connect); const absl::StatusOr timeout = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(timeout, timeout.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(timeout, OpCodes::Connect, + timeout.status().message()); // Skip session id + password. offset += SESSION_LENGTH; status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Connect); const absl::StatusOr readonly = maybeReadBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(readonly, + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(readonly, OpCodes::Connect, readonly.status().message()); callbacks_.onConnectResponse(0, timeout.value(), readonly.value(), latency); @@ -966,18 +998,19 @@ absl::Status DecoderImpl::parseWatchEvent(Buffer::Instance& data, uint64_t& offs const uint32_t len, const int64_t zxid, const int32_t error) { absl::Status status = ensureMinLength(len, SERVER_HEADER_LENGTH + (3 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, absl::nullopt); const absl::StatusOr event_type = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(event_type, + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(event_type, absl::nullopt, event_type.status().message()); const absl::StatusOr client_state = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(client_state, + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(client_state, absl::nullopt, client_state.status().message()); const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, absl::nullopt, + path.status().message()); callbacks_.onWatchEvent(event_type.value(), client_state.value(), path.value(), zxid, error); diff --git a/source/extensions/filters/network/zookeeper_proxy/decoder.h b/source/extensions/filters/network/zookeeper_proxy/decoder.h index 8718a8d9d6b1..875edd363546 100644 --- a/source/extensions/filters/network/zookeeper_proxy/decoder.h +++ b/source/extensions/filters/network/zookeeper_proxy/decoder.h @@ -80,7 +80,7 @@ class DecoderCallbacks { public: virtual ~DecoderCallbacks() = default; - virtual void onDecodeError() PURE; + virtual void onDecodeError(const absl::optional opcode) PURE; virtual void onRequestBytes(const absl::optional opcode, const uint64_t bytes) PURE; virtual void onConnect(bool readonly) PURE; virtual void onPing() PURE; @@ -90,13 +90,16 @@ class DecoderCallbacks { OpCodes opcode) PURE; virtual void onSetRequest(const std::string& path) PURE; virtual void onGetChildrenRequest(const std::string& path, bool watch, bool v2) PURE; - virtual absl::Status onGetEphemeralsRequest(const absl::StatusOr& path) PURE; - virtual absl::Status onGetAllChildrenNumberRequest(const absl::StatusOr& path) PURE; + virtual absl::Status onGetEphemeralsRequest(const absl::StatusOr& path, + const OpCodes opcode) PURE; + virtual absl::Status onGetAllChildrenNumberRequest(const absl::StatusOr& path, + const OpCodes opcode) PURE; virtual void onDeleteRequest(const std::string& path, int32_t version) PURE; virtual void onExistsRequest(const std::string& path, bool watch) PURE; virtual void onGetAclRequest(const std::string& path) PURE; virtual void onSetAclRequest(const std::string& path, int32_t version) PURE; - virtual absl::Status onSyncRequest(const absl::StatusOr& path) PURE; + virtual absl::Status onSyncRequest(const absl::StatusOr& path, + const OpCodes opcode) PURE; virtual void onCheckRequest(const std::string& path, int32_t version) PURE; virtual void onMultiRequest() PURE; virtual void onReconfigRequest() PURE; diff --git a/source/extensions/filters/network/zookeeper_proxy/filter.cc b/source/extensions/filters/network/zookeeper_proxy/filter.cc index 817fa14077d4..29e599ba782b 100644 --- a/source/extensions/filters/network/zookeeper_proxy/filter.cc +++ b/source/extensions/filters/network/zookeeper_proxy/filter.cc @@ -24,7 +24,7 @@ ZooKeeperFilterConfig::ZooKeeperFilterConfig( const std::string& stat_prefix, const uint32_t max_packet_bytes, const bool enable_per_opcode_request_bytes_metrics, const bool enable_per_opcode_response_bytes_metrics, - const bool enable_latency_threshold_metrics, + const bool enable_per_opcode_decoder_error_metrics, const bool enable_latency_threshold_metrics, const std::chrono::milliseconds default_latency_threshold, const LatencyThresholdOverrideList& latency_threshold_overrides, Stats::Scope& scope) : scope_(scope), max_packet_bytes_(max_packet_bytes), stats_(generateStats(stat_prefix, scope)), @@ -35,6 +35,7 @@ ZooKeeperFilterConfig::ZooKeeperFilterConfig( unknown_opcode_latency_(stat_name_set_->add("unknown_opcode_latency")), enable_per_opcode_request_bytes_metrics_(enable_per_opcode_request_bytes_metrics), enable_per_opcode_response_bytes_metrics_(enable_per_opcode_response_bytes_metrics), + enable_per_opcode_decoder_error_metrics_(enable_per_opcode_decoder_error_metrics), enable_latency_threshold_metrics_(enable_latency_threshold_metrics), default_latency_threshold_(default_latency_threshold), latency_threshold_override_map_(parseLatencyThresholdOverrides(latency_threshold_overrides)) { @@ -45,79 +46,89 @@ ZooKeeperFilterConfig::ZooKeeperFilterConfig( {"auth_rq", "digest_rq", "host_rq", "ip_rq", "ping_response_rq", "world_rq", "x509_rq"}); initOpCode(OpCodes::Ping, stats_.ping_resp_, stats_.ping_resp_fast_, stats_.ping_resp_slow_, - stats_.ping_rq_bytes_, stats_.ping_resp_bytes_, "ping_response"); + stats_.ping_rq_bytes_, stats_.ping_resp_bytes_, stats_.ping_decoder_error_, + "ping_response"); initOpCode(OpCodes::SetAuth, stats_.auth_resp_, stats_.auth_resp_fast_, stats_.auth_resp_slow_, - stats_.auth_rq_bytes_, stats_.auth_resp_bytes_, "auth_response"); + stats_.auth_rq_bytes_, stats_.auth_resp_bytes_, stats_.auth_decoder_error_, + "auth_response"); initOpCode(OpCodes::GetData, stats_.getdata_resp_, stats_.getdata_resp_fast_, stats_.getdata_resp_slow_, stats_.getdata_rq_bytes_, stats_.getdata_resp_bytes_, - "getdata_resp"); + stats_.getdata_decoder_error_, "getdata_resp"); initOpCode(OpCodes::Create, stats_.create_resp_, stats_.create_resp_fast_, stats_.create_resp_slow_, stats_.create_rq_bytes_, stats_.create_resp_bytes_, - "create_resp"); + stats_.create_decoder_error_, "create_resp"); initOpCode(OpCodes::Create2, stats_.create2_resp_, stats_.create2_resp_fast_, stats_.create2_resp_slow_, stats_.create2_rq_bytes_, stats_.create2_resp_bytes_, - "create2_resp"); + stats_.create2_decoder_error_, "create2_resp"); initOpCode(OpCodes::CreateContainer, stats_.createcontainer_resp_, stats_.createcontainer_resp_fast_, stats_.createcontainer_resp_slow_, stats_.createcontainer_rq_bytes_, stats_.createcontainer_resp_bytes_, - "createcontainer_resp"); + stats_.createcontainer_decoder_error_, "createcontainer_resp"); initOpCode(OpCodes::CreateTtl, stats_.createttl_resp_, stats_.createttl_resp_fast_, stats_.createttl_resp_slow_, stats_.createttl_rq_bytes_, stats_.createttl_resp_bytes_, - "createttl_resp"); + stats_.createttl_decoder_error_, "createttl_resp"); initOpCode(OpCodes::SetData, stats_.setdata_resp_, stats_.setdata_resp_fast_, stats_.setdata_resp_slow_, stats_.setdata_rq_bytes_, stats_.setdata_resp_bytes_, - "setdata_resp"); + stats_.setdata_decoder_error_, "setdata_resp"); initOpCode(OpCodes::GetChildren, stats_.getchildren_resp_, stats_.getchildren_resp_fast_, stats_.getchildren_resp_slow_, stats_.getchildren_rq_bytes_, - stats_.getchildren_resp_bytes_, "getchildren_resp"); + stats_.getchildren_resp_bytes_, stats_.getchildren_decoder_error_, "getchildren_resp"); initOpCode(OpCodes::GetChildren2, stats_.getchildren2_resp_, stats_.getchildren2_resp_fast_, stats_.getchildren2_resp_slow_, stats_.getchildren2_rq_bytes_, - stats_.getchildren2_resp_bytes_, "getchildren2_resp"); + stats_.getchildren2_resp_bytes_, stats_.getchildren2_decoder_error_, + "getchildren2_resp"); initOpCode(OpCodes::Delete, stats_.delete_resp_, stats_.delete_resp_fast_, stats_.delete_resp_slow_, stats_.delete_rq_bytes_, stats_.delete_resp_bytes_, - "delete_resp"); + stats_.delete_decoder_error_, "delete_resp"); initOpCode(OpCodes::Exists, stats_.exists_resp_, stats_.exists_resp_fast_, stats_.exists_resp_slow_, stats_.exists_rq_bytes_, stats_.exists_resp_bytes_, - "exists_resp"); + stats_.exists_decoder_error_, "exists_resp"); initOpCode(OpCodes::GetAcl, stats_.getacl_resp_, stats_.getacl_resp_fast_, stats_.getacl_resp_slow_, stats_.getacl_rq_bytes_, stats_.getacl_resp_bytes_, - "getacl_resp"); + stats_.getacl_decoder_error_, "getacl_resp"); initOpCode(OpCodes::SetAcl, stats_.setacl_resp_, stats_.setacl_resp_fast_, stats_.setacl_resp_slow_, stats_.setacl_rq_bytes_, stats_.setacl_resp_bytes_, - "setacl_resp"); + stats_.setacl_decoder_error_, "setacl_resp"); initOpCode(OpCodes::Sync, stats_.sync_resp_, stats_.sync_resp_fast_, stats_.sync_resp_slow_, - stats_.sync_rq_bytes_, stats_.sync_resp_bytes_, "sync_resp"); + stats_.sync_rq_bytes_, stats_.sync_resp_bytes_, stats_.sync_decoder_error_, + "sync_resp"); initOpCode(OpCodes::Check, stats_.check_resp_, stats_.check_resp_fast_, stats_.check_resp_slow_, - stats_.check_rq_bytes_, stats_.check_resp_bytes_, "check_resp"); + stats_.check_rq_bytes_, stats_.check_resp_bytes_, stats_.check_decoder_error_, + "check_resp"); initOpCode(OpCodes::Multi, stats_.multi_resp_, stats_.multi_resp_fast_, stats_.multi_resp_slow_, - stats_.multi_rq_bytes_, stats_.multi_resp_bytes_, "multi_resp"); + stats_.multi_rq_bytes_, stats_.multi_resp_bytes_, stats_.multi_decoder_error_, + "multi_resp"); initOpCode(OpCodes::Reconfig, stats_.reconfig_resp_, stats_.reconfig_resp_fast_, stats_.reconfig_resp_slow_, stats_.reconfig_rq_bytes_, stats_.reconfig_resp_bytes_, - "reconfig_resp"); + stats_.reconfig_decoder_error_, "reconfig_resp"); initOpCode(OpCodes::SetWatches, stats_.setwatches_resp_, stats_.setwatches_resp_fast_, stats_.setwatches_resp_slow_, stats_.setwatches_rq_bytes_, - stats_.setwatches_resp_bytes_, "setwatches_resp"); + stats_.setwatches_resp_bytes_, stats_.setwatches_decoder_error_, "setwatches_resp"); initOpCode(OpCodes::SetWatches2, stats_.setwatches2_resp_, stats_.setwatches2_resp_fast_, stats_.setwatches2_resp_slow_, stats_.setwatches2_rq_bytes_, - stats_.setwatches2_resp_bytes_, "setwatches2_resp"); + stats_.setwatches2_resp_bytes_, stats_.setwatches2_decoder_error_, "setwatches2_resp"); initOpCode(OpCodes::AddWatch, stats_.addwatch_resp_, stats_.addwatch_resp_fast_, stats_.addwatch_resp_slow_, stats_.addwatch_rq_bytes_, stats_.addwatch_resp_bytes_, - "addwatch_resp"); + stats_.addwatch_decoder_error_, "addwatch_resp"); initOpCode(OpCodes::CheckWatches, stats_.checkwatches_resp_, stats_.checkwatches_resp_fast_, stats_.checkwatches_resp_slow_, stats_.checkwatches_rq_bytes_, - stats_.checkwatches_resp_bytes_, "checkwatches_resp"); + stats_.checkwatches_resp_bytes_, stats_.checkwatches_decoder_error_, + "checkwatches_resp"); initOpCode(OpCodes::RemoveWatches, stats_.removewatches_resp_, stats_.removewatches_resp_fast_, stats_.removewatches_resp_slow_, stats_.removewatches_rq_bytes_, - stats_.removewatches_resp_bytes_, "removewatches_resp"); + stats_.removewatches_resp_bytes_, stats_.removewatches_decoder_error_, + "removewatches_resp"); initOpCode(OpCodes::GetEphemerals, stats_.getephemerals_resp_, stats_.getephemerals_resp_fast_, stats_.getephemerals_resp_slow_, stats_.getephemerals_rq_bytes_, - stats_.getephemerals_resp_bytes_, "getephemerals_resp"); + stats_.getephemerals_resp_bytes_, stats_.getephemerals_decoder_error_, + "getephemerals_resp"); initOpCode(OpCodes::GetAllChildrenNumber, stats_.getallchildrennumber_resp_, stats_.getallchildrennumber_resp_fast_, stats_.getallchildrennumber_resp_slow_, stats_.getallchildrennumber_rq_bytes_, stats_.getallchildrennumber_resp_bytes_, - "getallchildrennumber_resp"); + stats_.getallchildrennumber_decoder_error_, "getallchildrennumber_resp"); initOpCode(OpCodes::Close, stats_.close_resp_, stats_.close_resp_fast_, stats_.close_resp_slow_, - stats_.close_rq_bytes_, stats_.close_resp_bytes_, "close_resp"); + stats_.close_rq_bytes_, stats_.close_resp_bytes_, stats_.close_decoder_error_, + "close_resp"); } ErrorBudgetResponseType @@ -146,13 +157,16 @@ void ZooKeeperFilterConfig::initOpCode(OpCodes opcode, Stats::Counter& resp_coun Stats::Counter& resp_fast_counter, Stats::Counter& resp_slow_counter, Stats::Counter& rq_bytes_counter, - Stats::Counter& resp_bytes_counter, absl::string_view name) { + Stats::Counter& resp_bytes_counter, + Stats::Counter& decoder_error_counter, + absl::string_view name) { OpCodeInfo& opcode_info = op_code_map_[opcode]; opcode_info.resp_counter_ = &resp_counter; opcode_info.resp_fast_counter_ = &resp_fast_counter; opcode_info.resp_slow_counter_ = &resp_slow_counter; opcode_info.rq_bytes_counter_ = &rq_bytes_counter; opcode_info.resp_bytes_counter_ = &resp_bytes_counter; + opcode_info.decoder_error_counter_ = &decoder_error_counter; opcode_info.opname_ = std::string(name); opcode_info.latency_name_ = stat_name_set_->add(absl::StrCat(name, "_latency")); } @@ -239,8 +253,18 @@ void ZooKeeperFilter::onConnect(const bool readonly) { } } -void ZooKeeperFilter::onDecodeError() { +void ZooKeeperFilter::onDecodeError(const absl::optional opcode) { config_->stats_.decoder_error_.inc(); + + if (config_->enable_per_opcode_decoder_error_metrics_ && opcode.has_value() && + config_->op_code_map_.contains(*opcode)) { + if (*opcode == OpCodes::Connect) { + config_->stats_.connect_decoder_error_.inc(); + } else { + config_->op_code_map_[*opcode].decoder_error_counter_->inc(); + } + } + setDynamicMetadata("opname", "error"); } @@ -364,8 +388,12 @@ void ZooKeeperFilter::onSetAclRequest(const std::string& path, const int32_t ver setDynamicMetadata({{"opname", "setacl"}, {"path", path}, {"version", std::to_string(version)}}); } -absl::Status ZooKeeperFilter::onSyncRequest(const absl::StatusOr& path) { - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); +absl::Status ZooKeeperFilter::onSyncRequest(const absl::StatusOr& path, + const OpCodes opcode) { + if (!path.ok()) { + onDecodeError(opcode); + return absl::InvalidArgumentError(path.status().message()); + } config_->stats_.sync_rq_.inc(); setDynamicMetadata({{"opname", "sync"}, {"path", path.value()}}); @@ -412,8 +440,12 @@ void ZooKeeperFilter::onAddWatchRequest(const std::string& path, const int32_t m setDynamicMetadata({{"opname", "addwatch"}, {"path", path}, {"mode", std::to_string(mode)}}); } -absl::Status ZooKeeperFilter::onGetEphemeralsRequest(const absl::StatusOr& path) { - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); +absl::Status ZooKeeperFilter::onGetEphemeralsRequest(const absl::StatusOr& path, + const OpCodes opcode) { + if (!path.ok()) { + onDecodeError(opcode); + return absl::InvalidArgumentError(path.status().message()); + } config_->stats_.getephemerals_rq_.inc(); setDynamicMetadata({{"opname", "getephemerals"}, {"path", path.value()}}); @@ -421,9 +453,12 @@ absl::Status ZooKeeperFilter::onGetEphemeralsRequest(const absl::StatusOr& path) { - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); +absl::Status ZooKeeperFilter::onGetAllChildrenNumberRequest(const absl::StatusOr& path, + const OpCodes opcode) { + if (!path.ok()) { + onDecodeError(opcode); + return absl::InvalidArgumentError(path.status().message()); + } config_->stats_.getallchildrennumber_rq_.inc(); setDynamicMetadata({{"opname", "getallchildrennumber"}, {"path", path.value()}}); diff --git a/source/extensions/filters/network/zookeeper_proxy/filter.h b/source/extensions/filters/network/zookeeper_proxy/filter.h index 94f47506b211..8dfe85e1e91b 100644 --- a/source/extensions/filters/network/zookeeper_proxy/filter.h +++ b/source/extensions/filters/network/zookeeper_proxy/filter.h @@ -29,6 +29,34 @@ namespace ZooKeeperProxy { */ #define ALL_ZOOKEEPER_PROXY_STATS(COUNTER) \ COUNTER(decoder_error) \ + COUNTER(connect_decoder_error) \ + COUNTER(ping_decoder_error) \ + COUNTER(auth_decoder_error) \ + COUNTER(getdata_decoder_error) \ + COUNTER(create_decoder_error) \ + COUNTER(create2_decoder_error) \ + COUNTER(createcontainer_decoder_error) \ + COUNTER(createttl_decoder_error) \ + COUNTER(setdata_decoder_error) \ + COUNTER(getchildren_decoder_error) \ + COUNTER(getchildren2_decoder_error) \ + COUNTER(getephemerals_decoder_error) \ + COUNTER(getallchildrennumber_decoder_error) \ + COUNTER(delete_decoder_error) \ + COUNTER(exists_decoder_error) \ + COUNTER(getacl_decoder_error) \ + COUNTER(setacl_decoder_error) \ + COUNTER(sync_decoder_error) \ + COUNTER(multi_decoder_error) \ + COUNTER(reconfig_decoder_error) \ + COUNTER(close_decoder_error) \ + COUNTER(setauth_decoder_error) \ + COUNTER(setwatches_decoder_error) \ + COUNTER(setwatches2_decoder_error) \ + COUNTER(addwatch_decoder_error) \ + COUNTER(checkwatches_decoder_error) \ + COUNTER(removewatches_decoder_error) \ + COUNTER(check_decoder_error) \ COUNTER(request_bytes) \ COUNTER(connect_rq_bytes) \ COUNTER(connect_readonly_rq_bytes) \ @@ -225,6 +253,7 @@ class ZooKeeperFilterConfig { ZooKeeperFilterConfig(const std::string& stat_prefix, const uint32_t max_packet_bytes, const bool enable_per_opcode_request_bytes_metrics, const bool enable_per_opcode_response_bytes_metrics, + const bool enable_per_opcode_decoder_error_metrics, const bool enable_latency_threshold_metrics, const std::chrono::milliseconds default_latency_threshold, const LatencyThresholdOverrideList& latency_threshold_overrides, @@ -247,6 +276,7 @@ class ZooKeeperFilterConfig { Stats::Counter* resp_slow_counter_; Stats::Counter* rq_bytes_counter_; Stats::Counter* resp_bytes_counter_; + Stats::Counter* decoder_error_counter_; std::string opname_; Stats::StatName latency_name_; }; @@ -263,6 +293,7 @@ class ZooKeeperFilterConfig { const Stats::StatName unknown_opcode_latency_; const bool enable_per_opcode_request_bytes_metrics_; const bool enable_per_opcode_response_bytes_metrics_; + const bool enable_per_opcode_decoder_error_metrics_; ErrorBudgetResponseType errorBudgetDecision(const OpCodes opcode, const std::chrono::milliseconds latency) const; @@ -270,7 +301,8 @@ class ZooKeeperFilterConfig { private: void initOpCode(OpCodes opcode, Stats::Counter& resp_counter, Stats::Counter& resp_fast_counter, Stats::Counter& resp_slow_counter, Stats::Counter& rq_bytes_counter, - Stats::Counter& resp_bytes_counter, absl::string_view name); + Stats::Counter& resp_bytes_counter, Stats::Counter& decoder_error_counter, + absl::string_view name); ZooKeeperProxyStats generateStats(const std::string& prefix, Stats::Scope& scope) { return ZooKeeperProxyStats{ALL_ZOOKEEPER_PROXY_STATS(POOL_COUNTER_PREFIX(scope, prefix))}; @@ -336,7 +368,7 @@ class ZooKeeperFilter : public Network::Filter, Network::FilterStatus onWrite(Buffer::Instance& data, bool end_stream) override; // ZooKeeperProxy::DecoderCallback - void onDecodeError() override; + void onDecodeError(const absl::optional opcode) override; void onRequestBytes(const absl::optional opcode, const uint64_t bytes) override; void onConnect(bool readonly) override; void onPing() override; @@ -349,7 +381,8 @@ class ZooKeeperFilter : public Network::Filter, void onExistsRequest(const std::string& path, bool watch) override; void onGetAclRequest(const std::string& path) override; void onSetAclRequest(const std::string& path, int32_t version) override; - absl::Status onSyncRequest(const absl::StatusOr& path) override; + absl::Status onSyncRequest(const absl::StatusOr& path, + const OpCodes opcode) override; void onCheckRequest(const std::string& path, int32_t version) override; void onMultiRequest() override; void onReconfigRequest() override; @@ -358,8 +391,10 @@ class ZooKeeperFilter : public Network::Filter, void onAddWatchRequest(const std::string& path, const int32_t mode) override; void onCheckWatchesRequest(const std::string& path, int32_t type) override; void onRemoveWatchesRequest(const std::string& path, int32_t type) override; - absl::Status onGetEphemeralsRequest(const absl::StatusOr& path) override; - absl::Status onGetAllChildrenNumberRequest(const absl::StatusOr& path) override; + absl::Status onGetEphemeralsRequest(const absl::StatusOr& path, + const OpCodes opcode) override; + absl::Status onGetAllChildrenNumberRequest(const absl::StatusOr& path, + const OpCodes opcode) override; void onCloseRequest() override; void onResponseBytes(const absl::optional opcode, const uint64_t bytes) override; void onConnectResponse(int32_t proto_version, int32_t timeout, bool readonly, diff --git a/source/extensions/filters/network/zookeeper_proxy/utils.h b/source/extensions/filters/network/zookeeper_proxy/utils.h index 76b43661dcfb..59dd5292f114 100644 --- a/source/extensions/filters/network/zookeeper_proxy/utils.h +++ b/source/extensions/filters/network/zookeeper_proxy/utils.h @@ -48,9 +48,9 @@ class BufferHelper : public Logger::Loggable { return status; \ } -#define EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status) \ +#define EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode) \ if (!status.ok()) { \ - callbacks_.onDecodeError(); \ + callbacks_.onDecodeError(opcode); \ return status; \ } @@ -59,9 +59,9 @@ class BufferHelper : public Logger::Loggable { return absl::InvalidArgumentError(message); \ } -#define EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, message) \ +#define EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, opcode, message) \ if (!status.ok()) { \ - callbacks_.onDecodeError(); \ + callbacks_.onDecodeError(opcode); \ return absl::InvalidArgumentError(message); \ } } // namespace ZooKeeperProxy diff --git a/test/extensions/filters/network/zookeeper_proxy/config_test.cc b/test/extensions/filters/network/zookeeper_proxy/config_test.cc index 30043aaa11c8..f20621bd3afe 100644 --- a/test/extensions/filters/network/zookeeper_proxy/config_test.cc +++ b/test/extensions/filters/network/zookeeper_proxy/config_test.cc @@ -26,6 +26,7 @@ stat_prefix: test_prefix max_packet_bytes: 1048576 enable_per_opcode_request_bytes_metrics: true enable_per_opcode_response_bytes_metrics: true +enable_per_opcode_decoder_error_metrics: true enable_latency_threshold_metrics: true default_latency_threshold: "0.1s" latency_threshold_overrides:)EOF"; @@ -183,6 +184,7 @@ stat_prefix: test_prefix EXPECT_EQ(proto_config_.max_packet_bytes().value(), 0); EXPECT_EQ(proto_config_.enable_per_opcode_request_bytes_metrics(), false); EXPECT_EQ(proto_config_.enable_per_opcode_response_bytes_metrics(), false); + EXPECT_EQ(proto_config_.enable_per_opcode_decoder_error_metrics(), false); EXPECT_EQ(proto_config_.enable_latency_threshold_metrics(), false); EXPECT_EQ(proto_config_.default_latency_threshold(), ProtobufWkt::util::TimeUtil::SecondsToDuration(0)); @@ -204,6 +206,7 @@ default_latency_threshold: "0.15s" EXPECT_EQ(proto_config_.max_packet_bytes().value(), 0); EXPECT_EQ(proto_config_.enable_per_opcode_request_bytes_metrics(), false); EXPECT_EQ(proto_config_.enable_per_opcode_response_bytes_metrics(), false); + EXPECT_EQ(proto_config_.enable_per_opcode_decoder_error_metrics(), false); EXPECT_EQ(proto_config_.enable_latency_threshold_metrics(), false); EXPECT_EQ(proto_config_.default_latency_threshold(), ProtobufWkt::util::TimeUtil::MillisecondsToDuration(150)); @@ -227,6 +230,7 @@ stat_prefix: test_prefix EXPECT_EQ(proto_config_.max_packet_bytes().value(), 0); EXPECT_EQ(proto_config_.enable_per_opcode_request_bytes_metrics(), false); EXPECT_EQ(proto_config_.enable_per_opcode_response_bytes_metrics(), false); + EXPECT_EQ(proto_config_.enable_per_opcode_decoder_error_metrics(), false); EXPECT_EQ(proto_config_.enable_latency_threshold_metrics(), false); EXPECT_EQ(proto_config_.default_latency_threshold(), ProtobufWkt::util::TimeUtil::SecondsToDuration(0)); @@ -251,6 +255,7 @@ TEST_F(ZookeeperFilterConfigTest, FullConfig) { EXPECT_EQ(proto_config_.max_packet_bytes().value(), 1048576); EXPECT_EQ(proto_config_.enable_per_opcode_request_bytes_metrics(), true); EXPECT_EQ(proto_config_.enable_per_opcode_response_bytes_metrics(), true); + EXPECT_EQ(proto_config_.enable_per_opcode_decoder_error_metrics(), true); EXPECT_EQ(proto_config_.enable_latency_threshold_metrics(), true); EXPECT_EQ(proto_config_.default_latency_threshold(), ProtobufWkt::util::TimeUtil::MillisecondsToDuration(100)); diff --git a/test/extensions/filters/network/zookeeper_proxy/filter_test.cc b/test/extensions/filters/network/zookeeper_proxy/filter_test.cc index f22f9f14cee4..72f8bba27954 100644 --- a/test/extensions/filters/network/zookeeper_proxy/filter_test.cc +++ b/test/extensions/filters/network/zookeeper_proxy/filter_test.cc @@ -31,14 +31,16 @@ class ZooKeeperFilterTest : public testing::Test { void initialize( const bool enable_per_opcode_request_bytes_metrics = true, const bool enable_per_opcode_response_bytes_metrics = true, + const bool enable_per_opcode_decoder_error_metrics = true, const bool enable_latency_threshold_metrics = true, const std::chrono::milliseconds default_latency_threshold = std::chrono::milliseconds(100), const LatencyThresholdOverrideList& latency_threshold_overrides = LatencyThresholdOverrideList()) { config_ = std::make_shared( stat_prefix_, 1048576, enable_per_opcode_request_bytes_metrics, - enable_per_opcode_response_bytes_metrics, enable_latency_threshold_metrics, - default_latency_threshold, latency_threshold_overrides, scope_); + enable_per_opcode_response_bytes_metrics, enable_per_opcode_decoder_error_metrics, + enable_latency_threshold_metrics, default_latency_threshold, latency_threshold_overrides, + scope_); filter_ = std::make_unique(config_, time_system_); filter_->initializeReadFilterCallbacks(filter_callbacks_); } @@ -349,6 +351,24 @@ class ZooKeeperFilterTest : public testing::Test { return buffer; } + Buffer::OwnedImpl + encodeTooSmallCreateRequest(const std::string& path, const std::string& data, + const bool txn = false, const int32_t xid = 1000, + const int32_t opcode = enumToSignedInt(OpCodes::Create)) const { + Buffer::OwnedImpl buffer; + + if (!txn) { + buffer.writeBEInt(16 + path.length() + data.length()); + buffer.writeBEInt(xid); + buffer.writeBEInt(opcode); + } + + addString(buffer, path); + addString(buffer, data); + // Deliberately not adding acls and flags to the buffer and change the length accordingly. + return buffer; + } + Buffer::OwnedImpl encodeSetRequest(const std::string& path, const std::string& data, const int32_t version, const bool txn = false) const { Buffer::OwnedImpl buffer; @@ -556,6 +576,9 @@ class ZooKeeperFilterTest : public testing::Test { case OpCodes::CreateTtl: opname = "createttl"; break; + case OpCodes::Create2: + opname = "create2"; + break; default: break; } @@ -565,26 +588,12 @@ class ZooKeeperFilterTest : public testing::Test { {{"bytes", "35"}}}); EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); - - switch (opcode) { - case OpCodes::Create: - EXPECT_EQ(1UL, config_->stats().create_rq_.value()); - EXPECT_EQ(35UL, store_.counter("test.zookeeper.create_rq_bytes").value()); - break; - case OpCodes::CreateContainer: - EXPECT_EQ(1UL, config_->stats().createcontainer_rq_.value()); - EXPECT_EQ(35UL, store_.counter("test.zookeeper.createcontainer_rq_bytes").value()); - break; - case OpCodes::CreateTtl: - EXPECT_EQ(1UL, config_->stats().createttl_rq_.value()); - EXPECT_EQ(35UL, store_.counter("test.zookeeper.createttl_rq_bytes").value()); - break; - default: - break; - } - + EXPECT_EQ(1UL, store_.counter(absl::StrCat("test.zookeeper.", opname, "_rq")).value()); EXPECT_EQ(35UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(35UL, store_.counter(absl::StrCat("test.zookeeper.", opname, "_rq_bytes")).value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, + store_.counter(absl::StrCat("test.zookeeper.", opname, "_decoder_error")).value()); } void testCreateWithNegativeDataLen(CreateFlags flag, const int32_t flag_val, @@ -601,6 +610,9 @@ class ZooKeeperFilterTest : public testing::Test { case OpCodes::CreateTtl: opname = "createttl"; break; + case OpCodes::Create2: + opname = "create2"; + break; default: break; } @@ -610,24 +622,12 @@ class ZooKeeperFilterTest : public testing::Test { {{"bytes", "32"}}}); EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); - - switch (opcode) { - case OpCodes::Create: - EXPECT_EQ(1UL, config_->stats().create_rq_.value()); - break; - case OpCodes::CreateContainer: - EXPECT_EQ(1UL, config_->stats().createcontainer_rq_.value()); - break; - case OpCodes::CreateTtl: - EXPECT_EQ(1UL, config_->stats().createttl_rq_.value()); - break; - default: - break; - } - + EXPECT_EQ(1UL, store_.counter(absl::StrCat("test.zookeeper.", opname, "_rq")).value()); EXPECT_EQ(32UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(32UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(32UL, store_.counter(absl::StrCat("test.zookeeper.", opname, "_rq_bytes")).value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, + store_.counter(absl::StrCat("test.zookeeper.", opname, "_decoder_error")).value()); } void testRequest(Buffer::OwnedImpl& data, const std::vector& metadata_values) { @@ -653,6 +653,8 @@ class ZooKeeperFilterTest : public testing::Test { EXPECT_EQ(request_bytes, store_.counter(absl::StrCat("test.zookeeper.", opcode, "_rq_bytes")).value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, + store_.counter(absl::StrCat("test.zookeeper.", opcode, "_decoder_error")).value()); } void testControlRequest(Buffer::OwnedImpl& data, const std::vector& metadata_values, @@ -681,11 +683,11 @@ class ZooKeeperFilterTest : public testing::Test { expectSetDynamicMetadata(metadata_values); Buffer::OwnedImpl data = encodeResponseHeader(xid, zxid, 0); - std::string resp = ""; + std::string response = ""; for (const auto& metadata : metadata_values) { auto it = metadata.find("opname"); if (it != metadata.end()) { - resp = it->second; + response = it->second; } } @@ -693,11 +695,19 @@ class ZooKeeperFilterTest : public testing::Test { // However, its corresponding metric names have `_resp` suffix. std::string long_resp_suffix = "_response"; std::string short_resp_suffix = "_resp"; - size_t pos = resp.rfind(long_resp_suffix); + std::string resp = response; + size_t pos = response.rfind(long_resp_suffix); if (pos != std::string::npos) { resp.replace(pos, long_resp_suffix.length(), short_resp_suffix); } + // Fetch opcode by trimming the `_resp` suffix. + std::string opcode = resp; + pos = opcode.rfind(short_resp_suffix); + if (pos != std::string::npos) { + opcode.replace(pos, short_resp_suffix.length(), ""); + } + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onWrite(data, false)); EXPECT_EQ(1UL * response_count, store_.counter(absl::StrCat("test.zookeeper.", resp)).value()); EXPECT_EQ(1UL * response_count, @@ -707,8 +717,9 @@ class ZooKeeperFilterTest : public testing::Test { EXPECT_EQ(20UL * response_count, store_.counter(absl::StrCat("test.zookeeper.", resp, "_bytes")).value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); - const auto histogram_name = - fmt::format("test.zookeeper.{}_latency", metadata_values[0].find("opname")->second); + EXPECT_EQ(0UL, + store_.counter(absl::StrCat("test.zookeeper.", opcode, "_decoder_error")).value()); + const auto histogram_name = fmt::format("test.zookeeper.{}_latency", response); EXPECT_NE(absl::nullopt, findHistogram(histogram_name)); } @@ -731,7 +742,7 @@ TEST_F(ZooKeeperFilterTest, DisableErrorBudgetCalculation) { std::chrono::milliseconds default_latency_threshold(200); LatencyThresholdOverrideList latency_threshold_overrides; - initialize(true, true, false, default_latency_threshold, latency_threshold_overrides); + initialize(true, true, true, false, default_latency_threshold, latency_threshold_overrides); EXPECT_EQ(config_->errorBudgetDecision(OpCodes::Connect, std::chrono::milliseconds(50)), ErrorBudgetResponseType::None); @@ -754,7 +765,7 @@ TEST_F(ZooKeeperFilterTest, ErrorBudgetDecisionWithDefaultLatencyThresholdConfig std::chrono::milliseconds default_latency_threshold(200); LatencyThresholdOverrideList latency_threshold_overrides; - initialize(true, true, true, default_latency_threshold, latency_threshold_overrides); + initialize(true, true, true, true, default_latency_threshold, latency_threshold_overrides); EXPECT_EQ(config_->errorBudgetDecision(OpCodes::Connect, std::chrono::milliseconds(50)), ErrorBudgetResponseType::Fast); @@ -781,7 +792,7 @@ TEST_F(ZooKeeperFilterTest, ErrorBudgetDecisionWithMultiLatencyThresholdConfig) threshold_override->set_opcode(LatencyThresholdOverride::Multi); threshold_override->mutable_threshold()->set_nanos(200000000); // 200 milliseconds - initialize(true, true, true, default_latency_threshold, latency_threshold_overrides); + initialize(true, true, true, true, default_latency_threshold, latency_threshold_overrides); EXPECT_EQ(config_->errorBudgetDecision(OpCodes::Connect, std::chrono::milliseconds(50)), ErrorBudgetResponseType::Fast); @@ -811,7 +822,7 @@ TEST_F(ZooKeeperFilterTest, ErrorBudgetDecisionWithDefaultAndOtherLatencyThresho threshold_override->set_opcode(LatencyThresholdOverride::Create); threshold_override->mutable_threshold()->set_nanos(200000000); // 200 milliseconds - initialize(true, true, true, default_latency_threshold, latency_threshold_overrides); + initialize(true, true, true, true, default_latency_threshold, latency_threshold_overrides); EXPECT_EQ(config_->errorBudgetDecision(OpCodes::Connect, std::chrono::milliseconds(150)), ErrorBudgetResponseType::Fast); @@ -840,16 +851,17 @@ TEST_F(ZooKeeperFilterTest, DisablePerOpcodeRequestAndResponseBytesMetrics) { std::chrono::milliseconds default_latency_threshold(100); LatencyThresholdOverrideList latency_threshold_overrides; - initialize(false, false, true, default_latency_threshold, latency_threshold_overrides); + initialize(false, false, true, true, default_latency_threshold, latency_threshold_overrides); Buffer::OwnedImpl data = encodeConnect(); expectSetDynamicMetadata({{{"opname", "connect"}}, {{"bytes", "32"}}}); EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); - EXPECT_EQ(1UL, store_.counter("test.zookeeper.connect_rq").value()); + EXPECT_EQ(1UL, config_->stats().connect_rq_.value()); EXPECT_EQ(32UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(0UL, store_.counter("test.zookeeper.connect_rq_bytes").value()); + EXPECT_EQ(0UL, config_->stats().connect_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().connect_decoder_error_.value()); data = encodeConnectResponse(); expectSetDynamicMetadata({{{"opname", "connect_response"}, @@ -862,8 +874,9 @@ TEST_F(ZooKeeperFilterTest, DisablePerOpcodeRequestAndResponseBytesMetrics) { EXPECT_EQ(1UL, config_->stats().connect_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().connect_resp_slow_.value()); EXPECT_EQ(24UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(0UL, store_.counter("test.zookeeper.connect_resp_bytes").value()); + EXPECT_EQ(0UL, config_->stats().connect_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().connect_decoder_error_.value()); EXPECT_NE(absl::nullopt, findHistogram("test.zookeeper.connect_response_latency")); } @@ -871,16 +884,17 @@ TEST_F(ZooKeeperFilterTest, DisablePerOpcodeRequestBytesMetrics) { std::chrono::milliseconds default_latency_threshold(100); LatencyThresholdOverrideList latency_threshold_overrides; - initialize(false, true, true, default_latency_threshold, latency_threshold_overrides); + initialize(false, true, true, true, default_latency_threshold, latency_threshold_overrides); Buffer::OwnedImpl data = encodeConnect(); expectSetDynamicMetadata({{{"opname", "connect"}}, {{"bytes", "32"}}}); EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); - EXPECT_EQ(1UL, store_.counter("test.zookeeper.connect_rq").value()); + EXPECT_EQ(1UL, config_->stats().connect_rq_.value()); EXPECT_EQ(32UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(0UL, store_.counter("test.zookeeper.connect_rq_bytes").value()); + EXPECT_EQ(0UL, config_->stats().connect_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().connect_decoder_error_.value()); data = encodeConnectResponse(); expectSetDynamicMetadata({{{"opname", "connect_response"}, @@ -893,8 +907,9 @@ TEST_F(ZooKeeperFilterTest, DisablePerOpcodeRequestBytesMetrics) { EXPECT_EQ(1UL, config_->stats().connect_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().connect_resp_slow_.value()); EXPECT_EQ(24UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(24UL, store_.counter("test.zookeeper.connect_resp_bytes").value()); + EXPECT_EQ(24UL, config_->stats().connect_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().connect_decoder_error_.value()); EXPECT_NE(absl::nullopt, findHistogram("test.zookeeper.connect_response_latency")); } @@ -902,16 +917,17 @@ TEST_F(ZooKeeperFilterTest, DisablePerOpcodeResponseBytesMetrics) { std::chrono::milliseconds default_latency_threshold(100); LatencyThresholdOverrideList latency_threshold_overrides; - initialize(true, false, true, default_latency_threshold, latency_threshold_overrides); + initialize(true, false, true, true, default_latency_threshold, latency_threshold_overrides); Buffer::OwnedImpl data = encodeConnect(); expectSetDynamicMetadata({{{"opname", "connect"}}, {{"bytes", "32"}}}); EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); - EXPECT_EQ(1UL, store_.counter("test.zookeeper.connect_rq").value()); + EXPECT_EQ(1UL, config_->stats().connect_rq_.value()); EXPECT_EQ(32UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(32UL, store_.counter("test.zookeeper.connect_rq_bytes").value()); + EXPECT_EQ(32UL, config_->stats().connect_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().connect_decoder_error_.value()); data = encodeConnectResponse(); expectSetDynamicMetadata({{{"opname", "connect_response"}, @@ -924,8 +940,9 @@ TEST_F(ZooKeeperFilterTest, DisablePerOpcodeResponseBytesMetrics) { EXPECT_EQ(1UL, config_->stats().connect_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().connect_resp_slow_.value()); EXPECT_EQ(24UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(0UL, store_.counter("test.zookeeper.connect_resp_bytes").value()); + EXPECT_EQ(0UL, config_->stats().connect_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().connect_decoder_error_.value()); EXPECT_NE(absl::nullopt, findHistogram("test.zookeeper.connect_response_latency")); } @@ -947,8 +964,9 @@ TEST_F(ZooKeeperFilterTest, Connect) { EXPECT_EQ(1UL, config_->stats().connect_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().connect_resp_slow_.value()); EXPECT_EQ(24UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(24UL, store_.counter("test.zookeeper.connect_resp_bytes").value()); + EXPECT_EQ(24UL, config_->stats().connect_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().connect_decoder_error_.value()); EXPECT_NE(absl::nullopt, findHistogram("test.zookeeper.connect_response_latency")); } @@ -971,8 +989,9 @@ TEST_F(ZooKeeperFilterTest, ConnectReadonly) { EXPECT_EQ(1UL, config_->stats().connect_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().connect_resp_slow_.value()); EXPECT_EQ(25UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(25UL, store_.counter("test.zookeeper.connect_resp_bytes").value()); + EXPECT_EQ(25UL, config_->stats().connect_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().connect_decoder_error_.value()); EXPECT_NE(absl::nullopt, findHistogram("test.zookeeper.connect_response_latency")); } @@ -1108,14 +1127,39 @@ TEST_F(ZooKeeperFilterTest, CreateRequestTTLSequential) { } TEST_F(ZooKeeperFilterTest, CreateRequest2) { + testCreate(CreateFlags::Persistent, 0, OpCodes::Create2); + testResponse({{{"opname", "create2_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}); +} + +TEST_F(ZooKeeperFilterTest, TooSmallCreateRequest) { initialize(); Buffer::OwnedImpl data = - encodeCreateRequest("/foo", "bar", 0, false, 1000, enumToSignedInt(OpCodes::Create2)); + encodeTooSmallCreateRequest("/foo", "bar", false, 1000, enumToSignedInt(OpCodes::Create)); - testRequest(data, {{{"opname", "create2"}, {"path", "/foo"}, {"create_type", "persistent"}}, - {{"bytes", "35"}}}); - testResponse({{{"opname", "create2_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}); + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(0UL, config_->stats().create_rq_.value()); + EXPECT_EQ(0UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().create_rq_bytes_.value()); + EXPECT_EQ(1UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(1UL, config_->stats().create_decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, TooSmallCreateRequestWithDisabledPerOpcodeDecoderErrorMetrics) { + std::chrono::milliseconds default_latency_threshold(100); + LatencyThresholdOverrideList latency_threshold_overrides; + + initialize(true, true, false, true, default_latency_threshold, latency_threshold_overrides); + + Buffer::OwnedImpl data = + encodeTooSmallCreateRequest("/foo", "bar", false, 1000, enumToSignedInt(OpCodes::Create)); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(0UL, config_->stats().create_rq_.value()); + EXPECT_EQ(0UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().create_rq_bytes_.value()); + EXPECT_EQ(1UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); } TEST_F(ZooKeeperFilterTest, SetRequest) { @@ -1229,8 +1273,9 @@ TEST_F(ZooKeeperFilterTest, CheckRequest) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(1UL, config_->stats().check_rq_.value()); EXPECT_EQ(24UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(24UL, store_.counter("test.zookeeper.check_rq_bytes").value()); + EXPECT_EQ(24UL, config_->stats().check_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().check_decoder_error_.value()); testResponse({{{"opname", "check_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}); } @@ -1260,12 +1305,13 @@ TEST_F(ZooKeeperFilterTest, MultiRequest) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(1UL, config_->stats().multi_rq_.value()); EXPECT_EQ(200UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(200UL, store_.counter("test.zookeeper.multi_rq_bytes").value()); + EXPECT_EQ(200UL, config_->stats().multi_rq_bytes_.value()); EXPECT_EQ(3UL, config_->stats().create_rq_.value()); EXPECT_EQ(1UL, config_->stats().setdata_rq_.value()); EXPECT_EQ(1UL, config_->stats().check_rq_.value()); EXPECT_EQ(2UL, config_->stats().delete_rq_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().multi_decoder_error_.value()); testResponse({{{"opname", "multi_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}); } @@ -1386,7 +1432,7 @@ TEST_F(ZooKeeperFilterTest, WatchEvent) { // WATCH_XID is generated by the server, it has no corresponding opcode. // Below expectation makes sure that WATCH_XID does not return the default opcode (which is // connect). - EXPECT_EQ(0UL, store_.counter("test.zookeeper.connect_resp_bytes").value()); + EXPECT_EQ(0UL, config_->stats().connect_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); } @@ -1415,8 +1461,9 @@ TEST_F(ZooKeeperFilterTest, OneRequestWithMultipleOnDataCalls) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(0UL, config_->stats().create_rq_.value()); EXPECT_EQ(0UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(0UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(0UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. data.drain(data.length()); @@ -1430,8 +1477,9 @@ TEST_F(ZooKeeperFilterTest, OneRequestWithMultipleOnDataCalls) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(1UL, config_->stats().create_rq_.value()); EXPECT_EQ(35UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(35UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(35UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Response. testResponse({{{"opname", "create_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}); @@ -1450,8 +1498,9 @@ TEST_F(ZooKeeperFilterTest, MultipleRequestsWithOneOnDataCall) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(2UL, config_->stats().create_rq_.value()); EXPECT_EQ(71UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(71UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(71UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Responses. testResponse({{{"opname", "create_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}); @@ -1471,8 +1520,9 @@ TEST_F(ZooKeeperFilterTest, MultipleControlRequestsWithOneOnDataCall) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(2UL, store_.counter("test.zookeeper.auth.digest_rq").value()); EXPECT_EQ(72UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(72UL, store_.counter("test.zookeeper.auth_rq_bytes").value()); + EXPECT_EQ(72UL, config_->stats().auth_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().auth_decoder_error_.value()); // Responses. testResponse({{{"opname", "auth_response"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}, @@ -1494,9 +1544,11 @@ TEST_F(ZooKeeperFilterTest, MixedControlAndDataRequestsWithOneOnDataCall) { EXPECT_EQ(1UL, store_.counter("test.zookeeper.auth.digest_rq").value()); EXPECT_EQ(1UL, config_->stats().create_rq_.value()); EXPECT_EQ(71UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(36UL, store_.counter("test.zookeeper.auth_rq_bytes").value()); - EXPECT_EQ(35UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(36UL, config_->stats().auth_rq_bytes_.value()); + EXPECT_EQ(35UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().auth_decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Responses. testResponse({{{"opname", "auth_response"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}, @@ -1510,9 +1562,11 @@ TEST_F(ZooKeeperFilterTest, MixedControlAndDataRequestsWithOneOnDataCall) { EXPECT_EQ(1UL, config_->stats().create_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().create_resp_slow_.value()); EXPECT_EQ(40UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(20UL, store_.counter("test.zookeeper.auth_resp_bytes").value()); - EXPECT_EQ(20UL, store_.counter("test.zookeeper.create_resp_bytes").value()); + EXPECT_EQ(20UL, config_->stats().auth_resp_bytes_.value()); + EXPECT_EQ(20UL, config_->stats().create_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().auth_decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); EXPECT_NE(absl::nullopt, findHistogram("test.zookeeper.create_resp_latency")); } @@ -1528,8 +1582,9 @@ TEST_F(ZooKeeperFilterTest, MultipleRequestsWithMultipleOnDataCalls) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(0UL, config_->stats().create_rq_.value()); EXPECT_EQ(0UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(0UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(0UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. data.drain(data.length()); @@ -1547,8 +1602,9 @@ TEST_F(ZooKeeperFilterTest, MultipleRequestsWithMultipleOnDataCalls) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(1UL, config_->stats().create_rq_.value()); EXPECT_EQ(35UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(35UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(35UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. data.drain(data.length()); @@ -1564,8 +1620,9 @@ TEST_F(ZooKeeperFilterTest, MultipleRequestsWithMultipleOnDataCalls) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(2UL, config_->stats().create_rq_.value()); EXPECT_EQ(71UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(71UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(71UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Responses. testResponse({{{"opname", "create_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}, @@ -1586,8 +1643,9 @@ TEST_F(ZooKeeperFilterTest, MultipleRequestsWithMultipleOnDataCalls2) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(0UL, config_->stats().create_rq_.value()); EXPECT_EQ(0UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(0UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(0UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. data.drain(data.length()); @@ -1604,8 +1662,9 @@ TEST_F(ZooKeeperFilterTest, MultipleRequestsWithMultipleOnDataCalls2) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(3UL, config_->stats().create_rq_.value()); EXPECT_EQ(108UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(108UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(108UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Responses. testResponse({{{"opname", "create_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}, @@ -1631,8 +1690,9 @@ TEST_F(ZooKeeperFilterTest, MultipleRequestsWithMultipleOnDataCalls3) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(2UL, config_->stats().create_rq_.value()); EXPECT_EQ(71UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(71UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(71UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. data.drain(data.length()); @@ -1646,8 +1706,9 @@ TEST_F(ZooKeeperFilterTest, MultipleRequestsWithMultipleOnDataCalls3) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(3UL, config_->stats().create_rq_.value()); EXPECT_EQ(108UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(108UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(108UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Responses. testResponse({{{"opname", "create_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}, @@ -1673,8 +1734,9 @@ TEST_F(ZooKeeperFilterTest, MultipleRequestsWithMultipleOnDataCalls4) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(1UL, config_->stats().create_rq_.value()); EXPECT_EQ(35UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(35UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(35UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. data.drain(data.length()); @@ -1686,8 +1748,9 @@ TEST_F(ZooKeeperFilterTest, MultipleRequestsWithMultipleOnDataCalls4) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(1UL, config_->stats().create_rq_.value()); EXPECT_EQ(35UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(35UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(35UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. data.drain(data.length()); @@ -1701,8 +1764,9 @@ TEST_F(ZooKeeperFilterTest, MultipleRequestsWithMultipleOnDataCalls4) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); EXPECT_EQ(3UL, config_->stats().create_rq_.value()); EXPECT_EQ(108UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(108UL, store_.counter("test.zookeeper.create_rq_bytes").value()); + EXPECT_EQ(108UL, config_->stats().create_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().create_decoder_error_.value()); // Responses. testResponse({{{"opname", "create_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}}, @@ -1724,8 +1788,9 @@ TEST_F(ZooKeeperFilterTest, OneResponseWithMultipleOnWriteCalls) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(rq_data, false)); EXPECT_EQ(1UL, config_->stats().getdata_rq_.value()); EXPECT_EQ(21UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(21UL, store_.counter("test.zookeeper.getdata_rq_bytes").value()); + EXPECT_EQ(21UL, config_->stats().getdata_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Response (onWrite1). Buffer::OwnedImpl resp_data = encodeResponseWithPartialData(1000, 2000, 0); @@ -1735,8 +1800,9 @@ TEST_F(ZooKeeperFilterTest, OneResponseWithMultipleOnWriteCalls) { EXPECT_EQ(0UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(0UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(0UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(0UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. resp_data.drain(resp_data.length()); @@ -1749,8 +1815,9 @@ TEST_F(ZooKeeperFilterTest, OneResponseWithMultipleOnWriteCalls) { EXPECT_EQ(1UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(24UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(24UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(24UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); } // |RESP1|RESP2| @@ -1765,8 +1832,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithOneOnWriteCall) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(rq_data, false)); EXPECT_EQ(2UL, config_->stats().getdata_rq_.value()); EXPECT_EQ(42UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(42UL, store_.counter("test.zookeeper.getdata_rq_bytes").value()); + EXPECT_EQ(42UL, config_->stats().getdata_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Response (onWrite1). Buffer::OwnedImpl resp_data = encodeResponse(1000, 2000, 0, "/foo"); @@ -1777,8 +1845,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithOneOnWriteCall) { EXPECT_EQ(2UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(48UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(48UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(48UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); } // |RESP1 --------|RESP2 ------------| @@ -1794,8 +1863,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(rq_data, false)); EXPECT_EQ(2UL, config_->stats().getdata_rq_.value()); EXPECT_EQ(42UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(42UL, store_.counter("test.zookeeper.getdata_rq_bytes").value()); + EXPECT_EQ(42UL, config_->stats().getdata_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Response (onWrite1). Buffer::OwnedImpl resp_data = encodeResponseWithPartialData(1000, 2000, 0); @@ -1805,8 +1875,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls) { EXPECT_EQ(0UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(0UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(0UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(0UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. resp_data.drain(resp_data.length()); @@ -1823,8 +1894,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls) { EXPECT_EQ(1UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(24UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(24UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(24UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. resp_data.drain(resp_data.length()); @@ -1838,8 +1910,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls) { EXPECT_EQ(2UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(50UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(50UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(50UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); } // |RESP1 ------|RESP2|RESP3| @@ -1857,8 +1930,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls2) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(rq_data, false)); EXPECT_EQ(3UL, config_->stats().getdata_rq_.value()); EXPECT_EQ(63UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(63UL, store_.counter("test.zookeeper.getdata_rq_bytes").value()); + EXPECT_EQ(63UL, config_->stats().getdata_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Response (onWrite1). Buffer::OwnedImpl resp_data = encodeResponseWithPartialData(1000, 2000, 0); @@ -1868,8 +1942,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls2) { EXPECT_EQ(0UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(0UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(0UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(0UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. resp_data.drain(resp_data.length()); @@ -1885,8 +1960,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls2) { EXPECT_EQ(3UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(72UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(72UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(72UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); } // |RESP1|RESP2|RESP3 ---------| @@ -1904,8 +1980,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls3) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(rq_data, false)); EXPECT_EQ(3UL, config_->stats().getdata_rq_.value()); EXPECT_EQ(63UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(63UL, store_.counter("test.zookeeper.getdata_rq_bytes").value()); + EXPECT_EQ(63UL, config_->stats().getdata_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Response (onWrite1). Buffer::OwnedImpl resp_data = encodeResponse(1000, 2000, 0, "abcd"); @@ -1917,8 +1994,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls3) { EXPECT_EQ(2UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(48UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(48UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(48UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. resp_data.drain(resp_data.length()); @@ -1931,8 +2009,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls3) { EXPECT_EQ(3UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(72UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(72UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(72UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); } // |RESP1|RESP2 ------------------|RESP3| @@ -1950,8 +2029,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls4) { EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(rq_data, false)); EXPECT_EQ(3UL, config_->stats().getdata_rq_.value()); EXPECT_EQ(63UL, config_->stats().request_bytes_.value()); - EXPECT_EQ(63UL, store_.counter("test.zookeeper.getdata_rq_bytes").value()); + EXPECT_EQ(63UL, config_->stats().getdata_rq_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Response (onWrite1). Buffer::OwnedImpl resp_data = encodeResponse(1000, 2000, 0, "abcd"); @@ -1962,8 +2042,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls4) { EXPECT_EQ(1UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(24UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(24UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(24UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. resp_data.drain(resp_data.length()); @@ -1976,8 +2057,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls4) { EXPECT_EQ(1UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(24UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(24UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(24UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); // Mock the buffer is drained by the tcp_proxy filter. resp_data.drain(resp_data.length()); @@ -1992,8 +2074,9 @@ TEST_F(ZooKeeperFilterTest, MultipleResponsesWithMultipleOnWriteCalls4) { EXPECT_EQ(3UL, config_->stats().getdata_resp_fast_.value()); EXPECT_EQ(0UL, config_->stats().getdata_resp_slow_.value()); EXPECT_EQ(72UL, config_->stats().response_bytes_.value()); - EXPECT_EQ(72UL, store_.counter("test.zookeeper.getdata_resp_bytes").value()); + EXPECT_EQ(72UL, config_->stats().getdata_resp_bytes_.value()); EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + EXPECT_EQ(0UL, config_->stats().getdata_decoder_error_.value()); } } // namespace ZooKeeperProxy