Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gcp-observability: Update logging fields for GA and use custom BatchingSettings #9959

Merged
merged 3 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions gcp-observability/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ tasks.named("compileJava").configure {
}

dependencies {
def cloudLoggingVersion = '3.6.1'
def cloudLoggingVersion = '3.14.5'

annotationProcessor libraries.auto.value
api project(':grpc-api')


// TODO(dnvindhya): Prefer using our own libraries, update the dependencies
// in gradle/libs.versions instead
implementation project(':grpc-protobuf'),
project(':grpc-stub'),
project(':grpc-alts'),
Expand All @@ -35,12 +37,10 @@ dependencies {
libraries.opencensus.exporter.trace.stackdriver,
project(':grpc-xds'), // Align grpc versions
project(':grpc-services'), // Align grpc versions
libraries.animalsniffer.annotations, // Prefer our version
libraries.google.auth.credentials, // Prefer our version
libraries.protobuf.java.util, // Prefer our version
libraries.gson, // Prefer our version
libraries.perfmark.api, // Prefer our version
libraries.re2j, // Prefer our version
('com.google.protobuf:protobuf-java:3.21.12'),
('com.google.api.grpc:proto-google-common-protos:2.14.2'),
('com.google.auth:google-auth-library-oauth2-http:1.16.0'),
('io.opencensus:opencensus-api:0.31.1'),
('com.google.guava:guava:31.1-jre')

runtimeOnly libraries.opencensus.impl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Joiner;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.rpc.Code;
import io.grpc.Attributes;
import io.grpc.Deadline;
import io.grpc.Grpc;
Expand Down Expand Up @@ -182,7 +183,7 @@ void logTrailer(

PayloadBuilderHelper<Payload.Builder> pair =
createMetadataProto(metadata, maxHeaderBytes);
pair.payloadBuilder.setStatusCode(status.getCode().value());
pair.payloadBuilder.setStatusCode(Code.forNumber(status.getCode().value()));
String statusDescription = status.getDescription();
if (statusDescription != null) {
pair.payloadBuilder.setStatusMessage(statusDescription);
Expand Down Expand Up @@ -404,10 +405,10 @@ static Address socketAddressToProto(SocketAddress address) {
if (address instanceof InetSocketAddress) {
InetAddress inetAddress = ((InetSocketAddress) address).getAddress();
if (inetAddress instanceof Inet4Address) {
builder.setType(Address.Type.TYPE_IPV4)
builder.setType(Address.Type.IPV4)
.setAddress(InetAddressUtil.toAddrString(inetAddress));
} else if (inetAddress instanceof Inet6Address) {
builder.setType(Address.Type.TYPE_IPV6)
builder.setType(Address.Type.IPV6)
.setAddress(InetAddressUtil.toAddrString(inetAddress));
} else {
logger.log(Level.SEVERE, "unknown type of InetSocketAddress: {}", address);
Expand All @@ -417,7 +418,7 @@ static Address socketAddressToProto(SocketAddress address) {
} else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) {
// To avoid a compiled time dependency on grpc-netty, we check against the
// runtime class name.
builder.setType(Address.Type.TYPE_UNIX)
builder.setType(Address.Type.UNIX)
.setAddress(address.toString());
} else {
builder.setType(Address.Type.TYPE_UNKNOWN).setAddress(address.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowController;
import com.google.cloud.MonitoredResource;
import com.google.cloud.logging.LogEntry;
import com.google.cloud.logging.Logging;
import com.google.cloud.logging.LoggingOptions;
import com.google.cloud.logging.Payload.JsonPayload;
import com.google.cloud.logging.Severity;
import com.google.cloud.logging.v2.stub.LoggingServiceV2StubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
Expand All @@ -41,6 +44,7 @@
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;

/**
* Sink for Google Cloud Logging.
Expand Down Expand Up @@ -102,6 +106,7 @@ public void write(GrpcLogRecord logProto) {
if (servicesToExclude.contains(logProto.getServiceName())) {
return;
}
LogEntry grpcLogEntry = null;
try {
GrpcLogRecord.EventType eventType = logProto.getType();
// TODO(DNVindhya): make sure all (int, long) values are not displayed as double
Expand All @@ -117,13 +122,21 @@ public void write(GrpcLogRecord logProto) {
if (!customTags.isEmpty()) {
grpcLogEntryBuilder.setLabels(customTags);
}
LogEntry grpcLogEntry = grpcLogEntryBuilder.build();
grpcLogEntry = grpcLogEntryBuilder.build();
synchronized (this) {
logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", eventType);
gcpLoggingClient.write(Collections.singleton(grpcLogEntry));
}
} catch (FlowController.FlowControlRuntimeException e) {
String grpcLogEntryString = null;
if (grpcLogEntry != null) {
grpcLogEntryString = grpcLogEntry.toStructuredJsonString();
}
logger.log(Level.SEVERE, "Limit exceeded while writing log entry to cloud logging");
logger.log(Level.SEVERE, "Log entry = ", grpcLogEntryString);
} catch (Exception e) {
logger.log(Level.SEVERE, "Caught exception while writing to Cloud Logging", e);
throw new NullPointerException("Exception : " + e);
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -132,6 +145,16 @@ Logging createLoggingClient() {
if (!Strings.isNullOrEmpty(projectId)) {
builder.setProjectId(projectId);
}
BatchingSettings loggingDefaultBatchingSettings = LoggingServiceV2StubSettings.newBuilder()
.writeLogEntriesSettings().getBatchingSettings();
// Custom batching settings
BatchingSettings grpcLoggingVBatchingSettings = loggingDefaultBatchingSettings.toBuilder()
.setDelayThreshold(Duration.ofSeconds(1L)).setFlowControlSettings(
loggingDefaultBatchingSettings.getFlowControlSettings().toBuilder()
.setMaxOutstandingRequestBytes(52428800L) //50 MiB
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.build()).build();
builder.setBatchingSettings(grpcLoggingVBatchingSettings);
return builder.build().getService();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package grpc.observabilitylog.v1;

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/rpc/code.proto";

option java_multiple_files = true;
option java_package = "io.grpc.observabilitylog.v1";
Expand Down Expand Up @@ -97,7 +98,7 @@ message Payload {
// the RPC timeout value
google.protobuf.Duration timeout = 2;
// The gRPC status code
uint32 status_code = 3;
google.rpc.Code status_code = 3;
// The gRPC status message
string status_message = 4;
// The value of the grpc-status-details-bin metadata key, if any.
Expand All @@ -115,9 +116,9 @@ message Payload {
message Address {
enum Type {
TYPE_UNKNOWN = 0;
TYPE_IPV4 = 1; // in 1.2.3.4 form
TYPE_IPV6 = 2; // IPv6 canonical form (RFC5952 section 4)
TYPE_UNIX = 3; // UDS string
IPV4 = 1; // in 1.2.3.4 form
IPV6 = 2; // IPv6 canonical form (RFC5952 section 4)
UNIX = 3; // UDS string
}
Type type = 1;
string address = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import com.google.rpc.Code;
import io.grpc.Attributes;
import io.grpc.Grpc;
import io.grpc.Metadata;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void socketToProto_ipv4() throws Exception {
assertThat(LogHelper.socketAddressToProto(socketAddress))
.isEqualTo(Address
.newBuilder()
.setType(Address.Type.TYPE_IPV4)
.setType(Address.Type.IPV4)
.setAddress("127.0.0.1")
.setIpPort(12345)
.build());
Expand All @@ -109,7 +110,7 @@ public void socketToProto_ipv6() throws Exception {
assertThat(LogHelper.socketAddressToProto(socketAddress))
.isEqualTo(Address
.newBuilder()
.setType(Address.Type.TYPE_IPV6)
.setType(Address.Type.IPV6)
.setAddress("2001:db8::2:1") // RFC 5952 section 4: ipv6 canonical form required
.setIpPort(12345)
.build());
Expand Down Expand Up @@ -454,7 +455,7 @@ public void logTrailer() throws Exception {
builder.setPeer(LogHelper.socketAddressToProto(peer));
builder.setPayload(
builder.getPayload().toBuilder()
.setStatusCode(Status.INTERNAL.getCode().value())
.setStatusCode(Code.forNumber(Status.INTERNAL.getCode().value()))
.setStatusMessage("test description")
.build());
GrpcLogRecord base = builder.build();
Expand Down