Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick observability changes from master to v1.50.x and update version to 1.50.1 #5722

Merged
merged 5 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions default_dial_option_server_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s) TestAddExtraDialOptions(t *testing.T) {

// Set and check the DialOptions
opts := []DialOption{WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials())}
internal.AddExtraDialOptions.(func(opt ...DialOption))(opts...)
internal.AddGlobalDialOptions.(func(opt ...DialOption))(opts...)
for i, opt := range opts {
if extraDialOptions[i] != opt {
t.Fatalf("Unexpected extra dial option at index %d: %v != %v", i, extraDialOptions[i], opt)
Expand All @@ -52,7 +52,7 @@ func (s) TestAddExtraDialOptions(t *testing.T) {
cc.Close()
}

internal.ClearExtraDialOptions()
internal.ClearGlobalDialOptions()
if len(extraDialOptions) != 0 {
t.Fatalf("Unexpected len of extraDialOptions: %d != 0", len(extraDialOptions))
}
Expand All @@ -62,7 +62,7 @@ func (s) TestAddExtraServerOptions(t *testing.T) {
const maxRecvSize = 998765
// Set and check the ServerOptions
opts := []ServerOption{Creds(insecure.NewCredentials()), MaxRecvMsgSize(maxRecvSize)}
internal.AddExtraServerOptions.(func(opt ...ServerOption))(opts...)
internal.AddGlobalServerOptions.(func(opt ...ServerOption))(opts...)
for i, opt := range opts {
if extraServerOptions[i] != opt {
t.Fatalf("Unexpected extra server option at index %d: %v != %v", i, extraServerOptions[i], opt)
Expand All @@ -75,7 +75,7 @@ func (s) TestAddExtraServerOptions(t *testing.T) {
t.Fatalf("Unexpected s.opts.maxReceiveMessageSize: %d != %d", s.opts.maxReceiveMessageSize, maxRecvSize)
}

internal.ClearExtraServerOptions()
internal.ClearGlobalServerOptions()
if len(extraServerOptions) != 0 {
t.Fatalf("Unexpected len of extraServerOptions: %d != 0", len(extraServerOptions))
}
Expand Down
15 changes: 13 additions & 2 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
)

func init() {
internal.AddExtraDialOptions = func(opt ...DialOption) {
internal.AddGlobalDialOptions = func(opt ...DialOption) {
extraDialOptions = append(extraDialOptions, opt...)
}
internal.ClearExtraDialOptions = func() {
internal.ClearGlobalDialOptions = func() {
extraDialOptions = nil
}
internal.WithBinaryLogger = withBinaryLogger
}

// dialOptions configure a Dial call. dialOptions are set by the DialOption
Expand All @@ -61,6 +63,7 @@ type dialOptions struct {
timeout time.Duration
scChan <-chan ServiceConfig
authority string
binaryLogger binarylog.Logger
copts transport.ConnectOptions
callOptions []CallOption
channelzParentID *channelz.Identifier
Expand Down Expand Up @@ -401,6 +404,14 @@ func WithStatsHandler(h stats.Handler) DialOption {
})
}

// withBinaryLogger returns a DialOption that specifies the binary logger for
// this ClientConn.
func withBinaryLogger(bl binarylog.Logger) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.binaryLogger = bl
})
}

// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
// non-temporary dial errors. If f is true, and dialer returns a non-temporary
// error, gRPC will fail the connection to the network address and won't try to
Expand Down
246 changes: 163 additions & 83 deletions gcp/observability/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,86 +21,23 @@ package observability
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"regexp"

gcplogging "cloud.google.com/go/logging"
"golang.org/x/oauth2/google"
"google.golang.org/grpc/internal/envconfig"
)

const (
envObservabilityConfig = "GRPC_CONFIG_OBSERVABILITY"
envObservabilityConfigJSON = "GRPC_CONFIG_OBSERVABILITY_JSON"
envProjectID = "GOOGLE_CLOUD_PROJECT"
logFilterPatternRegexpStr = `^([\w./]+)/((?:\w+)|[*])$`
envProjectID = "GOOGLE_CLOUD_PROJECT"
methodStringRegexpStr = `^([\w./]+)/((?:\w+)|[*])$`
)

var logFilterPatternRegexp = regexp.MustCompile(logFilterPatternRegexpStr)

// logFilter represents a method logging configuration.
type logFilter struct {
// Pattern is a string which can select a group of method names. By
// default, the Pattern is an empty string, matching no methods.
//
// Only "*" Wildcard is accepted for Pattern. A Pattern is in the form
// of <service>/<method> or just a character "*" .
//
// If the Pattern is "*", it specifies the defaults for all the
// services; If the Pattern is <service>/*, it specifies the defaults
// for all methods in the specified service <service>; If the Pattern is
// */<method>, this is not supported.
//
// Examples:
// - "Foo/Bar" selects only the method "Bar" from service "Foo"
// - "Foo/*" selects all methods from service "Foo"
// - "*" selects all methods from all services.
Pattern string `json:"pattern,omitempty"`
// HeaderBytes is the number of bytes of each header to log. If the size of
// the header is greater than the defined limit, content past the limit will
// be truncated. The default value is 0.
HeaderBytes int32 `json:"header_bytes,omitempty"`
// MessageBytes is the number of bytes of each message to log. If the size
// of the message is greater than the defined limit, content pass the limit
// will be truncated. The default value is 0.
MessageBytes int32 `json:"message_bytes,omitempty"`
}

// config is configuration for observability behaviors. By default, no
// configuration is required for tracing/metrics/logging to function. This
// config captures the most common knobs for gRPC users. It's always possible to
// override with explicit config in code.
type config struct {
// EnableCloudTrace represents whether the tracing data upload to
// CloudTrace should be enabled or not.
EnableCloudTrace bool `json:"enable_cloud_trace,omitempty"`
// EnableCloudMonitoring represents whether the metrics data upload to
// CloudMonitoring should be enabled or not.
EnableCloudMonitoring bool `json:"enable_cloud_monitoring,omitempty"`
// EnableCloudLogging represents Whether the logging data upload to
// CloudLogging should be enabled or not.
EnableCloudLogging bool `json:"enable_cloud_logging,omitempty"`
// DestinationProjectID is the destination GCP project identifier for the
// uploading log entries. If empty, the gRPC Observability plugin will
// attempt to fetch the project_id from the GCP environment variables, or
// from the default credentials.
DestinationProjectID string `json:"destination_project_id,omitempty"`
// LogFilters is a list of method config. The order matters here - the first
// Pattern which matches the current method will apply the associated config
// options in the logFilter. Any other logFilter that also matches that
// comes later will be ignored. So a logFilter of "*/*" should appear last
// in this list.
LogFilters []logFilter `json:"log_filters,omitempty"`
// GlobalTraceSamplingRate is the global setting that controls the
// probability of a RPC being traced. For example, 0.05 means there is a 5%
// chance for a RPC to be traced, 1.0 means trace every call, 0 means don’t
// start new traces.
GlobalTraceSamplingRate float64 `json:"global_trace_sampling_rate,omitempty"`
// CustomTags a list of custom tags that will be attached to every log
// entry.
CustomTags map[string]string `json:"custom_tags,omitempty"`
}
var methodStringRegexp = regexp.MustCompile(methodStringRegexpStr)

// fetchDefaultProjectID fetches the default GCP project id from environment.
func fetchDefaultProjectID(ctx context.Context) string {
Expand All @@ -123,14 +60,34 @@ func fetchDefaultProjectID(ctx context.Context) string {
return credentials.ProjectID
}

func validateFilters(config *config) error {
for _, filter := range config.LogFilters {
if filter.Pattern == "*" {
func validateLogEventMethod(methods []string, exclude bool) error {
for _, method := range methods {
if method == "*" {
if exclude {
return errors.New("cannot have exclude and a '*' wildcard")
}
continue
}
match := logFilterPatternRegexp.FindStringSubmatch(filter.Pattern)
match := methodStringRegexp.FindStringSubmatch(method)
if match == nil {
return fmt.Errorf("invalid log filter Pattern: %v", filter.Pattern)
return fmt.Errorf("invalid method string: %v", method)
}
}
return nil
}

func validateLoggingEvents(config *config) error {
if config.CloudLogging == nil {
return nil
}
for _, clientRPCEvent := range config.CloudLogging.ClientRPCEvents {
if err := validateLogEventMethod(clientRPCEvent.Methods, clientRPCEvent.Exclude); err != nil {
return fmt.Errorf("error in clientRPCEvent method: %v", err)
}
}
for _, serverRPCEvent := range config.CloudLogging.ServerRPCEvents {
if err := validateLogEventMethod(serverRPCEvent.Methods, serverRPCEvent.Exclude); err != nil {
return fmt.Errorf("error in serverRPCEvent method: %v", err)
}
}
return nil
Expand All @@ -144,38 +101,161 @@ func unmarshalAndVerifyConfig(rawJSON json.RawMessage) (*config, error) {
if err := json.Unmarshal(rawJSON, &config); err != nil {
return nil, fmt.Errorf("error parsing observability config: %v", err)
}
if err := validateFilters(&config); err != nil {
if err := validateLoggingEvents(&config); err != nil {
return nil, fmt.Errorf("error parsing observability config: %v", err)
}
if config.GlobalTraceSamplingRate > 1 || config.GlobalTraceSamplingRate < 0 {
return nil, fmt.Errorf("error parsing observability config: invalid global trace sampling rate %v", config.GlobalTraceSamplingRate)
if config.CloudTrace != nil && (config.CloudTrace.SamplingRate > 1 || config.CloudTrace.SamplingRate < 0) {
return nil, fmt.Errorf("error parsing observability config: invalid cloud trace sampling rate %v", config.CloudTrace.SamplingRate)
}
logger.Infof("Parsed ObservabilityConfig: %+v", &config)
return &config, nil
}

func parseObservabilityConfig() (*config, error) {
if fileSystemPath := os.Getenv(envObservabilityConfigJSON); fileSystemPath != "" {
content, err := ioutil.ReadFile(fileSystemPath) // TODO: Switch to os.ReadFile once dropped support for go 1.15
if f := envconfig.ObservabilityConfigFile; f != "" {
if envconfig.ObservabilityConfig != "" {
logger.Warning("Ignoring GRPC_GCP_OBSERVABILITY_CONFIG and using GRPC_GCP_OBSERVABILITY_CONFIG_FILE contents.")
}
content, err := ioutil.ReadFile(f) // TODO: Switch to os.ReadFile once dropped support for go 1.15
if err != nil {
return nil, fmt.Errorf("error reading observability configuration file %q: %v", fileSystemPath, err)
return nil, fmt.Errorf("error reading observability configuration file %q: %v", f, err)
}
return unmarshalAndVerifyConfig(content)
} else if content := os.Getenv(envObservabilityConfig); content != "" {
return unmarshalAndVerifyConfig([]byte(content))
} else if envconfig.ObservabilityConfig != "" {
return unmarshalAndVerifyConfig([]byte(envconfig.ObservabilityConfig))
}
// If the ENV var doesn't exist, do nothing
return nil, nil
}

func ensureProjectIDInObservabilityConfig(ctx context.Context, config *config) error {
if config.DestinationProjectID == "" {
if config.ProjectID == "" {
// Try to fetch the GCP project id
projectID := fetchDefaultProjectID(ctx)
if projectID == "" {
return fmt.Errorf("empty destination project ID")
}
config.DestinationProjectID = projectID
config.ProjectID = projectID
}
return nil
}

type clientRPCEvents struct {
// Methods is a list of strings which can select a group of methods. By
// default, the list is empty, matching no methods.
//
// The value of the method is in the form of <service>/<method>.
//
// "*" is accepted as a wildcard for:
// 1. The method name. If the value is <service>/*, it matches all
// methods in the specified service.
// 2. The whole value of the field which matches any <service>/<method>.
// It’s not supported when Exclude is true.
// 3. The * wildcard cannot be used on the service name independently,
// */<method> is not supported.
//
// The service name, when specified, must be the fully qualified service
// name, including the package name.
//
// Examples:
// 1."goo.Foo/Bar" selects only the method "Bar" from service "goo.Foo",
// here “goo” is the package name.
// 2."goo.Foo/*" selects all methods from service "goo.Foo"
// 3. "*" selects all methods from all services.
Methods []string `json:"methods,omitempty"`
// Exclude represents whether the methods denoted by Methods should be
// excluded from logging. The default value is false, meaning the methods
// denoted by Methods are included in the logging. If Exclude is true, the
// wildcard `*` cannot be used as value of an entry in Methods.
Exclude bool `json:"exclude,omitempty"`
// MaxMetadataBytes is the maximum number of bytes of each header to log. If
// the size of the metadata is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMetadataBytes int `json:"max_metadata_bytes"`
// MaxMessageBytes is the maximum number of bytes of each message to log. If
// the size of the message is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMessageBytes int `json:"max_message_bytes"`
}

type serverRPCEvents struct {
// Methods is a list of strings which can select a group of methods. By
// default, the list is empty, matching no methods.
//
// The value of the method is in the form of <service>/<method>.
//
// "*" is accepted as a wildcard for:
// 1. The method name. If the value is <service>/*, it matches all
// methods in the specified service.
// 2. The whole value of the field which matches any <service>/<method>.
// It’s not supported when Exclude is true.
// 3. The * wildcard cannot be used on the service name independently,
// */<method> is not supported.
//
// The service name, when specified, must be the fully qualified service
// name, including the package name.
//
// Examples:
// 1."goo.Foo/Bar" selects only the method "Bar" from service "goo.Foo",
// here “goo” is the package name.
// 2."goo.Foo/*" selects all methods from service "goo.Foo"
// 3. "*" selects all methods from all services.
Methods []string `json:"methods,omitempty"`
// Exclude represents whether the methods denoted by Methods should be
// excluded from logging. The default value is false, meaning the methods
// denoted by Methods are included in the logging. If Exclude is true, the
// wildcard `*` cannot be used as value of an entry in Methods.
Exclude bool `json:"exclude,omitempty"`
// MaxMetadataBytes is the maximum number of bytes of each header to log. If
// the size of the metadata is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMetadataBytes int `json:"max_metadata_bytes"`
// MaxMessageBytes is the maximum number of bytes of each message to log. If
// the size of the message is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMessageBytes int `json:"max_message_bytes"`
}

type cloudLogging struct {
// ClientRPCEvents represents the configuration for outgoing RPC's from the
// binary. The client_rpc_events configs are evaluated in text order, the
// first one matched is used. If an RPC doesn't match an entry, it will
// continue on to the next entry in the list.
ClientRPCEvents []clientRPCEvents `json:"client_rpc_events,omitempty"`

// ServerRPCEvents represents the configuration for incoming RPC's to the
// binary. The server_rpc_events configs are evaluated in text order, the
// first one matched is used. If an RPC doesn't match an entry, it will
// continue on to the next entry in the list.
ServerRPCEvents []serverRPCEvents `json:"server_rpc_events,omitempty"`
}

type cloudMonitoring struct{}

type cloudTrace struct {
// SamplingRate is the global setting that controls the probability of a RPC
// being traced. For example, 0.05 means there is a 5% chance for a RPC to
// be traced, 1.0 means trace every call, 0 means don’t start new traces. By
// default, the sampling_rate is 0.
SamplingRate float64 `json:"sampling_rate,omitempty"`
}

type config struct {
// ProjectID is the destination GCP project identifier for uploading log
// entries. If empty, the gRPC Observability plugin will attempt to fetch
// the project_id from the GCP environment variables, or from the default
// credentials. If not found, the observability init functions will return
// an error.
ProjectID string `json:"project_id,omitempty"`
// CloudLogging defines the logging options. If not present, logging is disabled.
CloudLogging *cloudLogging `json:"cloud_logging,omitempty"`
// CloudMonitoring determines whether or not metrics are enabled based on
// whether it is present or not. If present, monitoring will be enabled, if
// not present, monitoring is disabled.
CloudMonitoring *cloudMonitoring `json:"cloud_monitoring,omitempty"`
// CloudTrace defines the tracing options. When present, tracing is enabled
// with default configurations. When absent, the tracing is disabled.
CloudTrace *cloudTrace `json:"cloud_trace,omitempty"`
// Labels are applied to cloud logging, monitoring, and trace.
Labels map[string]string `json:"labels,omitempty"`
}
Loading