diff --git a/source/extensions/filters/network/zookeeper_proxy/decoder.cc b/source/extensions/filters/network/zookeeper_proxy/decoder.cc index 968dc798b321..92367e55063a 100644 --- a/source/extensions/filters/network/zookeeper_proxy/decoder.cc +++ b/source/extensions/filters/network/zookeeper_proxy/decoder.cc @@ -42,15 +42,26 @@ const char* createFlagsToString(CreateFlags flags) { return "unknown"; } -absl::optional DecoderImpl::decodeOnData(Buffer::Instance& data, uint64_t& offset) { +absl::StatusOr> DecoderImpl::decodeOnData(Buffer::Instance& data, + uint64_t& offset) { ENVOY_LOG(trace, "zookeeper_proxy: decoding request with {} bytes at offset {}", data.length(), offset); // Check message length. - const int32_t len = helper_.peekInt32(data, offset); - ENVOY_LOG(trace, "zookeeper_proxy: decoding request with len {} at offset {}", len, offset); - ensureMinLength(len, XID_LENGTH + INT_LENGTH); // xid + opcode - ensureMaxLength(len); + const absl::StatusOr len = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + len, fmt::format("peekInt32 for len: {}", len.status().message())); + + ENVOY_LOG(trace, "zookeeper_proxy: decoding request with len {} at offset {}", len.value(), + offset); + + absl::Status status = ensureMinLength(len.value(), XID_LENGTH + INT_LENGTH); // xid + opcode + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("ensureMinLength: {}", status.message())); + + status = ensureMaxLength(len.value()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("ensureMaxLength: {}", status.message())); auto start_time = time_source_.monotonicTime(); @@ -64,26 +75,40 @@ absl::optional DecoderImpl::decodeOnData(Buffer::Instance& data, uint64 // ZooKeeper server to the next. Thus, the special xid. // However, some client implementations might expose setWatches // as a regular data request, so we support that as well. - const int32_t xid = helper_.peekInt32(data, offset); - ENVOY_LOG(trace, "zookeeper_proxy: decoding request with xid {} at offset {}", xid, offset); - switch (static_cast(xid)) { + const absl::StatusOr xid = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + xid, fmt::format("peerInt32 for xid: {}", xid.status().message())); + + ENVOY_LOG(trace, "zookeeper_proxy: decoding request with xid {} at offset {}", xid.value(), + offset); + + switch (static_cast(xid.value())) { case XidCodes::ConnectXid: - parseConnect(data, offset, len); - control_requests_by_xid_[xid].push({OpCodes::Connect, std::move(start_time)}); + status = parseConnect(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, + fmt::format("parseConnect: {}", status.message())); + + control_requests_by_xid_[xid.value()].push({OpCodes::Connect, std::move(start_time)}); return OpCodes::Connect; case XidCodes::PingXid: offset += OPCODE_LENGTH; callbacks_.onPing(); - control_requests_by_xid_[xid].push({OpCodes::Ping, std::move(start_time)}); + control_requests_by_xid_[xid.value()].push({OpCodes::Ping, std::move(start_time)}); return OpCodes::Ping; case XidCodes::AuthXid: - parseAuthRequest(data, offset, len); - control_requests_by_xid_[xid].push({OpCodes::SetAuth, std::move(start_time)}); + status = parseAuthRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, + fmt::format("parseAuthRequest: {}", status.message())); + + control_requests_by_xid_[xid.value()].push({OpCodes::SetAuth, std::move(start_time)}); return OpCodes::SetAuth; case XidCodes::SetWatchesXid: offset += OPCODE_LENGTH; - parseSetWatchesRequest(data, offset, len); - control_requests_by_xid_[xid].push({OpCodes::SetWatches, std::move(start_time)}); + status = parseSetWatchesRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseSetWatchesRequest: {}", status.message())); + + control_requests_by_xid_[xid.value()].push({OpCodes::SetWatches, std::move(start_time)}); return OpCodes::SetWatches; default: // WATCH_XID is generated by the server, so that and everything @@ -97,100 +122,164 @@ absl::optional DecoderImpl::decodeOnData(Buffer::Instance& data, uint64 // for two cases: auth requests can happen at any time and ping requests // must happen every 1/3 of the negotiated session timeout, to keep // the session alive. - const int32_t oc = helper_.peekInt32(data, offset); - ENVOY_LOG(trace, "zookeeper_proxy: decoding request with opcode {} at offset {}", oc, offset); - const auto opcode = static_cast(oc); + const absl::StatusOr oc = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + oc, fmt::format("peekInt32 for opcode: {}", oc.status().message())); + + ENVOY_LOG(trace, "zookeeper_proxy: decoding request with opcode {} at offset {}", oc.value(), + offset); + + const auto opcode = static_cast(oc.value()); switch (opcode) { case OpCodes::GetData: - parseGetDataRequest(data, offset, len); + status = parseGetDataRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseGetDataRequest: {}", status.message())); break; case OpCodes::Create: case OpCodes::Create2: case OpCodes::CreateContainer: case OpCodes::CreateTtl: - parseCreateRequest(data, offset, len, static_cast(opcode)); + status = parseCreateRequest(data, offset, len.value(), static_cast(opcode)); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseCreateRequest: {}", status.message())); break; case OpCodes::SetData: - parseSetRequest(data, offset, len); + status = parseSetRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, + fmt::format("parseSetRequest: {}", status.message())); break; case OpCodes::GetChildren: - parseGetChildrenRequest(data, offset, len, false); + status = parseGetChildrenRequest(data, offset, len.value(), false); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseGetChildrenRequest: {}", status.message())); break; case OpCodes::GetChildren2: - parseGetChildrenRequest(data, offset, len, true); + status = parseGetChildrenRequest(data, offset, len.value(), true); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseGetChildrenRequest: {}", status.message())); break; case OpCodes::Delete: - parseDeleteRequest(data, offset, len); + status = parseDeleteRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseDeleteRequest: {}", status.message())); break; case OpCodes::Exists: - parseExistsRequest(data, offset, len); + status = parseExistsRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseExistsRequest: {}", status.message())); break; case OpCodes::GetAcl: - parseGetAclRequest(data, offset, len); + status = parseGetAclRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseGetAclRequest: {}", status.message())); break; case OpCodes::SetAcl: - parseSetAclRequest(data, offset, len); + status = parseSetAclRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseSetAclRequest: {}", status.message())); break; case OpCodes::Sync: - callbacks_.onSyncRequest(pathOnlyRequest(data, offset, len)); + status = callbacks_.onSyncRequest(pathOnlyRequest(data, offset, len.value())); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, + fmt::format("onSyncRequest: {}", status.message())); break; case OpCodes::Check: - parseCheckRequest(data, offset, len); + status = parseCheckRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, + fmt::format("parseCheckRequest: {}", status.message())); break; case OpCodes::Multi: - parseMultiRequest(data, offset, len); + status = parseMultiRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, + fmt::format("parseMultiRequest: {}", status.message())); break; case OpCodes::Reconfig: - parseReconfigRequest(data, offset, len); + status = parseReconfigRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseReconfigRequest: {}", status.message())); break; case OpCodes::SetWatches: - parseSetWatchesRequest(data, offset, len); + status = parseSetWatchesRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseSetWatchesRequest: {}", status.message())); break; case OpCodes::SetWatches2: - parseSetWatches2Request(data, offset, len); + status = parseSetWatches2Request(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseSetWatches2Request: {}", status.message())); break; case OpCodes::AddWatch: - parseAddWatchRequest(data, offset, len); + status = parseAddWatchRequest(data, offset, len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseAddWatchRequest: {}", status.message())); break; case OpCodes::CheckWatches: - parseXWatchesRequest(data, offset, len, OpCodes::CheckWatches); + status = parseXWatchesRequest(data, offset, len.value(), OpCodes::CheckWatches); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseXWatchesRequest (check watches): {}", status.message())); break; case OpCodes::RemoveWatches: - parseXWatchesRequest(data, offset, len, OpCodes::RemoveWatches); + status = parseXWatchesRequest(data, offset, len.value(), OpCodes::RemoveWatches); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseXWatchesRequest (remove watches): {}", status.message())); break; case OpCodes::GetEphemerals: - callbacks_.onGetEphemeralsRequest(pathOnlyRequest(data, offset, len)); + status = callbacks_.onGetEphemeralsRequest(pathOnlyRequest(data, offset, len.value())); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("onGetEphemeralsRequest: {}", status.message())); break; case OpCodes::GetAllChildrenNumber: - callbacks_.onGetAllChildrenNumberRequest(pathOnlyRequest(data, offset, len)); + status = callbacks_.onGetAllChildrenNumberRequest(pathOnlyRequest(data, offset, len.value())); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("onGetAllChildrenNumberRequest: {}", status.message())); break; case OpCodes::Close: callbacks_.onCloseRequest(); break; default: - throw EnvoyException(fmt::format("Unknown opcode: {}", enumToSignedInt(opcode))); + ENVOY_LOG(debug, "zookeeper_proxy: decodeOnData exception: unknown opcode {}", + enumToSignedInt(opcode)); + callbacks_.onDecodeError(); + return absl::nullopt; } - requests_by_xid_[xid] = {opcode, std::move(start_time)}; + requests_by_xid_[xid.value()] = {opcode, std::move(start_time)}; return opcode; } -absl::optional DecoderImpl::decodeOnWrite(Buffer::Instance& data, uint64_t& offset) { +absl::StatusOr> DecoderImpl::decodeOnWrite(Buffer::Instance& data, + uint64_t& offset) { ENVOY_LOG(trace, "zookeeper_proxy: decoding response with {} bytes at offset {}", data.length(), offset); // Check message length. - const int32_t len = helper_.peekInt32(data, offset); - ENVOY_LOG(trace, "zookeeper_proxy: decoding response with len {} at offset {}", len, offset); - ensureMinLength(len, XID_LENGTH + ZXID_LENGTH + INT_LENGTH); // xid + zxid + err - ensureMaxLength(len); + const absl::StatusOr len = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + len, fmt::format("peekInt32 for len: {}", len.status().message())); + + ENVOY_LOG(trace, "zookeeper_proxy: decoding response with len.value() {} at offset {}", + len.value(), offset); + + absl::Status status = + ensureMinLength(len.value(), XID_LENGTH + ZXID_LENGTH + INT_LENGTH); // xid + zxid + err + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("ensureMinLength: {}", status.message())); + + status = ensureMaxLength(len.value()); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("ensureMaxLength: {}", status.message())); + + const absl::StatusOr xid = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + xid, fmt::format("peekInt32 for xid: {}", xid.status().message())); - const auto xid = helper_.peekInt32(data, offset); - ENVOY_LOG(trace, "zookeeper_proxy: decoding response with xid {} at offset {}", xid, offset); - const auto xid_code = static_cast(xid); + ENVOY_LOG(trace, "zookeeper_proxy: decoding response with xid {} at offset {}", xid.value(), + offset); + const auto xid_code = static_cast(xid.value()); - std::chrono::milliseconds latency; + absl::StatusOr latency; OpCodes opcode; switch (xid_code) { @@ -201,321 +290,517 @@ absl::optional DecoderImpl::decodeOnWrite(Buffer::Instance& data, uint6 case XidCodes::AuthXid: ABSL_FALLTHROUGH_INTENDED; case XidCodes::SetWatchesXid: - latency = fetchControlRequestData(xid, opcode); + latency = fetchControlRequestData(xid.value(), opcode); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + latency, fmt::format("fetchControlRequestData: {}", latency.status().message())); break; case XidCodes::WatchXid: // WATCH_XID is generated by the server, no need to fetch opcode and latency here. break; default: - latency = fetchDataRequestData(xid, opcode); + latency = fetchDataRequestData(xid.value(), opcode); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + latency, fmt::format("fetchDataRequestData: {}", latency.status().message())); } // Connect responses are special, they have no full reply header // but just an XID with no zxid nor error fields like the ones // available for all other server generated messages. if (xid_code == XidCodes::ConnectXid) { - parseConnectResponse(data, offset, len, latency); + status = parseConnectResponse(data, offset, len.value(), latency.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, fmt::format("parseConnectResponse: {}", status.message())) return opcode; } // Control responses that aren't connect, with XIDs <= 0. - const auto zxid = helper_.peekInt64(data, offset); - const auto error = helper_.peekInt32(data, offset); - ENVOY_LOG(trace, "zookeeper_proxy: decoding response with zxid {} and error {} at offset {}", - zxid, error, offset); + const absl::StatusOr zxid = helper_.peekInt64(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + zxid, fmt::format("peekInt64 for zxid: {}", zxid.status().message())); + + const absl::StatusOr error = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + error, fmt::format("peekInt32 for error: {}", error.status().message())); + + ENVOY_LOG(trace, + "zookeeper_proxy: decoding response with zxid.value() {} and error {} at offset {}", + zxid.value(), error.value(), offset); + switch (xid_code) { case XidCodes::PingXid: - callbacks_.onResponse(OpCodes::Ping, xid, zxid, error, latency); + callbacks_.onResponse(OpCodes::Ping, xid.value(), zxid.value(), error.value(), latency.value()); return opcode; case XidCodes::AuthXid: - callbacks_.onResponse(OpCodes::SetAuth, xid, zxid, error, latency); + callbacks_.onResponse(OpCodes::SetAuth, xid.value(), zxid.value(), error.value(), + latency.value()); return opcode; case XidCodes::SetWatchesXid: - callbacks_.onResponse(OpCodes::SetWatches, xid, zxid, error, latency); + callbacks_.onResponse(OpCodes::SetWatches, xid.value(), zxid.value(), error.value(), + latency.value()); return opcode; case XidCodes::WatchXid: - parseWatchEvent(data, offset, len, zxid, error); + status = parseWatchEvent(data, offset, len.value(), zxid.value(), error.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, + fmt::format("parseWatchEvent: {}", status.message())); + return absl::nullopt; // WATCH_XID is generated by the server, it has no corresponding opcode. default: break; } - callbacks_.onResponse(opcode, xid, zxid, error, latency); - offset += (len - (XID_LENGTH + ZXID_LENGTH + INT_LENGTH)); + callbacks_.onResponse(opcode, xid.value(), zxid.value(), error.value(), latency.value()); + offset += (len.value() - (XID_LENGTH + ZXID_LENGTH + INT_LENGTH)); return opcode; } -void DecoderImpl::ensureMinLength(const int32_t len, const int32_t minlen) const { +absl::Status DecoderImpl::ensureMinLength(const int32_t len, const int32_t minlen) const { if (len < minlen) { - throw EnvoyException("Packet is too small"); + return absl::InvalidArgumentError("packet is too small"); } + return absl::OkStatus(); } -void DecoderImpl::ensureMaxLength(const int32_t len) const { +absl::Status DecoderImpl::ensureMaxLength(const int32_t len) const { if (static_cast(len) > max_packet_bytes_) { - throw EnvoyException("Packet is too big"); + return absl::InvalidArgumentError("packet is too big"); } + return absl::OkStatus(); } -void DecoderImpl::parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH); +absl::Status DecoderImpl::parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + absl::Status status = + ensureMinLength(len, XID_LENGTH + ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); // Skip zxid, timeout, and session id. offset += ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH; // Skip password. - skipString(data, offset); + status = skipString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - const bool readonly = maybeReadBool(data, offset); + const absl::StatusOr readonly = maybeReadBool(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(readonly, + readonly.status().message()); - callbacks_.onConnect(readonly); -} + callbacks_.onConnect(readonly.value()); -void DecoderImpl::parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + INT_LENGTH + INT_LENGTH); + return absl::OkStatus(); +} +absl::Status DecoderImpl::parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + absl::Status status = + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + INT_LENGTH + INT_LENGTH); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); // Skip opcode + type. offset += OPCODE_LENGTH + INT_LENGTH; - const std::string scheme = helper_.peekString(data, offset); + + const absl::StatusOr scheme = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(scheme, scheme.status().message()); + // Skip credential. - skipString(data, offset); + status = skipString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - callbacks_.onAuthRequest(scheme); + callbacks_.onAuthRequest(scheme.value()); + + return absl::OkStatus(); } -void DecoderImpl::parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); +absl::Status DecoderImpl::parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + + const absl::StatusOr watch = helper_.peekBool(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, watch.status().message()); - const std::string path = helper_.peekString(data, offset); - const bool watch = helper_.peekBool(data, offset); + callbacks_.onGetDataRequest(path.value(), watch.value()); - callbacks_.onGetDataRequest(path, watch); + return absl::OkStatus(); } -void DecoderImpl::skipAcls(Buffer::Instance& data, uint64_t& offset) { - const int32_t count = helper_.peekInt32(data, offset); +absl::Status DecoderImpl::skipAcls(Buffer::Instance& data, uint64_t& offset) { + const absl::StatusOr count = helper_.peekInt32(data, offset); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(count, + fmt::format("skipAcls: {}", count.status().message())); - for (int i = 0; i < count; ++i) { + for (int i = 0; i < count.value(); ++i) { // Perms. - helper_.peekInt32(data, offset); + absl::StatusOr perms = helper_.peekInt32(data, offset); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(perms, + fmt::format("skipAcls: {}", perms.status().message())); // Skip scheme. - skipString(data, offset); + absl::Status status = skipString(data, offset); + ABSL_STATUS_RETURN_IF_STATUS_NOT_OK(status); // Skip cred. - skipString(data, offset); + status = skipString(data, offset); + ABSL_STATUS_RETURN_IF_STATUS_NOT_OK(status); } + + return absl::OkStatus(); } -void DecoderImpl::parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, - OpCodes opcode) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (4 * INT_LENGTH)); +absl::Status DecoderImpl::parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, + OpCodes opcode) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (4 * INT_LENGTH)); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - const std::string path = helper_.peekString(data, offset); + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); // Skip data. - skipString(data, offset); - skipAcls(data, offset); + status = skipString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + status = skipAcls(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + absl::StatusOr flag_data = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(flag_data, + flag_data.status().message()); - const CreateFlags flags = static_cast(helper_.peekInt32(data, offset)); - callbacks_.onCreateRequest(path, flags, opcode); + const CreateFlags flags = static_cast(flag_data.value()); + status = callbacks_.onCreateRequest(path.value(), flags, opcode); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + return absl::OkStatus(); } -void DecoderImpl::parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH)); +absl::Status DecoderImpl::parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH)); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - const std::string path = helper_.peekString(data, offset); // Skip data. - skipString(data, offset); + status = skipString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + // Ignore version. - helper_.peekInt32(data, offset); + absl::StatusOr version = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); - callbacks_.onSetRequest(path); + callbacks_.onSetRequest(path.value()); + + return absl::OkStatus(); } -void DecoderImpl::parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, - const bool two) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); +absl::Status DecoderImpl::parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len, const bool two) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + + const absl::StatusOr watch = helper_.peekBool(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, watch.status().message()); - const std::string path = helper_.peekString(data, offset); - const bool watch = helper_.peekBool(data, offset); + callbacks_.onGetChildrenRequest(path.value(), watch.value(), two); - callbacks_.onGetChildrenRequest(path, watch, two); + return absl::OkStatus(); } -void DecoderImpl::parseDeleteRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); +absl::Status DecoderImpl::parseDeleteRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - const std::string path = helper_.peekString(data, offset); - const int32_t version = helper_.peekInt32(data, offset); + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - callbacks_.onDeleteRequest(path, version); + const absl::StatusOr version = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); + + callbacks_.onDeleteRequest(path.value(), version.value()); + + return absl::OkStatus(); } -void DecoderImpl::parseExistsRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); +absl::Status DecoderImpl::parseExistsRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - const std::string path = helper_.peekString(data, offset); - const bool watch = helper_.peekBool(data, offset); + const absl::StatusOr watch = helper_.peekBool(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, watch.status().message()); - callbacks_.onExistsRequest(path, watch); + callbacks_.onExistsRequest(path.value(), watch.value()); + + return absl::OkStatus(); } -void DecoderImpl::parseGetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); +absl::Status DecoderImpl::parseGetAclRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - const std::string path = helper_.peekString(data, offset); + callbacks_.onGetAclRequest(path.value()); - callbacks_.onGetAclRequest(path); + return absl::OkStatus(); } -void DecoderImpl::parseSetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); +absl::Status DecoderImpl::parseSetAclRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH)); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + + status = skipAcls(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr version = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); - const std::string path = helper_.peekString(data, offset); - skipAcls(data, offset); - const int32_t version = helper_.peekInt32(data, offset); + callbacks_.onSetAclRequest(path.value(), version.value()); - callbacks_.onSetAclRequest(path, version); + return absl::OkStatus(); } -std::string DecoderImpl::pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); +absl::StatusOr DecoderImpl::pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + status, + fmt::format("zookeeper_proxy: path only request decoding exception {}", status.message())); + return helper_.peekString(data, offset); } -void DecoderImpl::parseCheckRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); +absl::Status DecoderImpl::parseCheckRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + + const absl::StatusOr version = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, version.status().message()); - const std::string path = helper_.peekString(data, offset); - const int32_t version = helper_.peekInt32(data, offset); + callbacks_.onCheckRequest(path.value(), version.value()); - callbacks_.onCheckRequest(path, version); + return absl::OkStatus(); } -void DecoderImpl::parseMultiRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { +absl::Status DecoderImpl::parseMultiRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { // Treat empty transactions as a decoding error, there should be at least 1 header. - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + MULTI_HEADER_LENGTH); + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + MULTI_HEADER_LENGTH); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); while (true) { - const int32_t opcode = helper_.peekInt32(data, offset); - const bool done = helper_.peekBool(data, offset); + const absl::StatusOr opcode = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(opcode, opcode.status().message()); + + const absl::StatusOr done = helper_.peekBool(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(done, done.status().message()); + // Ignore error field. - helper_.peekInt32(data, offset); + const absl::StatusOr error = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(error, error.status().message()); - if (done) { + if (done.value()) { break; } - switch (static_cast(opcode)) { + switch (static_cast(opcode.value())) { case OpCodes::Create: - parseCreateRequest(data, offset, len, OpCodes::Create); + status = parseCreateRequest(data, offset, len, OpCodes::Create); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); break; case OpCodes::SetData: - parseSetRequest(data, offset, len); + status = parseSetRequest(data, offset, len); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); break; case OpCodes::Check: - parseCheckRequest(data, offset, len); + status = parseCheckRequest(data, offset, len); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); break; case OpCodes::Delete: - parseDeleteRequest(data, offset, len); + status = parseDeleteRequest(data, offset, len); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); break; default: - throw EnvoyException(fmt::format("Unknown opcode within a transaction: {}", opcode)); + callbacks_.onDecodeError(); + return absl::InvalidArgumentError( + fmt::format("unknown opcode within a transaction: {}", opcode.value())); } } callbacks_.onMultiRequest(); + + return absl::OkStatus(); } -void DecoderImpl::parseReconfigRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH) + LONG_LENGTH); +absl::Status DecoderImpl::parseReconfigRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { + absl::Status status = + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH) + LONG_LENGTH); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); // Skip joining. - skipString(data, offset); + status = skipString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + // Skip leaving. - skipString(data, offset); + status = skipString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); // Skip new members. - skipString(data, offset); + status = skipString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + // Read config id. - helper_.peekInt64(data, offset); + absl::StatusOr config_id = helper_.peekInt64(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(config_id, + config_id.status().message()); callbacks_.onReconfigRequest(); + + return absl::OkStatus(); } -void DecoderImpl::parseSetWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (3 * INT_LENGTH)); +absl::Status DecoderImpl::parseSetWatchesRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { + absl::Status status = + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (3 * INT_LENGTH)); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); // Ignore relative Zxid. - helper_.peekInt64(data, offset); + absl::StatusOr zxid = helper_.peekInt64(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(zxid, zxid.status().message()); + // Data watches. - skipStrings(data, offset); + status = skipStrings(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + // Exist watches. - skipStrings(data, offset); + status = skipStrings(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + // Child watches. - skipStrings(data, offset); + status = skipStrings(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); callbacks_.onSetWatchesRequest(); + + return absl::OkStatus(); } -void DecoderImpl::parseSetWatches2Request(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (5 * INT_LENGTH)); +absl::Status DecoderImpl::parseSetWatches2Request(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { + absl::Status status = + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (5 * INT_LENGTH)); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); // Ignore relative Zxid. - helper_.peekInt64(data, offset); + absl::StatusOr zxid = helper_.peekInt64(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(zxid, zxid.status().message()); + // Data watches. - skipStrings(data, offset); + status = skipStrings(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + // Exist watches. - skipStrings(data, offset); + status = skipStrings(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + // Child watches. - skipStrings(data, offset); + status = skipStrings(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + // Persistent watches. - skipStrings(data, offset); + status = skipStrings(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + // Persistent recursive watches. - skipStrings(data, offset); + status = skipStrings(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); callbacks_.onSetWatches2Request(); + + return absl::OkStatus(); } -void DecoderImpl::parseAddWatchRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); +absl::Status DecoderImpl::parseAddWatchRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - const std::string path = helper_.peekString(data, offset); - const int32_t mode = helper_.peekInt32(data, offset); + const absl::StatusOr mode = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(mode, mode.status().message()); - callbacks_.onAddWatchRequest(path, mode); + callbacks_.onAddWatchRequest(path.value(), mode.value()); + + return absl::OkStatus(); } -void DecoderImpl::parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, - OpCodes opcode) { - ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); +absl::Status DecoderImpl::parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len, OpCodes opcode) { + absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - const std::string path = helper_.peekString(data, offset); - const int32_t type = helper_.peekInt32(data, offset); + const absl::StatusOr watch_type = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch_type, + watch_type.status().message()); if (opcode == OpCodes::CheckWatches) { - callbacks_.onCheckWatchesRequest(path, type); + callbacks_.onCheckWatchesRequest(path.value(), watch_type.value()); } else { - callbacks_.onRemoveWatchesRequest(path, type); + callbacks_.onRemoveWatchesRequest(path.value(), watch_type.value()); } + + return absl::OkStatus(); } -void DecoderImpl::skipString(Buffer::Instance& data, uint64_t& offset) { - const int32_t slen = helper_.peekInt32(data, offset); - if (slen < 0) { +absl::Status DecoderImpl::skipString(Buffer::Instance& data, uint64_t& offset) { + const absl::StatusOr slen = helper_.peekInt32(data, offset); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(slen, + fmt::format("skipString: {}", slen.status().message())); + + if (slen.value() < 0) { ENVOY_LOG(trace, "zookeeper_proxy: decoding response with negative string length {} at offset {}", - slen, offset); - return; + slen.value(), offset); + return absl::OkStatus(); } - helper_.skip(slen, offset); + + helper_.skip(slen.value(), offset); + + return absl::OkStatus(); } -void DecoderImpl::skipStrings(Buffer::Instance& data, uint64_t& offset) { - const int32_t count = helper_.peekInt32(data, offset); +absl::Status DecoderImpl::skipStrings(Buffer::Instance& data, uint64_t& offset) { + const absl::StatusOr count = helper_.peekInt32(data, offset); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(count, + fmt::format("skipStrings: {}", count.status().message())); - for (int i = 0; i < count; ++i) { - skipString(data, offset); + for (int i = 0; i < count.value(); ++i) { + absl::Status status = skipString(data, offset); + ABSL_STATUS_RETURN_IF_STATUS_NOT_OK(status); } + + return absl::OkStatus(); } Network::FilterStatus DecoderImpl::onData(Buffer::Instance& data) { @@ -529,9 +814,14 @@ Network::FilterStatus DecoderImpl::onWrite(Buffer::Instance& data) { Network::FilterStatus DecoderImpl::decodeAndBuffer(Buffer::Instance& data, DecodeType dtype, Buffer::OwnedImpl& zk_filter_buffer) { const uint32_t zk_filter_buffer_len = zk_filter_buffer.length(); + absl::Status status; if (zk_filter_buffer_len == 0) { - decodeAndBufferHelper(data, dtype, zk_filter_buffer); + status = decodeAndBufferHelper(data, dtype, zk_filter_buffer); + if (!status.ok()) { + ENVOY_LOG(debug, "zookeeper_proxy: decodeAndBufferHelper exception: {}", status.message()); + } + return Network::FilterStatus::Continue; } @@ -539,52 +829,59 @@ Network::FilterStatus DecoderImpl::decodeAndBuffer(Buffer::Instance& data, Decod // Prepending ZooKeeper filter buffer to the current network filter buffer can help to generate // full packets. data.prepend(zk_filter_buffer); - decodeAndBufferHelper(data, dtype, zk_filter_buffer); + + status = decodeAndBufferHelper(data, dtype, zk_filter_buffer); + if (!status.ok()) { + ENVOY_LOG(debug, "zookeeper_proxy: decodeAndBufferHelper exception: {}", status.message()); + } + // Drain the prepended ZooKeeper filter buffer. data.drain(zk_filter_buffer_len); return Network::FilterStatus::Continue; } -void DecoderImpl::decodeAndBufferHelper(Buffer::Instance& data, DecodeType dtype, - Buffer::OwnedImpl& zk_filter_buffer) { +absl::Status DecoderImpl::decodeAndBufferHelper(Buffer::Instance& data, DecodeType dtype, + Buffer::OwnedImpl& zk_filter_buffer) { ASSERT(dtype == DecodeType::READ || dtype == DecodeType::WRITE); const uint32_t data_len = data.length(); uint64_t offset = 0; - uint32_t len = 0; + absl::StatusOr len = 0; + absl::Status status; // Boolean to check whether there is at least one full packet in the network filter buffer (to // which the ZooKeeper filter buffer is prepended). bool has_full_packets = false; while (offset < data_len) { - TRY_NEEDS_AUDIT { - // Peek packet length. - len = helper_.peekInt32(data, offset); - ensureMinLength(len, dtype == DecodeType::READ ? XID_LENGTH + INT_LENGTH - : XID_LENGTH + ZXID_LENGTH + INT_LENGTH); - ensureMaxLength(len); - offset += len; - if (offset <= data_len) { - has_full_packets = true; - } - } - END_TRY catch (const EnvoyException& e) { - ENVOY_LOG(debug, "zookeeper_proxy: decoding exception {}", e.what()); - callbacks_.onDecodeError(); - return; + // Peek packet length. + len = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK( + len, fmt::format("peekInt32 for len: {}", len.status().message())); + + status = ensureMinLength(len.value(), dtype == DecodeType::READ + ? XID_LENGTH + INT_LENGTH + : XID_LENGTH + ZXID_LENGTH + INT_LENGTH); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + status = ensureMaxLength(len.value()); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + offset += len.value(); + if (offset <= data_len) { + has_full_packets = true; } } if (offset == data_len) { decode(data, dtype, offset); - return; + return absl::OkStatus(); } ASSERT(offset > data_len); std::string temp_data; if (has_full_packets) { - offset -= INT_LENGTH + len; + offset -= INT_LENGTH + len.value(); ASSERT(offset < data_len); // Decode full packets. // offset here represents the length of all full packets. @@ -601,87 +898,112 @@ void DecoderImpl::decodeAndBufferHelper(Buffer::Instance& data, DecodeType dtype data.copyOut(0, data_len, temp_data.data()); zk_filter_buffer.add(temp_data.data(), temp_data.length()); } + + return absl::OkStatus(); } void DecoderImpl::decode(Buffer::Instance& data, DecodeType dtype, uint64_t full_packets_len) { uint64_t offset = 0; - TRY_NEEDS_AUDIT { - while (offset < full_packets_len) { - // Reset the helper's cursor, to ensure the current message stays within the - // allowed max length, even when it's different than the declared length - // by the message. - // - // Note: we need to keep two cursors — offset and helper_'s internal one — because - // a buffer may contain multiple messages, so offset is global while helper_'s - // internal cursor gets reset for each individual message. - helper_.reset(); - - const uint64_t current = offset; - absl::optional opcode; - switch (dtype) { - case DecodeType::READ: - opcode = decodeOnData(data, offset); - callbacks_.onRequestBytes(opcode, offset - current); + while (offset < full_packets_len) { + // Reset the helper's cursor, to ensure the current message stays within the + // allowed max length, even when it's different than the declared length + // by the message. + // + // Note: we need to keep two cursors — offset and helper_'s internal one — because + // a buffer may contain multiple messages, so offset is global while helper_'s + // internal cursor gets reset for each individual message. + helper_.reset(); + + const uint64_t current = offset; + absl::StatusOr> opcode; + switch (dtype) { + case DecodeType::READ: + opcode = decodeOnData(data, offset); + if (opcode.ok()) { + callbacks_.onRequestBytes(opcode.value(), offset - current); break; - case DecodeType::WRITE: - opcode = decodeOnWrite(data, offset); - callbacks_.onResponseBytes(opcode, offset - current); + } + ENVOY_LOG(debug, "zookeeper_proxy: decodeOnData exception: {}", opcode.status().message()); + return; + case DecodeType::WRITE: + opcode = decodeOnWrite(data, offset); + if (opcode.ok()) { + callbacks_.onResponseBytes(opcode.value(), offset - current); break; } + ENVOY_LOG(debug, "zookeeper_proxy: decodeOnWrite exception: {}", opcode.status().message()); + return; } } - END_TRY catch (const EnvoyException& e) { - ENVOY_LOG(debug, "zookeeper_proxy: decoding exception {}", e.what()); - callbacks_.onDecodeError(); - } } -void DecoderImpl::parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len, - const std::chrono::milliseconds latency) { - ensureMinLength(len, PROTOCOL_VERSION_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH); +absl::Status DecoderImpl::parseConnectResponse(Buffer::Instance& data, uint64_t& offset, + uint32_t len, + const std::chrono::milliseconds latency) { + absl::Status status = + ensureMinLength(len, PROTOCOL_VERSION_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); - const auto timeout = helper_.peekInt32(data, offset); + const absl::StatusOr timeout = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(timeout, timeout.status().message()); // Skip session id + password. offset += SESSION_LENGTH; - skipString(data, offset); + status = skipString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr readonly = maybeReadBool(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(readonly, + readonly.status().message()); - const bool readonly = maybeReadBool(data, offset); + callbacks_.onConnectResponse(0, timeout.value(), readonly.value(), latency); - callbacks_.onConnectResponse(0, timeout, readonly, latency); + return absl::OkStatus(); } -void DecoderImpl::parseWatchEvent(Buffer::Instance& data, uint64_t& offset, const uint32_t len, - const int64_t zxid, const int32_t error) { - ensureMinLength(len, SERVER_HEADER_LENGTH + (3 * INT_LENGTH)); +absl::Status DecoderImpl::parseWatchEvent(Buffer::Instance& data, uint64_t& offset, + const uint32_t len, const int64_t zxid, + const int32_t error) { + absl::Status status = ensureMinLength(len, SERVER_HEADER_LENGTH + (3 * INT_LENGTH)); + EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status); + + const absl::StatusOr event_type = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(event_type, + event_type.status().message()); + + const absl::StatusOr client_state = helper_.peekInt32(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(client_state, + client_state.status().message()); + + const absl::StatusOr path = helper_.peekString(data, offset); + EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); - const auto event_type = helper_.peekInt32(data, offset); - const auto client_state = helper_.peekInt32(data, offset); - const auto path = helper_.peekString(data, offset); + callbacks_.onWatchEvent(event_type.value(), client_state.value(), path.value(), zxid, error); - callbacks_.onWatchEvent(event_type, client_state, path, zxid, error); + return absl::OkStatus(); } -bool DecoderImpl::maybeReadBool(Buffer::Instance& data, uint64_t& offset) { +absl::StatusOr DecoderImpl::maybeReadBool(Buffer::Instance& data, uint64_t& offset) { if (data.length() >= offset + 1) { return helper_.peekBool(data, offset); } return false; } -std::chrono::milliseconds DecoderImpl::fetchControlRequestData(const int32_t xid, OpCodes& opcode) { +absl::StatusOr DecoderImpl::fetchControlRequestData(const int32_t xid, + OpCodes& opcode) { // Find the corresponding request queue for this XID. const auto it = control_requests_by_xid_.find(xid); // If this fails, it's either a server-side bug or a malformed packet. if (it == control_requests_by_xid_.end()) { - throw EnvoyException(fmt::format("control request xid {} not found", xid)); + return absl::InvalidArgumentError(fmt::format("control request xid {} not found", xid)); } std::queue& rq_queue = it->second; if (rq_queue.empty()) { - throw EnvoyException(fmt::format("control request queue for {} is empty", xid)); + return absl::InvalidArgumentError(fmt::format("control request queue for {} is empty", xid)); } std::chrono::milliseconds latency = std::chrono::duration_cast( @@ -692,13 +1014,14 @@ std::chrono::milliseconds DecoderImpl::fetchControlRequestData(const int32_t xid return latency; } -std::chrono::milliseconds DecoderImpl::fetchDataRequestData(const int32_t xid, OpCodes& opcode) { +absl::StatusOr DecoderImpl::fetchDataRequestData(const int32_t xid, + OpCodes& opcode) { // Find the corresponding request for this XID. const auto it = requests_by_xid_.find(xid); // If this fails, it's either a server-side bug or a malformed packet. if (it == requests_by_xid_.end()) { - throw EnvoyException(fmt::format("xid {} not found", xid)); + return absl::InvalidArgumentError(fmt::format("data request xid {} not found", xid)); } std::chrono::milliseconds latency = std::chrono::duration_cast( diff --git a/source/extensions/filters/network/zookeeper_proxy/decoder.h b/source/extensions/filters/network/zookeeper_proxy/decoder.h index 34ad9b4d32ff..8718a8d9d6b1 100644 --- a/source/extensions/filters/network/zookeeper_proxy/decoder.h +++ b/source/extensions/filters/network/zookeeper_proxy/decoder.h @@ -12,6 +12,7 @@ #include "source/extensions/filters/network/zookeeper_proxy/utils.h" #include "absl/container/node_hash_map.h" +#include "absl/status/statusor.h" namespace Envoy { namespace Extensions { @@ -85,16 +86,17 @@ class DecoderCallbacks { virtual void onPing() PURE; virtual void onAuthRequest(const std::string& scheme) PURE; virtual void onGetDataRequest(const std::string& path, bool watch) PURE; - virtual void onCreateRequest(const std::string& path, CreateFlags flags, OpCodes opcode) PURE; + virtual absl::Status onCreateRequest(const std::string& path, CreateFlags flags, + OpCodes opcode) PURE; virtual void onSetRequest(const std::string& path) PURE; virtual void onGetChildrenRequest(const std::string& path, bool watch, bool v2) PURE; - virtual void onGetEphemeralsRequest(const std::string& path) PURE; - virtual void onGetAllChildrenNumberRequest(const std::string& path) PURE; + virtual absl::Status onGetEphemeralsRequest(const absl::StatusOr& path) PURE; + virtual absl::Status onGetAllChildrenNumberRequest(const absl::StatusOr& path) PURE; virtual void onDeleteRequest(const std::string& path, int32_t version) PURE; virtual void onExistsRequest(const std::string& path, bool watch) PURE; virtual void onGetAclRequest(const std::string& path) PURE; virtual void onSetAclRequest(const std::string& path, int32_t version) PURE; - virtual void onSyncRequest(const std::string& path) PURE; + virtual absl::Status onSyncRequest(const absl::StatusOr& path) PURE; virtual void onCheckRequest(const std::string& path, int32_t version) PURE; virtual void onMultiRequest() PURE; virtual void onReconfigRequest() PURE; @@ -151,44 +153,50 @@ class DecoderImpl : public Decoder, Logger::Loggable { // (4) removes the prepended data. Network::FilterStatus decodeAndBuffer(Buffer::Instance& data, DecodeType dtype, Buffer::OwnedImpl& zk_filter_buffer); - void decodeAndBufferHelper(Buffer::Instance& data, DecodeType dtype, - Buffer::OwnedImpl& zk_filter_buffer); + absl::Status decodeAndBufferHelper(Buffer::Instance& data, DecodeType dtype, + Buffer::OwnedImpl& zk_filter_buffer); void decode(Buffer::Instance& data, DecodeType dtype, uint64_t full_packets_len); // decodeOnData and decodeOnWrite return ZooKeeper opcode or absl::nullopt. // absl::nullopt indicates WATCH_XID, which is generated by the server and has no corresponding // opcode. - absl::optional decodeOnData(Buffer::Instance& data, uint64_t& offset); - absl::optional decodeOnWrite(Buffer::Instance& data, uint64_t& offset); - void parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, OpCodes opcode); - void skipAcls(Buffer::Instance& data, uint64_t& offset); - void parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, bool two); - void parseDeleteRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseExistsRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseGetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseSetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseCheckRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseMultiRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseReconfigRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseSetWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseSetWatches2Request(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseAddWatchRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, OpCodes opcode); - void skipString(Buffer::Instance& data, uint64_t& offset); - void skipStrings(Buffer::Instance& data, uint64_t& offset); - void ensureMinLength(int32_t len, int32_t minlen) const; - void ensureMaxLength(int32_t len) const; - std::string pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); - void parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len, - const std::chrono::milliseconds latency); - void parseWatchEvent(Buffer::Instance& data, uint64_t& offset, uint32_t len, int64_t zxid, - int32_t error); - bool maybeReadBool(Buffer::Instance& data, uint64_t& offset); - std::chrono::milliseconds fetchControlRequestData(const int32_t xid, OpCodes& opcode); - std::chrono::milliseconds fetchDataRequestData(const int32_t xid, OpCodes& opcode); + absl::StatusOr> decodeOnData(Buffer::Instance& data, uint64_t& offset); + absl::StatusOr> decodeOnWrite(Buffer::Instance& data, uint64_t& offset); + absl::Status parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, + OpCodes opcode); + absl::Status skipAcls(Buffer::Instance& data, uint64_t& offset); + absl::Status parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, + bool two); + absl::Status parseDeleteRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseExistsRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseGetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseSetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseCheckRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseMultiRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseReconfigRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseSetWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseSetWatches2Request(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseAddWatchRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + absl::Status parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, + OpCodes opcode); + absl::Status skipString(Buffer::Instance& data, uint64_t& offset); + absl::Status skipStrings(Buffer::Instance& data, uint64_t& offset); + absl::Status ensureMinLength(int32_t len, int32_t minlen) const; + absl::Status ensureMaxLength(int32_t len) const; + absl::StatusOr pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, + uint32_t len); + absl::Status parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len, + const std::chrono::milliseconds latency); + absl::Status parseWatchEvent(Buffer::Instance& data, uint64_t& offset, uint32_t len, int64_t zxid, + int32_t error); + absl::StatusOr maybeReadBool(Buffer::Instance& data, uint64_t& offset); + absl::StatusOr fetchControlRequestData(const int32_t xid, + OpCodes& opcode); + absl::StatusOr fetchDataRequestData(const int32_t xid, + OpCodes& opcode); DecoderCallbacks& callbacks_; const uint32_t max_packet_bytes_; diff --git a/source/extensions/filters/network/zookeeper_proxy/filter.cc b/source/extensions/filters/network/zookeeper_proxy/filter.cc index 013bea03b1e0..817fa14077d4 100644 --- a/source/extensions/filters/network/zookeeper_proxy/filter.cc +++ b/source/extensions/filters/network/zookeeper_proxy/filter.cc @@ -293,8 +293,8 @@ void ZooKeeperFilter::onGetDataRequest(const std::string& path, const bool watch setDynamicMetadata({{"opname", "getdata"}, {"path", path}, {"watch", watch ? "true" : "false"}}); } -void ZooKeeperFilter::onCreateRequest(const std::string& path, const CreateFlags flags, - const OpCodes opcode) { +absl::Status ZooKeeperFilter::onCreateRequest(const std::string& path, const CreateFlags flags, + const OpCodes opcode) { std::string opname; switch (opcode) { @@ -315,12 +315,14 @@ void ZooKeeperFilter::onCreateRequest(const std::string& path, const CreateFlags config_->stats_.createttl_rq_.inc(); break; default: - throw EnvoyException(fmt::format("Unknown opcode: {}", enumToSignedInt(opcode))); + return absl::InvalidArgumentError(fmt::format("unknown opcode: {}", enumToSignedInt(opcode))); break; } setDynamicMetadata( {{"opname", opname}, {"path", path}, {"create_type", createFlagsToString(flags)}}); + + return absl::OkStatus(); } void ZooKeeperFilter::onSetRequest(const std::string& path) { @@ -362,9 +364,13 @@ void ZooKeeperFilter::onSetAclRequest(const std::string& path, const int32_t ver setDynamicMetadata({{"opname", "setacl"}, {"path", path}, {"version", std::to_string(version)}}); } -void ZooKeeperFilter::onSyncRequest(const std::string& path) { +absl::Status ZooKeeperFilter::onSyncRequest(const absl::StatusOr& path) { + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + config_->stats_.sync_rq_.inc(); - setDynamicMetadata({{"opname", "sync"}, {"path", path}}); + setDynamicMetadata({{"opname", "sync"}, {"path", path.value()}}); + + return absl::OkStatus(); } void ZooKeeperFilter::onCheckRequest(const std::string&, const int32_t) { @@ -406,14 +412,23 @@ void ZooKeeperFilter::onAddWatchRequest(const std::string& path, const int32_t m setDynamicMetadata({{"opname", "addwatch"}, {"path", path}, {"mode", std::to_string(mode)}}); } -void ZooKeeperFilter::onGetEphemeralsRequest(const std::string& path) { +absl::Status ZooKeeperFilter::onGetEphemeralsRequest(const absl::StatusOr& path) { + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + config_->stats_.getephemerals_rq_.inc(); - setDynamicMetadata({{"opname", "getephemerals"}, {"path", path}}); + setDynamicMetadata({{"opname", "getephemerals"}, {"path", path.value()}}); + + return absl::OkStatus(); } -void ZooKeeperFilter::onGetAllChildrenNumberRequest(const std::string& path) { +absl::Status +ZooKeeperFilter::onGetAllChildrenNumberRequest(const absl::StatusOr& path) { + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, path.status().message()); + config_->stats_.getallchildrennumber_rq_.inc(); - setDynamicMetadata({{"opname", "getallchildrennumber"}, {"path", path}}); + setDynamicMetadata({{"opname", "getallchildrennumber"}, {"path", path.value()}}); + + return absl::OkStatus(); } void ZooKeeperFilter::onCloseRequest() { diff --git a/source/extensions/filters/network/zookeeper_proxy/filter.h b/source/extensions/filters/network/zookeeper_proxy/filter.h index 294d8a3119da..94f47506b211 100644 --- a/source/extensions/filters/network/zookeeper_proxy/filter.h +++ b/source/extensions/filters/network/zookeeper_proxy/filter.h @@ -17,6 +17,8 @@ #include "source/common/stats/symbol_table.h" #include "source/extensions/filters/network/zookeeper_proxy/decoder.h" +#include "absl/status/statusor.h" + namespace Envoy { namespace Extensions { namespace NetworkFilters { @@ -340,14 +342,14 @@ class ZooKeeperFilter : public Network::Filter, void onPing() override; void onAuthRequest(const std::string& scheme) override; void onGetDataRequest(const std::string& path, bool watch) override; - void onCreateRequest(const std::string& path, CreateFlags flags, OpCodes opcode) override; + absl::Status onCreateRequest(const std::string& path, CreateFlags flags, OpCodes opcode) override; void onSetRequest(const std::string& path) override; void onGetChildrenRequest(const std::string& path, bool watch, bool v2) override; void onDeleteRequest(const std::string& path, int32_t version) override; void onExistsRequest(const std::string& path, bool watch) override; void onGetAclRequest(const std::string& path) override; void onSetAclRequest(const std::string& path, int32_t version) override; - void onSyncRequest(const std::string& path) override; + absl::Status onSyncRequest(const absl::StatusOr& path) override; void onCheckRequest(const std::string& path, int32_t version) override; void onMultiRequest() override; void onReconfigRequest() override; @@ -356,8 +358,8 @@ class ZooKeeperFilter : public Network::Filter, void onAddWatchRequest(const std::string& path, const int32_t mode) override; void onCheckWatchesRequest(const std::string& path, int32_t type) override; void onRemoveWatchesRequest(const std::string& path, int32_t type) override; - void onGetEphemeralsRequest(const std::string& path) override; - void onGetAllChildrenNumberRequest(const std::string& path) override; + absl::Status onGetEphemeralsRequest(const absl::StatusOr& path) override; + absl::Status onGetAllChildrenNumberRequest(const absl::StatusOr& path) override; void onCloseRequest() override; void onResponseBytes(const absl::optional opcode, const uint64_t bytes) override; void onConnectResponse(int32_t proto_version, int32_t timeout, bool readonly, diff --git a/source/extensions/filters/network/zookeeper_proxy/utils.cc b/source/extensions/filters/network/zookeeper_proxy/utils.cc index cf1c77a8bf19..b16a6cdc163c 100644 --- a/source/extensions/filters/network/zookeeper_proxy/utils.cc +++ b/source/extensions/filters/network/zookeeper_proxy/utils.cc @@ -7,24 +7,27 @@ namespace Extensions { namespace NetworkFilters { namespace ZooKeeperProxy { -int32_t BufferHelper::peekInt32(Buffer::Instance& buffer, uint64_t& offset) { - ensureMaxLen(sizeof(int32_t)); +absl::StatusOr BufferHelper::peekInt32(Buffer::Instance& buffer, uint64_t& offset) { + absl::Status status = ensureMaxLen(sizeof(int32_t)); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, fmt::format("peekInt32: {}", status.message())); const int32_t val = buffer.peekBEInt(offset); offset += sizeof(int32_t); return val; } -int64_t BufferHelper::peekInt64(Buffer::Instance& buffer, uint64_t& offset) { - ensureMaxLen(sizeof(int64_t)); +absl::StatusOr BufferHelper::peekInt64(Buffer::Instance& buffer, uint64_t& offset) { + absl::Status status = ensureMaxLen(sizeof(int64_t)); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, fmt::format("peekInt64: {}", status.message())); const int64_t val = buffer.peekBEInt(offset); offset += sizeof(int64_t); return val; } -bool BufferHelper::peekBool(Buffer::Instance& buffer, uint64_t& offset) { - ensureMaxLen(1); +absl::StatusOr BufferHelper::peekBool(Buffer::Instance& buffer, uint64_t& offset) { + absl::Status status = ensureMaxLen(1); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, fmt::format("peekBool: {}", status.message())); const char byte = buffer.peekInt(offset); const bool val = static_cast(byte); @@ -32,24 +35,27 @@ bool BufferHelper::peekBool(Buffer::Instance& buffer, uint64_t& offset) { return val; } -std::string BufferHelper::peekString(Buffer::Instance& buffer, uint64_t& offset) { +absl::StatusOr BufferHelper::peekString(Buffer::Instance& buffer, uint64_t& offset) { std::string val; - const uint32_t len = peekInt32(buffer, offset); + const absl::StatusOr len = peekInt32(buffer, offset); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(len, + fmt::format("peekString: {}", len.status().message())); - if (len == 0) { + if (len.value() == 0) { return val; } - if (buffer.length() < (offset + len)) { - throw EnvoyException("peekString: buffer is smaller than string length"); + if (buffer.length() < (offset + len.value())) { + return absl::InvalidArgumentError("peekString: buffer is smaller than string length"); } - ensureMaxLen(len); + absl::Status status = ensureMaxLen(len.value()); + RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, fmt::format("peekString: {}", status.message())); - std::unique_ptr data(new char[len]); - buffer.copyOut(offset, len, data.get()); - val.assign(data.get(), len); - offset += len; + std::unique_ptr data(new char[len.value()]); + buffer.copyOut(offset, len.value(), data.get()); + val.assign(data.get(), len.value()); + offset += len.value(); return val; } @@ -59,12 +65,14 @@ void BufferHelper::skip(const uint32_t len, uint64_t& offset) { current_ += len; } -void BufferHelper::ensureMaxLen(const uint32_t size) { +absl::Status BufferHelper::ensureMaxLen(const uint32_t size) { current_ += size; if (current_ > max_len_) { - throw EnvoyException("read beyond max length"); + return absl::InvalidArgumentError("read beyond max length"); } + + return absl::OkStatus(); } } // namespace ZooKeeperProxy diff --git a/source/extensions/filters/network/zookeeper_proxy/utils.h b/source/extensions/filters/network/zookeeper_proxy/utils.h index dc699afaf548..76b43661dcfb 100644 --- a/source/extensions/filters/network/zookeeper_proxy/utils.h +++ b/source/extensions/filters/network/zookeeper_proxy/utils.h @@ -9,6 +9,8 @@ #include "source/common/common/byte_order.h" #include "source/common/common/logger.h" +#include "absl/status/statusor.h" + namespace Envoy { namespace Extensions { namespace NetworkFilters { @@ -17,8 +19,8 @@ namespace ZooKeeperProxy { /** * Helper for extracting ZooKeeper data from a buffer. * - * If at any point a peek is tried beyond max_len, an EnvoyException - * will be thrown. This is important to protect Envoy against malformed + * If at any point a peek is tried beyond max_len, an invalid argument error + * will be returned. This is important to protect Envoy against malformed * requests (e.g.: when the declared and actual length don't match). * * Note: ZooKeeper's protocol uses network byte ordering (big-endian). @@ -27,20 +29,41 @@ class BufferHelper : public Logger::Loggable { public: BufferHelper(uint32_t max_len) : max_len_(max_len) {} - int32_t peekInt32(Buffer::Instance& buffer, uint64_t& offset); - int64_t peekInt64(Buffer::Instance& buffer, uint64_t& offset); - std::string peekString(Buffer::Instance& buffer, uint64_t& offset); - bool peekBool(Buffer::Instance& buffer, uint64_t& offset); + absl::StatusOr peekInt32(Buffer::Instance& buffer, uint64_t& offset); + absl::StatusOr peekInt64(Buffer::Instance& buffer, uint64_t& offset); + absl::StatusOr peekString(Buffer::Instance& buffer, uint64_t& offset); + absl::StatusOr peekBool(Buffer::Instance& buffer, uint64_t& offset); void skip(uint32_t len, uint64_t& offset); void reset() { current_ = 0; } private: - void ensureMaxLen(uint32_t size); + absl::Status ensureMaxLen(uint32_t size); const uint32_t max_len_; uint32_t current_{}; }; +#define ABSL_STATUS_RETURN_IF_STATUS_NOT_OK(status) \ + if (!status.ok()) { \ + return status; \ + } + +#define EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status) \ + if (!status.ok()) { \ + callbacks_.onDecodeError(); \ + return status; \ + } + +#define RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, message) \ + if (!status.ok()) { \ + return absl::InvalidArgumentError(message); \ + } + +#define EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status, message) \ + if (!status.ok()) { \ + callbacks_.onDecodeError(); \ + return absl::InvalidArgumentError(message); \ + } } // namespace ZooKeeperProxy } // namespace NetworkFilters } // namespace Extensions