Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: add example for ORCA load reporting #6114

Merged
merged 3 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/examples_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ EXAMPLES=(
"features/metadata_interceptor"
"features/multiplex"
"features/name_resolving"
"features/orca"
"features/retry"
"features/unix_abstract"
)
Expand All @@ -75,6 +76,7 @@ declare -A SERVER_ARGS=(

declare -A CLIENT_ARGS=(
["features/unix_abstract"]="-addr $UNIX_ADDR"
["features/orca"]="-test=true"
["default"]="-addr localhost:$SERVER_PORT"
)

Expand Down Expand Up @@ -114,6 +116,7 @@ declare -A EXPECTED_SERVER_OUTPUT=(
["features/metadata_interceptor"]="key1 from metadata: "
["features/multiplex"]=":50051"
["features/name_resolving"]="serving on localhost:50051"
["features/orca"]="Server listening"
["features/retry"]="request succeeded count: 4"
["features/unix_abstract"]="serving on @abstract-unix-socket"
)
Expand All @@ -134,6 +137,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=(
["features/metadata_interceptor"]="BidiStreaming Echo: hello world"
["features/multiplex"]="Greeting: Hello multiplex"
["features/name_resolving"]="calling helloworld.Greeter/SayHello to \"example:///resolver.example.grpc.io\""
["features/orca"]="Per-call load report received: map\[db_queries:10\]"
["features/retry"]="UnaryEcho reply: message:\"Try and Success\""
["features/unix_abstract"]="calling echo.Echo/UnaryEcho to unix-abstract:abstract-unix-socket"
)
Expand Down
48 changes: 48 additions & 0 deletions examples/features/orca/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# ORCA Load Reporting

ORCA is a protocol for reporting load between servers and clients. This
example shows how to implement this from both the client and server side. For
more details, please see [gRFC
A51](https://github.com/grpc/proposal/blob/master/A51-custom-backend-metrics.md)

## Try it

```
go run server/main.go
```

```
go run client/main.go
```

## Explanation

gRPC ORCA support provides two different ways to report load data to clients
from servers: out-of-band and per-RPC. Out-of-band metrics are reported
regularly at some interval on a stream, while per-RPC metrics are reported
along with the trailers at the end of a call. Both of these mechanisms are
optional and work independently.

The full ORCA API documentation is available here:
https://pkg.go.dev/google.golang.org/grpc/orca

### Out-of-band Metrics

The server registers an ORCA service that is used for out-of-band metrics. It
does this by using `orca.Register()` and then setting metrics on the returned
`orca.Service` using its methods.

The client receives out-of-band metrics via the LB policy. It receives
callbacks to a listener by registering the listener on a `SubConn` via
`orca.RegisterOOBListener`.

### Per-RPC Metrics

The server is set up to report query cost metrics in its RPC handler. For
per-RPC metrics to be reported, the gRPC server must be created with the
`orca.CallMetricsServerOption()` option, and metrics are set by calling methods
on the returned `orca.CallMetricRecorder` from
`orca.CallMetricRecorderFromContext()`.

The client performs one RPC per second. Per-RPC metrics are available for each
call via the `Done()` callback returned from the LB policy's picker.
Comment on lines +47 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the other discussion here is about ORCA in general, the two types of reporting metrics, and the API in general. This line The client performs one RPC per second seems to be out of place. It seems to suddenly talk about the example.

Copy link
Member Author

@dfawley dfawley Mar 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand. The whole section is about the example.

"The server is set up to report query cost metrics in its RPC handler. <explanation of how>"
"The client performs one RPC per second. <explanation of how it receives metrics related to those RPCs>"

Can you be more specific about how you would reword this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually looks good. Not sure what I was confused about.

153 changes: 153 additions & 0 deletions examples/features/orca/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary client is an example client.
package main

import (
"context"
"flag"
"fmt"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/orca"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
pb "google.golang.org/grpc/examples/features/proto/echo"
)

var addr = flag.String("addr", "localhost:50051", "the address to connect to")
var test = flag.Bool("test", false, "if set, only 1 RPC is performed before exiting")
easwars marked this conversation as resolved.
Show resolved Hide resolved

func main() {
flag.Parse()

// Set up a connection to the server. Configure to use our custom LB
// policy which will receive all the ORCA load reports.
conn, err := grpc.Dial(*addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"orca_example":{}}]}`),
)
Comment on lines +47 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Even though this is just an example, it would be nice if we don't split up lines this way. We should either have everything on a single line, or define a dial options slice to initialize the dial options.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? This seems like purely personal preference. We do this in other places in our examples, too, like:

roundrobinConn, err := grpc.Dial(
fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), // This sets the initial balancing policy.
grpc.WithTransportCredentials(insecure.NewCredentials()),
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline & agreed to leave this.

if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

c := pb.NewEchoClient(conn)

// Perform RPCs once per second.
ticker := time.NewTicker(time.Second)
for range ticker.C {
func() {
// Use an anonymous function to ensure context cancelation via defer.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if _, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "test echo message"}); err != nil {
log.Fatalf("Error from UnaryEcho call: %v", err)
}
}()
if *test {
return
}
}

}

// Register an ORCA load balancing policy to receive per-call metrics and
// out-of-band metrics.
func init() {
balancer.Register(orcaLBBuilder{})
}

type orcaLBBuilder struct{}

func (orcaLBBuilder) Name() string { return "orca_example" }
func (orcaLBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &orcaLB{cc: cc}
}

// orcaLB is an incomplete LB policy designed to show basic ORCA load reporting
// functionality. It collects per-call metrics in the `Done` callback returned
// by its picker, and it collects out-of-band metrics by registering a listener
// when its SubConn is created. It does not follow general LB policy best
// practices and makes assumptions about the simple test environment it is
// designed to run within.
type orcaLB struct {
cc balancer.ClientConn
}

func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error {
// We assume only one update, ever, containing exactly one address, given
// the use of the "passthrough" (default) name resolver.

addrs := ccs.ResolverState.Addresses
if len(addrs) != 1 {
return fmt.Errorf("orcaLB: expected 1 address; received: %v", addrs)
}

// Create one SubConn for the address and connect it.
sc, err := o.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
if err != nil {
return fmt.Errorf("orcaLB: error creating SubConn: %v", err)
}
sc.Connect()

// Register a simple ORCA OOB listener on the SubConn. We request a 1
// second report interval, but in this example the server indicated the
// minimum interval it will allow is 3 seconds, so reports will only be
// sent that often.
orca.RegisterOOBListener(sc, orcaLis{}, orca.OOBListenerOptions{ReportInterval: time.Second})

return nil
}

func (o *orcaLB) ResolverError(error) {}

func (o *orcaLB) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
if scs.ConnectivityState == connectivity.Ready {
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &picker{sc}})
}
}

func (o *orcaLB) Close() {}

type picker struct {
sc balancer.SubConn
}

func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{
SubConn: p.sc,
Done: func(di balancer.DoneInfo) {
fmt.Println("Per-call load report received:", di.ServerLoad.(*v3orcapb.OrcaLoadReport).GetRequestCost())
},
}, nil
}

// orcaLis is the out-of-band load report listener that we pass to
// orca.RegisterOOBListener to receive periodic load report information.
type orcaLis struct{}
easwars marked this conversation as resolved.
Show resolved Hide resolved

func (orcaLis) OnLoadReport(lr *v3orcapb.OrcaLoadReport) {
fmt.Println("Out-of-band load report received:", lr)
}
92 changes: 92 additions & 0 deletions examples/features/orca/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary server is an example server.
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/status"

pb "google.golang.org/grpc/examples/features/proto/echo"
)

var port = flag.Int("port", 50051, "the port to serve on")

type server struct {
pb.UnimplementedEchoServer
}

func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
// Report a sample cost for this query.
cmr := orca.CallMetricRecorderFromContext(ctx)
if cmr == nil {
return nil, status.Errorf(codes.Internal, "unable to retrieve call metric recorder (missing ORCA ServerOption?)")
}
cmr.SetRequestCost("db_queries", 10)

return &pb.EchoResponse{Message: in.Message}, nil
}

func main() {
flag.Parse()

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
fmt.Printf("Server listening at %v\n", lis.Addr())

// Create the gRPC server with the orca.CallMetricsServerOption() option,
// which will enable per-call metric recording.
s := grpc.NewServer(orca.CallMetricsServerOption())
pb.RegisterEchoServer(s, &server{})

// Register the orca service for out-of-band metric reporting, and set the
// minimum reporting interval to 3 seconds. Note that, by default, the
// minimum interval must be at least 30 seconds, but 3 seconds is set via
// an internal-only option for illustration purposes only.
opts := orca.ServiceOptions{MinReportingInterval: 3 * time.Second}
internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&opts)
orcaSvc, err := orca.Register(s, opts)
if err != nil {
log.Fatalf("Failed to register ORCA service: %v", err)
}

// Simulate CPU utilization reporting.
go func() {
for {
orcaSvc.SetCPUUtilization(.5)
time.Sleep(2 * time.Second)
orcaSvc.SetCPUUtilization(.9)
time.Sleep(2 * time.Second)
}
}()

s.Serve(lis)
}
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module google.golang.org/grpc/examples
go 1.17

require (
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b
github.com/golang/protobuf v1.5.2
golang.org/x/oauth2 v0.4.0
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f
Expand All @@ -16,7 +17,6 @@ require (
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b // indirect
github.com/envoyproxy/go-control-plane v0.10.3 // indirect
github.com/envoyproxy/protoc-gen-validate v0.9.1 // indirect
golang.org/x/net v0.8.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ var (
//
// TODO: Remove this function once the RBAC env var is removed.
UnregisterRBACHTTPFilterForTesting func()

// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions)
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
6 changes: 4 additions & 2 deletions orca/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/internal"
ointernal "google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/status"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
Expand All @@ -33,9 +34,10 @@ import (
)

func init() {
internal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
ointernal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
so.allowAnyMinReportingInterval = true
}
internal.ORCAAllowAnyMinReportingInterval = ointernal.AllowAnyMinReportingInterval
}

// minReportingInterval is the absolute minimum value supported for
Expand Down