diff --git a/source/extensions/filters/network/zookeeper_proxy/decoder.cc b/source/extensions/filters/network/zookeeper_proxy/decoder.cc index 92367e55063a..968dc798b321 100644 --- a/source/extensions/filters/network/zookeeper_proxy/decoder.cc +++ b/source/extensions/filters/network/zookeeper_proxy/decoder.cc @@ -42,26 +42,15 @@ const char* createFlagsToString(CreateFlags flags) { return "unknown"; } -absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instance& data, - uint64_t& offset) { +absl::optional DecoderImpl::decodeOnData(Buffer::Instance& data, uint64_t& offset) { ENVOY_LOG(trace, "zookeeper_proxy: decoding request with {} bytes at offset {}", data.length(), offset); // Check message length. - const absl::StatusOr len = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - len, fmt::format("peekInt32 for len: {}", len.status().message())); - - ENVOY_LOG(trace, "zookeeper_proxy: decoding request with len {} at offset {}", len.value(), - offset); - - absl::Status status = ensureMinLength(len.value(), XID_LENGTH + INT_LENGTH); // xid + opcode - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("ensureMinLength: {}", status.message())); - - status = ensureMaxLength(len.value()); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("ensureMaxLength: {}", status.message())); + const int32_t len = helper_.peekInt32(data, offset); + ENVOY_LOG(trace, "zookeeper_proxy: decoding request with len {} at offset {}", len, offset); + ensureMinLength(len, XID_LENGTH + INT_LENGTH); // xid + opcode + ensureMaxLength(len); auto start_time = time_source_.monotonicTime(); @@ -75,40 +64,26 @@ absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instan // ZooKeeper server to the next. Thus, the special xid. // However, some client implementations might expose setWatches // as a regular data request, so we support that as well. - const absl::StatusOr xid = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - xid, fmt::format("peerInt32 for xid: {}", xid.status().message())); - - ENVOY_LOG(trace, "zookeeper_proxy: decoding request with xid {} at offset {}", xid.value(), - offset); - - switch (static_cast(xid.value())) { + const int32_t xid = helper_.peekInt32(data, offset); + ENVOY_LOG(trace, "zookeeper_proxy: decoding request with xid {} at offset {}", xid, offset); + switch (static_cast(xid)) { case XidCodes::ConnectXid: - status = parseConnect(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, - fmt::format("parseConnect: {}", status.message())); - - control_requests_by_xid_[xid.value()].push({OpCodes::Connect, std::move(start_time)}); + parseConnect(data, offset, len); + control_requests_by_xid_[xid].push({OpCodes::Connect, std::move(start_time)}); return OpCodes::Connect; case XidCodes::PingXid: offset += OPCODE_LENGTH; callbacks_.onPing(); - control_requests_by_xid_[xid.value()].push({OpCodes::Ping, std::move(start_time)}); + control_requests_by_xid_[xid].push({OpCodes::Ping, std::move(start_time)}); return OpCodes::Ping; case XidCodes::AuthXid: - status = parseAuthRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, - fmt::format("parseAuthRequest: {}", status.message())); - - control_requests_by_xid_[xid.value()].push({OpCodes::SetAuth, std::move(start_time)}); + parseAuthRequest(data, offset, len); + control_requests_by_xid_[xid].push({OpCodes::SetAuth, std::move(start_time)}); return OpCodes::SetAuth; case XidCodes::SetWatchesXid: offset += OPCODE_LENGTH; - status = parseSetWatchesRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseSetWatchesRequest: {}", status.message())); - - control_requests_by_xid_[xid.value()].push({OpCodes::SetWatches, std::move(start_time)}); + parseSetWatchesRequest(data, offset, len); + control_requests_by_xid_[xid].push({OpCodes::SetWatches, std::move(start_time)}); return OpCodes::SetWatches; default: // WATCH_XID is generated by the server, so that and everything @@ -122,164 +97,100 @@ absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instan // for two cases: auth requests can happen at any time and ping requests // must happen every 1/3 of the negotiated session timeout, to keep // the session alive. - const absl::StatusOr oc = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - oc, fmt::format("peekInt32 for opcode: {}", oc.status().message())); - - ENVOY_LOG(trace, "zookeeper_proxy: decoding request with opcode {} at offset {}", oc.value(), - offset); - - const auto opcode = static_cast(oc.value()); + const int32_t oc = helper_.peekInt32(data, offset); + ENVOY_LOG(trace, "zookeeper_proxy: decoding request with opcode {} at offset {}", oc, offset); + const auto opcode = static_cast(oc); switch (opcode) { case OpCodes::GetData: - status = parseGetDataRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseGetDataRequest: {}", status.message())); + parseGetDataRequest(data, offset, len); break; case OpCodes::Create: case OpCodes::Create2: case OpCodes::CreateContainer: case OpCodes::CreateTtl: - status = parseCreateRequest(data, offset, len.value(), static_cast(opcode)); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseCreateRequest: {}", status.message())); + parseCreateRequest(data, offset, len, static_cast(opcode)); break; case OpCodes::SetData: - status = parseSetRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, - fmt::format("parseSetRequest: {}", status.message())); + parseSetRequest(data, offset, len); break; case OpCodes::GetChildren: - status = parseGetChildrenRequest(data, offset, len.value(), false); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseGetChildrenRequest: {}", status.message())); + parseGetChildrenRequest(data, offset, len, false); break; case OpCodes::GetChildren2: - status = parseGetChildrenRequest(data, offset, len.value(), true); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseGetChildrenRequest: {}", status.message())); + parseGetChildrenRequest(data, offset, len, true); break; case OpCodes::Delete: - status = parseDeleteRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseDeleteRequest: {}", status.message())); + parseDeleteRequest(data, offset, len); break; case OpCodes::Exists: - status = parseExistsRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseExistsRequest: {}", status.message())); + parseExistsRequest(data, offset, len); break; case OpCodes::GetAcl: - status = parseGetAclRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseGetAclRequest: {}", status.message())); + parseGetAclRequest(data, offset, len); break; case OpCodes::SetAcl: - status = parseSetAclRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseSetAclRequest: {}", status.message())); + parseSetAclRequest(data, offset, len); break; case OpCodes::Sync: - status = callbacks_.onSyncRequest(pathOnlyRequest(data, offset, len.value())); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, - fmt::format("onSyncRequest: {}", status.message())); + callbacks_.onSyncRequest(pathOnlyRequest(data, offset, len)); break; case OpCodes::Check: - status = parseCheckRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, - fmt::format("parseCheckRequest: {}", status.message())); + parseCheckRequest(data, offset, len); break; case OpCodes::Multi: - status = parseMultiRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, - fmt::format("parseMultiRequest: {}", status.message())); + parseMultiRequest(data, offset, len); break; case OpCodes::Reconfig: - status = parseReconfigRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseReconfigRequest: {}", status.message())); + parseReconfigRequest(data, offset, len); break; case OpCodes::SetWatches: - status = parseSetWatchesRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseSetWatchesRequest: {}", status.message())); + parseSetWatchesRequest(data, offset, len); break; case OpCodes::SetWatches2: - status = parseSetWatches2Request(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseSetWatches2Request: {}", status.message())); + parseSetWatches2Request(data, offset, len); break; case OpCodes::AddWatch: - status = parseAddWatchRequest(data, offset, len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseAddWatchRequest: {}", status.message())); + parseAddWatchRequest(data, offset, len); break; case OpCodes::CheckWatches: - status = parseXWatchesRequest(data, offset, len.value(), OpCodes::CheckWatches); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseXWatchesRequest (check watches): {}", status.message())); + parseXWatchesRequest(data, offset, len, OpCodes::CheckWatches); break; case OpCodes::RemoveWatches: - status = parseXWatchesRequest(data, offset, len.value(), OpCodes::RemoveWatches); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseXWatchesRequest (remove watches): {}", status.message())); + parseXWatchesRequest(data, offset, len, OpCodes::RemoveWatches); break; case OpCodes::GetEphemerals: - status = callbacks_.onGetEphemeralsRequest(pathOnlyRequest(data, offset, len.value())); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("onGetEphemeralsRequest: {}", status.message())); + callbacks_.onGetEphemeralsRequest(pathOnlyRequest(data, offset, len)); break; case OpCodes::GetAllChildrenNumber: - status = callbacks_.onGetAllChildrenNumberRequest(pathOnlyRequest(data, offset, len.value())); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("onGetAllChildrenNumberRequest: {}", status.message())); + callbacks_.onGetAllChildrenNumberRequest(pathOnlyRequest(data, offset, len)); break; case OpCodes::Close: callbacks_.onCloseRequest(); break; default: - ENVOY_LOG(debug, "zookeeper_proxy: decodeOnData exception: unknown opcode {}", - enumToSignedInt(opcode)); - callbacks_.onDecodeError(); - return absl::nullopt; + throw EnvoyException(fmt::format("Unknown opcode: {}", enumToSignedInt(opcode))); } - requests_by_xid_[xid.value()] = {opcode, std::move(start_time)}; + requests_by_xid_[xid] = {opcode, std::move(start_time)}; return opcode; } -absl::StatusOr> DecoderImpl::decodeOnWrite(Buffer::Instance& data, - uint64_t& offset) { +absl::optional DecoderImpl::decodeOnWrite(Buffer::Instance& data, uint64_t& offset) { ENVOY_LOG(trace, "zookeeper_proxy: decoding response with {} bytes at offset {}", data.length(), offset); // Check message length. - const absl::StatusOr len = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - len, fmt::format("peekInt32 for len: {}", len.status().message())); - - ENVOY_LOG(trace, "zookeeper_proxy: decoding response with len.value() {} at offset {}", - len.value(), offset); - - absl::Status status = - ensureMinLength(len.value(), XID_LENGTH + ZXID_LENGTH + INT_LENGTH); // xid + zxid + err - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("ensureMinLength: {}", status.message())); - - status = ensureMaxLength(len.value()); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("ensureMaxLength: {}", status.message())); - - const absl::StatusOr xid = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - xid, fmt::format("peekInt32 for xid: {}", xid.status().message())); + const int32_t len = helper_.peekInt32(data, offset); + ENVOY_LOG(trace, "zookeeper_proxy: decoding response with len {} at offset {}", len, offset); + ensureMinLength(len, XID_LENGTH + ZXID_LENGTH + INT_LENGTH); // xid + zxid + err + ensureMaxLength(len); - ENVOY_LOG(trace, "zookeeper_proxy: decoding response with xid {} at offset {}", xid.value(), - offset); - const auto xid_code = static_cast(xid.value()); + const auto xid = helper_.peekInt32(data, offset); + ENVOY_LOG(trace, "zookeeper_proxy: decoding response with xid {} at offset {}", xid, offset); + const auto xid_code = static_cast(xid); - absl::StatusOr latency; + std::chrono::milliseconds latency; OpCodes opcode; switch (xid_code) { @@ -290,517 +201,321 @@ absl::StatusOr> DecoderImpl::decodeOnWrite(Buffer::Insta case XidCodes::AuthXid: ABSL_FALLTHROUGH_INTENDED; case XidCodes::SetWatchesXid: - latency = fetchControlRequestData(xid.value(), opcode); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - latency, fmt::format("fetchControlRequestData: {}", latency.status().message())); + latency = fetchControlRequestData(xid, opcode); break; case XidCodes::WatchXid: // WATCH_XID is generated by the server, no need to fetch opcode and latency here. break; default: - latency = fetchDataRequestData(xid.value(), opcode); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - latency, fmt::format("fetchDataRequestData: {}", latency.status().message())); + latency = fetchDataRequestData(xid, opcode); } // Connect responses are special, they have no full reply header // but just an XID with no zxid nor error fields like the ones // available for all other server generated messages. if (xid_code == XidCodes::ConnectXid) { - status = parseConnectResponse(data, offset, len.value(), latency.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, fmt::format("parseConnectResponse: {}", status.message())) + parseConnectResponse(data, offset, len, latency); return opcode; } // Control responses that aren't connect, with XIDs <= 0. - const absl::StatusOr zxid = helper_.peekInt64(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - zxid, fmt::format("peekInt64 for zxid: {}", zxid.status().message())); - - const absl::StatusOr error = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - error, fmt::format("peekInt32 for error: {}", error.status().message())); - - ENVOY_LOG(trace, - "zookeeper_proxy: decoding response with zxid.value() {} and error {} at offset {}", - zxid.value(), error.value(), offset); - + const auto zxid = helper_.peekInt64(data, offset); + const auto error = helper_.peekInt32(data, offset); + ENVOY_LOG(trace, "zookeeper_proxy: decoding response with zxid {} and error {} at offset {}", + zxid, error, offset); switch (xid_code) { case XidCodes::PingXid: - callbacks_.onResponse(OpCodes::Ping, xid.value(), zxid.value(), error.value(), latency.value()); + callbacks_.onResponse(OpCodes::Ping, xid, zxid, error, latency); return opcode; case XidCodes::AuthXid: - callbacks_.onResponse(OpCodes::SetAuth, xid.value(), zxid.value(), error.value(), - latency.value()); + callbacks_.onResponse(OpCodes::SetAuth, xid, zxid, error, latency); return opcode; case XidCodes::SetWatchesXid: - callbacks_.onResponse(OpCodes::SetWatches, xid.value(), zxid.value(), error.value(), - latency.value()); + callbacks_.onResponse(OpCodes::SetWatches, xid, zxid, error, latency); return opcode; case XidCodes::WatchXid: - status = parseWatchEvent(data, offset, len.value(), zxid.value(), error.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, - fmt::format("parseWatchEvent: {}", status.message())); - + parseWatchEvent(data, offset, len, zxid, error); return absl::nullopt; // WATCH_XID is generated by the server, it has no corresponding opcode. default: break; } - callbacks_.onResponse(opcode, xid.value(), zxid.value(), error.value(), latency.value()); - offset += (len.value() - (XID_LENGTH + ZXID_LENGTH + INT_LENGTH)); + callbacks_.onResponse(opcode, xid, zxid, error, latency); + offset += (len - (XID_LENGTH + ZXID_LENGTH + INT_LENGTH)); return opcode; } -absl::Status DecoderImpl::ensureMinLength(const int32_t len, const int32_t minlen) const { +void DecoderImpl::ensureMinLength(const int32_t len, const int32_t minlen) const { if (len < minlen) { - return absl::InvalidArgumentError("packet is too small"); + throw EnvoyException("Packet is too small"); } - return absl::OkStatus(); } -absl::Status DecoderImpl::ensureMaxLength(const int32_t len) const { +void DecoderImpl::ensureMaxLength(const int32_t len) const { if (static_cast(len) > max_packet_bytes_) { - return absl::InvalidArgumentError("packet is too big"); + throw EnvoyException("Packet is too big"); } - return absl::OkStatus(); } -absl::Status DecoderImpl::parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - absl::Status status = - ensureMinLength(len, XID_LENGTH + ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); +void DecoderImpl::parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH); // Skip zxid, timeout, and session id. offset += ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH; // Skip password. - status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr readonly = maybeReadBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(readonly, - readonly.status().message()); + skipString(data, offset); - callbacks_.onConnect(readonly.value()); + const bool readonly = maybeReadBool(data, offset); - return absl::OkStatus(); + callbacks_.onConnect(readonly); } -absl::Status DecoderImpl::parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - absl::Status status = - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + INT_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); +void DecoderImpl::parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + INT_LENGTH + INT_LENGTH); + // Skip opcode + type. offset += OPCODE_LENGTH + INT_LENGTH; - - const absl::StatusOr scheme = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(scheme, scheme.status().message()); - + const std::string scheme = helper_.peekString(data, offset); // Skip credential. - status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + skipString(data, offset); - callbacks_.onAuthRequest(scheme.value()); - - return absl::OkStatus(); + callbacks_.onAuthRequest(scheme); } -absl::Status DecoderImpl::parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - - const absl::StatusOr watch = helper_.peekBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, watch.status().message()); +void DecoderImpl::parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); - callbacks_.onGetDataRequest(path.value(), watch.value()); + const std::string path = helper_.peekString(data, offset); + const bool watch = helper_.peekBool(data, offset); - return absl::OkStatus(); + callbacks_.onGetDataRequest(path, watch); } -absl::Status DecoderImpl::skipAcls(Buffer::Instance& data, uint64_t& offset) { - const absl::StatusOr count = helper_.peekInt32(data, offset); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(count, - fmt::format("skipAcls: {}", count.status().message())); +void DecoderImpl::skipAcls(Buffer::Instance& data, uint64_t& offset) { + const int32_t count = helper_.peekInt32(data, offset); - for (int i = 0; i < count.value(); ++i) { + for (int i = 0; i < count; ++i) { // Perms. - absl::StatusOr perms = helper_.peekInt32(data, offset); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(perms, - fmt::format("skipAcls: {}", perms.status().message())); + helper_.peekInt32(data, offset); // Skip scheme. - absl::Status status = skipString(data, offset); - ABSL_STATUS_RETURN_IF_STATUS_NOT_OK(status); + skipString(data, offset); // Skip cred. - status = skipString(data, offset); - ABSL_STATUS_RETURN_IF_STATUS_NOT_OK(status); + skipString(data, offset); } - - return absl::OkStatus(); } -absl::Status DecoderImpl::parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, - OpCodes opcode) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (4 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); +void DecoderImpl::parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, + OpCodes opcode) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (4 * INT_LENGTH)); - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + const std::string path = helper_.peekString(data, offset); // Skip data. - status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - status = skipAcls(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - absl::StatusOr flag_data = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(flag_data, - flag_data.status().message()); + skipString(data, offset); + skipAcls(data, offset); - const CreateFlags flags = static_cast(flag_data.value()); - status = callbacks_.onCreateRequest(path.value(), flags, opcode); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - return absl::OkStatus(); + const CreateFlags flags = static_cast(helper_.peekInt32(data, offset)); + callbacks_.onCreateRequest(path, flags, opcode); } -absl::Status DecoderImpl::parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); +void DecoderImpl::parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH)); + const std::string path = helper_.peekString(data, offset); // Skip data. - status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - + skipString(data, offset); // Ignore version. - absl::StatusOr version = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); + helper_.peekInt32(data, offset); - callbacks_.onSetRequest(path.value()); - - return absl::OkStatus(); + callbacks_.onSetRequest(path); } -absl::Status DecoderImpl::parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len, const bool two) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - - const absl::StatusOr watch = helper_.peekBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, watch.status().message()); +void DecoderImpl::parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, + const bool two) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); - callbacks_.onGetChildrenRequest(path.value(), watch.value(), two); + const std::string path = helper_.peekString(data, offset); + const bool watch = helper_.peekBool(data, offset); - return absl::OkStatus(); + callbacks_.onGetChildrenRequest(path, watch, two); } -absl::Status DecoderImpl::parseDeleteRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); +void DecoderImpl::parseDeleteRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + const std::string path = helper_.peekString(data, offset); + const int32_t version = helper_.peekInt32(data, offset); - const absl::StatusOr version = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); - - callbacks_.onDeleteRequest(path.value(), version.value()); - - return absl::OkStatus(); + callbacks_.onDeleteRequest(path, version); } -absl::Status DecoderImpl::parseExistsRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); +void DecoderImpl::parseExistsRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); - const absl::StatusOr watch = helper_.peekBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, watch.status().message()); + const std::string path = helper_.peekString(data, offset); + const bool watch = helper_.peekBool(data, offset); - callbacks_.onExistsRequest(path.value(), watch.value()); - - return absl::OkStatus(); + callbacks_.onExistsRequest(path, watch); } -absl::Status DecoderImpl::parseGetAclRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); +void DecoderImpl::parseGetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); - callbacks_.onGetAclRequest(path.value()); + const std::string path = helper_.peekString(data, offset); - return absl::OkStatus(); + callbacks_.onGetAclRequest(path); } -absl::Status DecoderImpl::parseSetAclRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - - status = skipAcls(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr version = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); +void DecoderImpl::parseSetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - callbacks_.onSetAclRequest(path.value(), version.value()); + const std::string path = helper_.peekString(data, offset); + skipAcls(data, offset); + const int32_t version = helper_.peekInt32(data, offset); - return absl::OkStatus(); + callbacks_.onSetAclRequest(path, version); } -absl::StatusOr DecoderImpl::pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - status, - fmt::format("zookeeper_proxy: path only request decoding exception {}", status.message())); - +std::string DecoderImpl::pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); return helper_.peekString(data, offset); } -absl::Status DecoderImpl::parseCheckRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - - const absl::StatusOr version = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); +void DecoderImpl::parseCheckRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - callbacks_.onCheckRequest(path.value(), version.value()); + const std::string path = helper_.peekString(data, offset); + const int32_t version = helper_.peekInt32(data, offset); - return absl::OkStatus(); + callbacks_.onCheckRequest(path, version); } -absl::Status DecoderImpl::parseMultiRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { +void DecoderImpl::parseMultiRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { // Treat empty transactions as a decoding error, there should be at least 1 header. - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + MULTI_HEADER_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + MULTI_HEADER_LENGTH); while (true) { - const absl::StatusOr opcode = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(opcode, opcode.status().message()); - - const absl::StatusOr done = helper_.peekBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(done, done.status().message()); - + const int32_t opcode = helper_.peekInt32(data, offset); + const bool done = helper_.peekBool(data, offset); // Ignore error field. - const absl::StatusOr error = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(error, error.status().message()); + helper_.peekInt32(data, offset); - if (done.value()) { + if (done) { break; } - switch (static_cast(opcode.value())) { + switch (static_cast(opcode)) { case OpCodes::Create: - status = parseCreateRequest(data, offset, len, OpCodes::Create); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + parseCreateRequest(data, offset, len, OpCodes::Create); break; case OpCodes::SetData: - status = parseSetRequest(data, offset, len); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + parseSetRequest(data, offset, len); break; case OpCodes::Check: - status = parseCheckRequest(data, offset, len); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + parseCheckRequest(data, offset, len); break; case OpCodes::Delete: - status = parseDeleteRequest(data, offset, len); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + parseDeleteRequest(data, offset, len); break; default: - callbacks_.onDecodeError(); - return absl::InvalidArgumentError( - fmt::format("unknown opcode within a transaction: {}", opcode.value())); + throw EnvoyException(fmt::format("Unknown opcode within a transaction: {}", opcode)); } } callbacks_.onMultiRequest(); - - return absl::OkStatus(); } -absl::Status DecoderImpl::parseReconfigRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { - absl::Status status = - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH) + LONG_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); +void DecoderImpl::parseReconfigRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH) + LONG_LENGTH); // Skip joining. - status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - + skipString(data, offset); // Skip leaving. - status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + skipString(data, offset); // Skip new members. - status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - + skipString(data, offset); // Read config id. - absl::StatusOr config_id = helper_.peekInt64(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(config_id, - config_id.status().message()); + helper_.peekInt64(data, offset); callbacks_.onReconfigRequest(); - - return absl::OkStatus(); } -absl::Status DecoderImpl::parseSetWatchesRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { - absl::Status status = - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (3 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); +void DecoderImpl::parseSetWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (3 * INT_LENGTH)); // Ignore relative Zxid. - absl::StatusOr zxid = helper_.peekInt64(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(zxid, zxid.status().message()); - + helper_.peekInt64(data, offset); // Data watches. - status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - + skipStrings(data, offset); // Exist watches. - status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - + skipStrings(data, offset); // Child watches. - status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + skipStrings(data, offset); callbacks_.onSetWatchesRequest(); - - return absl::OkStatus(); } -absl::Status DecoderImpl::parseSetWatches2Request(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { - absl::Status status = - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (5 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); +void DecoderImpl::parseSetWatches2Request(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (5 * INT_LENGTH)); // Ignore relative Zxid. - absl::StatusOr zxid = helper_.peekInt64(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(zxid, zxid.status().message()); - + helper_.peekInt64(data, offset); // Data watches. - status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - + skipStrings(data, offset); // Exist watches. - status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - + skipStrings(data, offset); // Child watches. - status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - + skipStrings(data, offset); // Persistent watches. - status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - + skipStrings(data, offset); // Persistent recursive watches. - status = skipStrings(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + skipStrings(data, offset); callbacks_.onSetWatches2Request(); - - return absl::OkStatus(); } -absl::Status DecoderImpl::parseAddWatchRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); +void DecoderImpl::parseAddWatchRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - const absl::StatusOr mode = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(mode, mode.status().message()); + const std::string path = helper_.peekString(data, offset); + const int32_t mode = helper_.peekInt32(data, offset); - callbacks_.onAddWatchRequest(path.value(), mode.value()); - - return absl::OkStatus(); + callbacks_.onAddWatchRequest(path, mode); } -absl::Status DecoderImpl::parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len, OpCodes opcode) { - absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); +void DecoderImpl::parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, + OpCodes opcode) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); - const absl::StatusOr watch_type = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch_type, - watch_type.status().message()); + const std::string path = helper_.peekString(data, offset); + const int32_t type = helper_.peekInt32(data, offset); if (opcode == OpCodes::CheckWatches) { - callbacks_.onCheckWatchesRequest(path.value(), watch_type.value()); + callbacks_.onCheckWatchesRequest(path, type); } else { - callbacks_.onRemoveWatchesRequest(path.value(), watch_type.value()); + callbacks_.onRemoveWatchesRequest(path, type); } - - return absl::OkStatus(); } -absl::Status DecoderImpl::skipString(Buffer::Instance& data, uint64_t& offset) { - const absl::StatusOr slen = helper_.peekInt32(data, offset); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(slen, - fmt::format("skipString: {}", slen.status().message())); - - if (slen.value() < 0) { +void DecoderImpl::skipString(Buffer::Instance& data, uint64_t& offset) { + const int32_t slen = helper_.peekInt32(data, offset); + if (slen < 0) { ENVOY_LOG(trace, "zookeeper_proxy: decoding response with negative string length {} at offset {}", - slen.value(), offset); - return absl::OkStatus(); + slen, offset); + return; } - - helper_.skip(slen.value(), offset); - - return absl::OkStatus(); + helper_.skip(slen, offset); } -absl::Status DecoderImpl::skipStrings(Buffer::Instance& data, uint64_t& offset) { - const absl::StatusOr count = helper_.peekInt32(data, offset); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(count, - fmt::format("skipStrings: {}", count.status().message())); +void DecoderImpl::skipStrings(Buffer::Instance& data, uint64_t& offset) { + const int32_t count = helper_.peekInt32(data, offset); - for (int i = 0; i < count.value(); ++i) { - absl::Status status = skipString(data, offset); - ABSL_STATUS_RETURN_IF_STATUS_NOT_OK(status); + for (int i = 0; i < count; ++i) { + skipString(data, offset); } - - return absl::OkStatus(); } Network::FilterStatus DecoderImpl::onData(Buffer::Instance& data) { @@ -814,14 +529,9 @@ Network::FilterStatus DecoderImpl::onWrite(Buffer::Instance& data) { Network::FilterStatus DecoderImpl::decodeAndBuffer(Buffer::Instance& data, DecodeType dtype, Buffer::OwnedImpl& zk_filter_buffer) { const uint32_t zk_filter_buffer_len = zk_filter_buffer.length(); - absl::Status status; if (zk_filter_buffer_len == 0) { - status = decodeAndBufferHelper(data, dtype, zk_filter_buffer); - if (!status.ok()) { - ENVOY_LOG(debug, "zookeeper_proxy: decodeAndBufferHelper exception: {}", status.message()); - } - + decodeAndBufferHelper(data, dtype, zk_filter_buffer); return Network::FilterStatus::Continue; } @@ -829,59 +539,52 @@ Network::FilterStatus DecoderImpl::decodeAndBuffer(Buffer::Instance& data, Decod // Prepending ZooKeeper filter buffer to the current network filter buffer can help to generate // full packets. data.prepend(zk_filter_buffer); - - status = decodeAndBufferHelper(data, dtype, zk_filter_buffer); - if (!status.ok()) { - ENVOY_LOG(debug, "zookeeper_proxy: decodeAndBufferHelper exception: {}", status.message()); - } - + decodeAndBufferHelper(data, dtype, zk_filter_buffer); // Drain the prepended ZooKeeper filter buffer. data.drain(zk_filter_buffer_len); return Network::FilterStatus::Continue; } -absl::Status DecoderImpl::decodeAndBufferHelper(Buffer::Instance& data, DecodeType dtype, - Buffer::OwnedImpl& zk_filter_buffer) { +void DecoderImpl::decodeAndBufferHelper(Buffer::Instance& data, DecodeType dtype, + Buffer::OwnedImpl& zk_filter_buffer) { ASSERT(dtype == DecodeType::READ || dtype == DecodeType::WRITE); const uint32_t data_len = data.length(); uint64_t offset = 0; - absl::StatusOr len = 0; - absl::Status status; + uint32_t len = 0; // Boolean to check whether there is at least one full packet in the network filter buffer (to // which the ZooKeeper filter buffer is prepended). bool has_full_packets = false; while (offset < data_len) { - // Peek packet length. - len = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( - len, fmt::format("peekInt32 for len: {}", len.status().message())); - - status = ensureMinLength(len.value(), dtype == DecodeType::READ - ? XID_LENGTH + INT_LENGTH - : XID_LENGTH + ZXID_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - status = ensureMaxLength(len.value()); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - offset += len.value(); - if (offset <= data_len) { - has_full_packets = true; + TRY_NEEDS_AUDIT { + // Peek packet length. + len = helper_.peekInt32(data, offset); + ensureMinLength(len, dtype == DecodeType::READ ? XID_LENGTH + INT_LENGTH + : XID_LENGTH + ZXID_LENGTH + INT_LENGTH); + ensureMaxLength(len); + offset += len; + if (offset <= data_len) { + has_full_packets = true; + } + } + END_TRY catch (const EnvoyException& e) { + ENVOY_LOG(debug, "zookeeper_proxy: decoding exception {}", e.what()); + callbacks_.onDecodeError(); + return; } } if (offset == data_len) { decode(data, dtype, offset); - return absl::OkStatus(); + return; } ASSERT(offset > data_len); std::string temp_data; if (has_full_packets) { - offset -= INT_LENGTH + len.value(); + offset -= INT_LENGTH + len; ASSERT(offset < data_len); // Decode full packets. // offset here represents the length of all full packets. @@ -898,112 +601,87 @@ absl::Status DecoderImpl::decodeAndBufferHelper(Buffer::Instance& data, DecodeTy data.copyOut(0, data_len, temp_data.data()); zk_filter_buffer.add(temp_data.data(), temp_data.length()); } - - return absl::OkStatus(); } void DecoderImpl::decode(Buffer::Instance& data, DecodeType dtype, uint64_t full_packets_len) { uint64_t offset = 0; - while (offset < full_packets_len) { - // Reset the helper's cursor, to ensure the current message stays within the - // allowed max length, even when it's different than the declared length - // by the message. - // - // Note: we need to keep two cursors — offset and helper_'s internal one — because - // a buffer may contain multiple messages, so offset is global while helper_'s - // internal cursor gets reset for each individual message. - helper_.reset(); - - const uint64_t current = offset; - absl::StatusOr> opcode; - switch (dtype) { - case DecodeType::READ: - opcode = decodeOnData(data, offset); - if (opcode.ok()) { - callbacks_.onRequestBytes(opcode.value(), offset - current); + TRY_NEEDS_AUDIT { + while (offset < full_packets_len) { + // Reset the helper's cursor, to ensure the current message stays within the + // allowed max length, even when it's different than the declared length + // by the message. + // + // Note: we need to keep two cursors — offset and helper_'s internal one — because + // a buffer may contain multiple messages, so offset is global while helper_'s + // internal cursor gets reset for each individual message. + helper_.reset(); + + const uint64_t current = offset; + absl::optional opcode; + switch (dtype) { + case DecodeType::READ: + opcode = decodeOnData(data, offset); + callbacks_.onRequestBytes(opcode, offset - current); break; - } - ENVOY_LOG(debug, "zookeeper_proxy: decodeOnData exception: {}", opcode.status().message()); - return; - case DecodeType::WRITE: - opcode = decodeOnWrite(data, offset); - if (opcode.ok()) { - callbacks_.onResponseBytes(opcode.value(), offset - current); + case DecodeType::WRITE: + opcode = decodeOnWrite(data, offset); + callbacks_.onResponseBytes(opcode, offset - current); break; } - ENVOY_LOG(debug, "zookeeper_proxy: decodeOnWrite exception: {}", opcode.status().message()); - return; } } + END_TRY catch (const EnvoyException& e) { + ENVOY_LOG(debug, "zookeeper_proxy: decoding exception {}", e.what()); + callbacks_.onDecodeError(); + } } -absl::Status DecoderImpl::parseConnectResponse(Buffer::Instance& data, uint64_t& offset, - uint32_t len, - const std::chrono::milliseconds latency) { - absl::Status status = - ensureMinLength(len, PROTOCOL_VERSION_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); +void DecoderImpl::parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len, + const std::chrono::milliseconds latency) { + ensureMinLength(len, PROTOCOL_VERSION_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH); - const absl::StatusOr timeout = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(timeout, timeout.status().message()); + const auto timeout = helper_.peekInt32(data, offset); // Skip session id + password. offset += SESSION_LENGTH; - status = skipString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr readonly = maybeReadBool(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(readonly, - readonly.status().message()); + skipString(data, offset); - callbacks_.onConnectResponse(0, timeout.value(), readonly.value(), latency); + const bool readonly = maybeReadBool(data, offset); - return absl::OkStatus(); + callbacks_.onConnectResponse(0, timeout, readonly, latency); } -absl::Status DecoderImpl::parseWatchEvent(Buffer::Instance& data, uint64_t& offset, - const uint32_t len, const int64_t zxid, - const int32_t error) { - absl::Status status = ensureMinLength(len, SERVER_HEADER_LENGTH + (3 * INT_LENGTH)); - EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - - const absl::StatusOr event_type = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(event_type, - event_type.status().message()); - - const absl::StatusOr client_state = helper_.peekInt32(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(client_state, - client_state.status().message()); - - const absl::StatusOr path = helper_.peekString(data, offset); - EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); +void DecoderImpl::parseWatchEvent(Buffer::Instance& data, uint64_t& offset, const uint32_t len, + const int64_t zxid, const int32_t error) { + ensureMinLength(len, SERVER_HEADER_LENGTH + (3 * INT_LENGTH)); - callbacks_.onWatchEvent(event_type.value(), client_state.value(), path.value(), zxid, error); + const auto event_type = helper_.peekInt32(data, offset); + const auto client_state = helper_.peekInt32(data, offset); + const auto path = helper_.peekString(data, offset); - return absl::OkStatus(); + callbacks_.onWatchEvent(event_type, client_state, path, zxid, error); } -absl::StatusOr DecoderImpl::maybeReadBool(Buffer::Instance& data, uint64_t& offset) { +bool DecoderImpl::maybeReadBool(Buffer::Instance& data, uint64_t& offset) { if (data.length() >= offset + 1) { return helper_.peekBool(data, offset); } return false; } -absl::StatusOr DecoderImpl::fetchControlRequestData(const int32_t xid, - OpCodes& opcode) { +std::chrono::milliseconds DecoderImpl::fetchControlRequestData(const int32_t xid, OpCodes& opcode) { // Find the corresponding request queue for this XID. const auto it = control_requests_by_xid_.find(xid); // If this fails, it's either a server-side bug or a malformed packet. if (it == control_requests_by_xid_.end()) { - return absl::InvalidArgumentError(fmt::format("control request xid {} not found", xid)); + throw EnvoyException(fmt::format("control request xid {} not found", xid)); } std::queue& rq_queue = it->second; if (rq_queue.empty()) { - return absl::InvalidArgumentError(fmt::format("control request queue for {} is empty", xid)); + throw EnvoyException(fmt::format("control request queue for {} is empty", xid)); } std::chrono::milliseconds latency = std::chrono::duration_cast( @@ -1014,14 +692,13 @@ absl::StatusOr DecoderImpl::fetchControlRequestData(c return latency; } -absl::StatusOr DecoderImpl::fetchDataRequestData(const int32_t xid, - OpCodes& opcode) { +std::chrono::milliseconds DecoderImpl::fetchDataRequestData(const int32_t xid, OpCodes& opcode) { // Find the corresponding request for this XID. const auto it = requests_by_xid_.find(xid); // If this fails, it's either a server-side bug or a malformed packet. if (it == requests_by_xid_.end()) { - return absl::InvalidArgumentError(fmt::format("data request xid {} not found", xid)); + throw EnvoyException(fmt::format("xid {} not found", xid)); } std::chrono::milliseconds latency = std::chrono::duration_cast( diff --git a/source/extensions/filters/network/zookeeper_proxy/decoder.h b/source/extensions/filters/network/zookeeper_proxy/decoder.h index 8718a8d9d6b1..34ad9b4d32ff 100644 --- a/source/extensions/filters/network/zookeeper_proxy/decoder.h +++ b/source/extensions/filters/network/zookeeper_proxy/decoder.h @@ -12,7 +12,6 @@ #include "source/extensions/filters/network/zookeeper_proxy/utils.h" #include "absl/container/node_hash_map.h" -#include "absl/status/statusor.h" namespace Envoy { namespace Extensions { @@ -86,17 +85,16 @@ class DecoderCallbacks { virtual void onPing() PURE; virtual void onAuthRequest(const std::string& scheme) PURE; virtual void onGetDataRequest(const std::string& path, bool watch) PURE; - virtual absl::Status onCreateRequest(const std::string& path, CreateFlags flags, - OpCodes opcode) PURE; + virtual void onCreateRequest(const std::string& path, CreateFlags flags, OpCodes opcode) PURE; virtual void onSetRequest(const std::string& path) PURE; virtual void onGetChildrenRequest(const std::string& path, bool watch, bool v2) PURE; - virtual absl::Status onGetEphemeralsRequest(const absl::StatusOr& path) PURE; - virtual absl::Status onGetAllChildrenNumberRequest(const absl::StatusOr& path) PURE; + virtual void onGetEphemeralsRequest(const std::string& path) PURE; + virtual void onGetAllChildrenNumberRequest(const std::string& path) PURE; virtual void onDeleteRequest(const std::string& path, int32_t version) PURE; virtual void onExistsRequest(const std::string& path, bool watch) PURE; virtual void onGetAclRequest(const std::string& path) PURE; virtual void onSetAclRequest(const std::string& path, int32_t version) PURE; - virtual absl::Status onSyncRequest(const absl::StatusOr& path) PURE; + virtual void onSyncRequest(const std::string& path) PURE; virtual void onCheckRequest(const std::string& path, int32_t version) PURE; virtual void onMultiRequest() PURE; virtual void onReconfigRequest() PURE; @@ -153,50 +151,44 @@ class DecoderImpl : public Decoder, Logger::Loggable { // (4) removes the prepended data. Network::FilterStatus decodeAndBuffer(Buffer::Instance& data, DecodeType dtype, Buffer::OwnedImpl& zk_filter_buffer); - absl::Status decodeAndBufferHelper(Buffer::Instance& data, DecodeType dtype, - Buffer::OwnedImpl& zk_filter_buffer); + void decodeAndBufferHelper(Buffer::Instance& data, DecodeType dtype, + Buffer::OwnedImpl& zk_filter_buffer); void decode(Buffer::Instance& data, DecodeType dtype, uint64_t full_packets_len); // decodeOnData and decodeOnWrite return ZooKeeper opcode or absl::nullopt. // absl::nullopt indicates WATCH_XID, which is generated by the server and has no corresponding // opcode. - absl::StatusOr> decodeOnData(Buffer::Instance& data, uint64_t& offset); - absl::StatusOr> decodeOnWrite(Buffer::Instance& data, uint64_t& offset); - absl::Status parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, - OpCodes opcode); - absl::Status skipAcls(Buffer::Instance& data, uint64_t& offset); - absl::Status parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, - bool two); - absl::Status parseDeleteRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseExistsRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseGetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseSetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseCheckRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseMultiRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseReconfigRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseSetWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseSetWatches2Request(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseAddWatchRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - absl::Status parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, - OpCodes opcode); - absl::Status skipString(Buffer::Instance& data, uint64_t& offset); - absl::Status skipStrings(Buffer::Instance& data, uint64_t& offset); - absl::Status ensureMinLength(int32_t len, int32_t minlen) const; - absl::Status ensureMaxLength(int32_t len) const; - absl::StatusOr pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, - uint32_t len); - absl::Status parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len, - const std::chrono::milliseconds latency); - absl::Status parseWatchEvent(Buffer::Instance& data, uint64_t& offset, uint32_t len, int64_t zxid, - int32_t error); - absl::StatusOr maybeReadBool(Buffer::Instance& data, uint64_t& offset); - absl::StatusOr fetchControlRequestData(const int32_t xid, - OpCodes& opcode); - absl::StatusOr fetchDataRequestData(const int32_t xid, - OpCodes& opcode); + absl::optional decodeOnData(Buffer::Instance& data, uint64_t& offset); + absl::optional decodeOnWrite(Buffer::Instance& data, uint64_t& offset); + void parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, OpCodes opcode); + void skipAcls(Buffer::Instance& data, uint64_t& offset); + void parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, bool two); + void parseDeleteRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseExistsRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseGetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseSetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseCheckRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseMultiRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseReconfigRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseSetWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseSetWatches2Request(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseAddWatchRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, OpCodes opcode); + void skipString(Buffer::Instance& data, uint64_t& offset); + void skipStrings(Buffer::Instance& data, uint64_t& offset); + void ensureMinLength(int32_t len, int32_t minlen) const; + void ensureMaxLength(int32_t len) const; + std::string pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len, + const std::chrono::milliseconds latency); + void parseWatchEvent(Buffer::Instance& data, uint64_t& offset, uint32_t len, int64_t zxid, + int32_t error); + bool maybeReadBool(Buffer::Instance& data, uint64_t& offset); + std::chrono::milliseconds fetchControlRequestData(const int32_t xid, OpCodes& opcode); + std::chrono::milliseconds fetchDataRequestData(const int32_t xid, OpCodes& opcode); DecoderCallbacks& callbacks_; const uint32_t max_packet_bytes_; diff --git a/source/extensions/filters/network/zookeeper_proxy/filter.cc b/source/extensions/filters/network/zookeeper_proxy/filter.cc index 817fa14077d4..013bea03b1e0 100644 --- a/source/extensions/filters/network/zookeeper_proxy/filter.cc +++ b/source/extensions/filters/network/zookeeper_proxy/filter.cc @@ -293,8 +293,8 @@ void ZooKeeperFilter::onGetDataRequest(const std::string& path, const bool watch setDynamicMetadata({{"opname", "getdata"}, {"path", path}, {"watch", watch ? "true" : "false"}}); } -absl::Status ZooKeeperFilter::onCreateRequest(const std::string& path, const CreateFlags flags, - const OpCodes opcode) { +void ZooKeeperFilter::onCreateRequest(const std::string& path, const CreateFlags flags, + const OpCodes opcode) { std::string opname; switch (opcode) { @@ -315,14 +315,12 @@ absl::Status ZooKeeperFilter::onCreateRequest(const std::string& path, const Cre config_->stats_.createttl_rq_.inc(); break; default: - return absl::InvalidArgumentError(fmt::format("unknown opcode: {}", enumToSignedInt(opcode))); + throw EnvoyException(fmt::format("Unknown opcode: {}", enumToSignedInt(opcode))); break; } setDynamicMetadata( {{"opname", opname}, {"path", path}, {"create_type", createFlagsToString(flags)}}); - - return absl::OkStatus(); } void ZooKeeperFilter::onSetRequest(const std::string& path) { @@ -364,13 +362,9 @@ void ZooKeeperFilter::onSetAclRequest(const std::string& path, const int32_t ver setDynamicMetadata({{"opname", "setacl"}, {"path", path}, {"version", std::to_string(version)}}); } -absl::Status ZooKeeperFilter::onSyncRequest(const absl::StatusOr& path) { - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - +void ZooKeeperFilter::onSyncRequest(const std::string& path) { config_->stats_.sync_rq_.inc(); - setDynamicMetadata({{"opname", "sync"}, {"path", path.value()}}); - - return absl::OkStatus(); + setDynamicMetadata({{"opname", "sync"}, {"path", path}}); } void ZooKeeperFilter::onCheckRequest(const std::string&, const int32_t) { @@ -412,23 +406,14 @@ void ZooKeeperFilter::onAddWatchRequest(const std::string& path, const int32_t m setDynamicMetadata({{"opname", "addwatch"}, {"path", path}, {"mode", std::to_string(mode)}}); } -absl::Status ZooKeeperFilter::onGetEphemeralsRequest(const absl::StatusOr& path) { - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - +void ZooKeeperFilter::onGetEphemeralsRequest(const std::string& path) { config_->stats_.getephemerals_rq_.inc(); - setDynamicMetadata({{"opname", "getephemerals"}, {"path", path.value()}}); - - return absl::OkStatus(); + setDynamicMetadata({{"opname", "getephemerals"}, {"path", path}}); } -absl::Status -ZooKeeperFilter::onGetAllChildrenNumberRequest(const absl::StatusOr& path) { - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - +void ZooKeeperFilter::onGetAllChildrenNumberRequest(const std::string& path) { config_->stats_.getallchildrennumber_rq_.inc(); - setDynamicMetadata({{"opname", "getallchildrennumber"}, {"path", path.value()}}); - - return absl::OkStatus(); + setDynamicMetadata({{"opname", "getallchildrennumber"}, {"path", path}}); } void ZooKeeperFilter::onCloseRequest() { diff --git a/source/extensions/filters/network/zookeeper_proxy/filter.h b/source/extensions/filters/network/zookeeper_proxy/filter.h index 94f47506b211..294d8a3119da 100644 --- a/source/extensions/filters/network/zookeeper_proxy/filter.h +++ b/source/extensions/filters/network/zookeeper_proxy/filter.h @@ -17,8 +17,6 @@ #include "source/common/stats/symbol_table.h" #include "source/extensions/filters/network/zookeeper_proxy/decoder.h" -#include "absl/status/statusor.h" - namespace Envoy { namespace Extensions { namespace NetworkFilters { @@ -342,14 +340,14 @@ class ZooKeeperFilter : public Network::Filter, void onPing() override; void onAuthRequest(const std::string& scheme) override; void onGetDataRequest(const std::string& path, bool watch) override; - absl::Status onCreateRequest(const std::string& path, CreateFlags flags, OpCodes opcode) override; + void onCreateRequest(const std::string& path, CreateFlags flags, OpCodes opcode) override; void onSetRequest(const std::string& path) override; void onGetChildrenRequest(const std::string& path, bool watch, bool v2) override; void onDeleteRequest(const std::string& path, int32_t version) override; void onExistsRequest(const std::string& path, bool watch) override; void onGetAclRequest(const std::string& path) override; void onSetAclRequest(const std::string& path, int32_t version) override; - absl::Status onSyncRequest(const absl::StatusOr& path) override; + void onSyncRequest(const std::string& path) override; void onCheckRequest(const std::string& path, int32_t version) override; void onMultiRequest() override; void onReconfigRequest() override; @@ -358,8 +356,8 @@ class ZooKeeperFilter : public Network::Filter, void onAddWatchRequest(const std::string& path, const int32_t mode) override; void onCheckWatchesRequest(const std::string& path, int32_t type) override; void onRemoveWatchesRequest(const std::string& path, int32_t type) override; - absl::Status onGetEphemeralsRequest(const absl::StatusOr& path) override; - absl::Status onGetAllChildrenNumberRequest(const absl::StatusOr& path) override; + void onGetEphemeralsRequest(const std::string& path) override; + void onGetAllChildrenNumberRequest(const std::string& path) override; void onCloseRequest() override; void onResponseBytes(const absl::optional opcode, const uint64_t bytes) override; void onConnectResponse(int32_t proto_version, int32_t timeout, bool readonly, diff --git a/source/extensions/filters/network/zookeeper_proxy/utils.cc b/source/extensions/filters/network/zookeeper_proxy/utils.cc index b16a6cdc163c..cf1c77a8bf19 100644 --- a/source/extensions/filters/network/zookeeper_proxy/utils.cc +++ b/source/extensions/filters/network/zookeeper_proxy/utils.cc @@ -7,27 +7,24 @@ namespace Extensions { namespace NetworkFilters { namespace ZooKeeperProxy { -absl::StatusOr BufferHelper::peekInt32(Buffer::Instance& buffer, uint64_t& offset) { - absl::Status status = ensureMaxLen(sizeof(int32_t)); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, fmt::format("peekInt32: {}", status.message())); +int32_t BufferHelper::peekInt32(Buffer::Instance& buffer, uint64_t& offset) { + ensureMaxLen(sizeof(int32_t)); const int32_t val = buffer.peekBEInt(offset); offset += sizeof(int32_t); return val; } -absl::StatusOr BufferHelper::peekInt64(Buffer::Instance& buffer, uint64_t& offset) { - absl::Status status = ensureMaxLen(sizeof(int64_t)); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, fmt::format("peekInt64: {}", status.message())); +int64_t BufferHelper::peekInt64(Buffer::Instance& buffer, uint64_t& offset) { + ensureMaxLen(sizeof(int64_t)); const int64_t val = buffer.peekBEInt(offset); offset += sizeof(int64_t); return val; } -absl::StatusOr BufferHelper::peekBool(Buffer::Instance& buffer, uint64_t& offset) { - absl::Status status = ensureMaxLen(1); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, fmt::format("peekBool: {}", status.message())); +bool BufferHelper::peekBool(Buffer::Instance& buffer, uint64_t& offset) { + ensureMaxLen(1); const char byte = buffer.peekInt(offset); const bool val = static_cast(byte); @@ -35,27 +32,24 @@ absl::StatusOr BufferHelper::peekBool(Buffer::Instance& buffer, uint64_t& return val; } -absl::StatusOr BufferHelper::peekString(Buffer::Instance& buffer, uint64_t& offset) { +std::string BufferHelper::peekString(Buffer::Instance& buffer, uint64_t& offset) { std::string val; - const absl::StatusOr len = peekInt32(buffer, offset); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(len, - fmt::format("peekString: {}", len.status().message())); + const uint32_t len = peekInt32(buffer, offset); - if (len.value() == 0) { + if (len == 0) { return val; } - if (buffer.length() < (offset + len.value())) { - return absl::InvalidArgumentError("peekString: buffer is smaller than string length"); + if (buffer.length() < (offset + len)) { + throw EnvoyException("peekString: buffer is smaller than string length"); } - absl::Status status = ensureMaxLen(len.value()); - RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, fmt::format("peekString: {}", status.message())); + ensureMaxLen(len); - std::unique_ptr data(new char[len.value()]); - buffer.copyOut(offset, len.value(), data.get()); - val.assign(data.get(), len.value()); - offset += len.value(); + std::unique_ptr data(new char[len]); + buffer.copyOut(offset, len, data.get()); + val.assign(data.get(), len); + offset += len; return val; } @@ -65,14 +59,12 @@ void BufferHelper::skip(const uint32_t len, uint64_t& offset) { current_ += len; } -absl::Status BufferHelper::ensureMaxLen(const uint32_t size) { +void BufferHelper::ensureMaxLen(const uint32_t size) { current_ += size; if (current_ > max_len_) { - return absl::InvalidArgumentError("read beyond max length"); + throw EnvoyException("read beyond max length"); } - - return absl::OkStatus(); } } // namespace ZooKeeperProxy diff --git a/source/extensions/filters/network/zookeeper_proxy/utils.h b/source/extensions/filters/network/zookeeper_proxy/utils.h index 76b43661dcfb..dc699afaf548 100644 --- a/source/extensions/filters/network/zookeeper_proxy/utils.h +++ b/source/extensions/filters/network/zookeeper_proxy/utils.h @@ -9,8 +9,6 @@ #include "source/common/common/byte_order.h" #include "source/common/common/logger.h" -#include "absl/status/statusor.h" - namespace Envoy { namespace Extensions { namespace NetworkFilters { @@ -19,8 +17,8 @@ namespace ZooKeeperProxy { /** * Helper for extracting ZooKeeper data from a buffer. * - * If at any point a peek is tried beyond max_len, an invalid argument error - * will be returned. This is important to protect Envoy against malformed + * If at any point a peek is tried beyond max_len, an EnvoyException + * will be thrown. This is important to protect Envoy against malformed * requests (e.g.: when the declared and actual length don't match). * * Note: ZooKeeper's protocol uses network byte ordering (big-endian). @@ -29,41 +27,20 @@ class BufferHelper : public Logger::Loggable { public: BufferHelper(uint32_t max_len) : max_len_(max_len) {} - absl::StatusOr peekInt32(Buffer::Instance& buffer, uint64_t& offset); - absl::StatusOr peekInt64(Buffer::Instance& buffer, uint64_t& offset); - absl::StatusOr peekString(Buffer::Instance& buffer, uint64_t& offset); - absl::StatusOr peekBool(Buffer::Instance& buffer, uint64_t& offset); + int32_t peekInt32(Buffer::Instance& buffer, uint64_t& offset); + int64_t peekInt64(Buffer::Instance& buffer, uint64_t& offset); + std::string peekString(Buffer::Instance& buffer, uint64_t& offset); + bool peekBool(Buffer::Instance& buffer, uint64_t& offset); void skip(uint32_t len, uint64_t& offset); void reset() { current_ = 0; } private: - absl::Status ensureMaxLen(uint32_t size); + void ensureMaxLen(uint32_t size); const uint32_t max_len_; uint32_t current_{}; }; -#define ABSL_STATUS_RETURN_IF_STATUS_NOT_OK(status) \ - if (!status.ok()) { \ - return status; \ - } - -#define EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status) \ - if (!status.ok()) { \ - callbacks_.onDecodeError(); \ - return status; \ - } - -#define RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, message) \ - if (!status.ok()) { \ - return absl::InvalidArgumentError(message); \ - } - -#define EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, message) \ - if (!status.ok()) { \ - callbacks_.onDecodeError(); \ - return absl::InvalidArgumentError(message); \ - } } // namespace ZooKeeperProxy } // namespace NetworkFilters } // namespace Extensions