Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: add example for ORCA load reporting #6114

Merged
merged 3 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/examples_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ EXAMPLES=(
"features/metadata_interceptor"
"features/multiplex"
"features/name_resolving"
"features/orca"
"features/retry"
"features/unix_abstract"
)
Expand All @@ -75,6 +76,7 @@ declare -A SERVER_ARGS=(

declare -A CLIENT_ARGS=(
["features/unix_abstract"]="-addr $UNIX_ADDR"
["features/orca"]="-test=true"
["default"]="-addr localhost:$SERVER_PORT"
)

Expand Down Expand Up @@ -114,6 +116,7 @@ declare -A EXPECTED_SERVER_OUTPUT=(
["features/metadata_interceptor"]="key1 from metadata: "
["features/multiplex"]=":50051"
["features/name_resolving"]="serving on localhost:50051"
["features/orca"]="Server listening"
["features/retry"]="request succeeded count: 4"
["features/unix_abstract"]="serving on @abstract-unix-socket"
)
Expand All @@ -134,6 +137,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=(
["features/metadata_interceptor"]="BidiStreaming Echo: hello world"
["features/multiplex"]="Greeting: Hello multiplex"
["features/name_resolving"]="calling helloworld.Greeter/SayHello to \"example:///resolver.example.grpc.io\""
["features/orca"]="Per-call load report received: map\[db_queries:10\]"
["features/retry"]="UnaryEcho reply: message:\"Try and Success\""
["features/unix_abstract"]="calling echo.Echo/UnaryEcho to unix-abstract:abstract-unix-socket"
)
Expand Down
35 changes: 35 additions & 0 deletions examples/features/orca/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# ORCA Load Reporting

ORCA is a protocol for reporting load between servers and clients. This
example shows how to implement this from both the client and server side. For
more details, please see [gRFC
A51](https://github.com/grpc/proposal/blob/master/A51-custom-backend-metrics.md)

## Try it

```
go run server/main.go
```

```
go run client/main.go
```

## Explanation

The server is set up to report query cost metrics in its RPC handler. It also
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that it might be easier to read if you split this section into per-rpc metrics and oob metrics. And then talk about the APIs, what the server does and what the client does per section.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reworked a bit.

registers an ORCA service that is used for out-of-band metrics. Both of these
pieces are optional and work independently. For per-RPC metrics to be
reported, two things are important: 1. using `orca.CallMetricsServerOption()`
when creating the server and 2. setting metrics in the method handlers by using
`orca.CallMetricRecorderFromContext()`. For out-of-band metrics, one simply
needs to create and register the reporter by using `orca.Register()` and set
metrics on the returned `orca.Service` using its methods.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you do bulleted lists for this to make it easier to read.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rewrote as part of above to not have #s anymore.


The client performs one RPC per second. All metrics are received via the LB
policy. Per-RPC metrics are available via the `Done()` callback returned from
the LB policy's picker. Out-of-band metrics are available via a listener
callback that is registered on `SubConn`s using `orca.RegisterOOBListener`.

The full ORCA API documentation is available here:
https://pkg.go.dev/google.golang.org/grpc/orca
150 changes: 150 additions & 0 deletions examples/features/orca/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary client is an example client.
package main

import (
"context"
"flag"
"fmt"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/orca"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
pb "google.golang.org/grpc/examples/features/proto/echo"
)

var addr = flag.String("addr", "localhost:50051", "the address to connect to")
var test = flag.Bool("test", false, "if set, only 1 RPC is performed before exiting")
easwars marked this conversation as resolved.
Show resolved Hide resolved

func main() {
flag.Parse()

// Set up a connection to the server. Configure to use our custom LB
// policy which will receive all the ORCA load reports.
conn, err := grpc.Dial(*addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"orca_example":{}}]}`),
)
Comment on lines +47 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Even though this is just an example, it would be nice if we don't split up lines this way. We should either have everything on a single line, or define a dial options slice to initialize the dial options.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? This seems like purely personal preference. We do this in other places in our examples, too, like:

roundrobinConn, err := grpc.Dial(
fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), // This sets the initial balancing policy.
grpc.WithTransportCredentials(insecure.NewCredentials()),
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline & agreed to leave this.

if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

c := pb.NewEchoClient(conn)

// Perform RPCs once per second.
ticker := time.NewTicker(time.Second)
for range ticker.C {
func() {
// Use an anonymous function to ensure context cancelation via defer.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if _, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "test echo message"}); err != nil {
log.Fatalf("Error from UnaryEcho call: %v", err)
}
}()
if *test {
return
}
}

}

// Register an ORCA load balancing policy to receive per-call metrics and
// out-of-band metrics.
func init() {
balancer.Register(orcaLBBuilder{})
}

type orcaLBBuilder struct{}

func (orcaLBBuilder) Name() string { return "orca_example" }
func (orcaLBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &orcaLB{cc: cc}
}

// orcaLB is an incomplete LB policy designed to show basic ORCA load reporting
// functionality. It collects per-call metrics in the `Done` callback returned
// by its picker, and it collects out-of-band metrics by registering a listener
// when its SubConn is created. It does not follow general LB policy best
// practices and makes assumptions about the simple test environment it is
// designed to run within.
type orcaLB struct {
cc balancer.ClientConn
}

func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error {
// We assume only one update, ever, containing exactly one address, given
// the use of the "passthrough" (default) name resolver.

addrs := ccs.ResolverState.Addresses
if len(addrs) != 1 {
return fmt.Errorf("orcaLB: expected 1 address; received: %v", addrs)
}

// Create one SubConn for the address and connect it.
sc, err := o.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
if err != nil {
return fmt.Errorf("orcaLB: error creating SubConn: %v", err)
}
sc.Connect()

// Register a simple ORCA OOB listener on the SubConn. We request a 1
// second report interval, but in this example the minimum interval is 3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small clarification to say that the 3s interval is set by the server would be useful I feel.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// seconds so reports will be sent that often.
orca.RegisterOOBListener(sc, orcaLis{}, orca.OOBListenerOptions{ReportInterval: time.Second})

return nil
}

func (o *orcaLB) ResolverError(error) {}

func (o *orcaLB) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
if scs.ConnectivityState == connectivity.Ready {
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &picker{sc}})
}
}

func (o *orcaLB) Close() {}

type picker struct {
sc balancer.SubConn
}

func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{
SubConn: p.sc,
Done: func(di balancer.DoneInfo) {
fmt.Println("Per-call load report received:", di.ServerLoad.(*v3orcapb.OrcaLoadReport).GetRequestCost())
},
}, nil
}

type orcaLis struct{}
easwars marked this conversation as resolved.
Show resolved Hide resolved

func (orcaLis) OnLoadReport(lr *v3orcapb.OrcaLoadReport) {
fmt.Println("Out-of-band load report received:", lr)
}
92 changes: 92 additions & 0 deletions examples/features/orca/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary server is an example server.
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/status"

pb "google.golang.org/grpc/examples/features/proto/echo"
)

var port = flag.Int("port", 50051, "the port to serve on")

type server struct {
pb.UnimplementedEchoServer
}

func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
// Report a sample cost for this query.
cmr := orca.CallMetricRecorderFromContext(ctx)
if cmr == nil {
return nil, status.Errorf(codes.Internal, "unable to retrieve call metric recorder (missing ORCA ServerOption?)")
}
cmr.SetRequestCost("db_queries", 10)

return &pb.EchoResponse{Message: in.Message}, nil
}

func main() {
flag.Parse()

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
fmt.Printf("Server listening at %v\n", lis.Addr())

// Create the gRPC server with the orca.CallMetricsServerOption() option,
// which will enable per-call metric recording.
s := grpc.NewServer(orca.CallMetricsServerOption())
pb.RegisterEchoServer(s, &server{})

// Register the orca service for out-of-band metric reporting, and set the
// maximum reporting frequency to once every 3 seconds. Note that, by
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only place where maximum is used. Should this also be minimum?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Higher frequency = faster = shorter interval. Confusing. I'll just remove any mention of "frequency".

// default, the shortest interval is 30 seconds, but 3 seconds is set via
// an internal-only option for illustration purposes only.
opts := orca.ServiceOptions{MinReportingInterval: 3 * time.Second}
internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&opts)
orcaSvc, err := orca.Register(s, opts)
if err != nil {
log.Fatalf("Failed to register ORCA service: %v", err)
}

// Simulate CPU utilization reporting.
go func() {
for {
orcaSvc.SetCPUUtilization(.5)
time.Sleep(2 * time.Second)
orcaSvc.SetCPUUtilization(.9)
time.Sleep(2 * time.Second)
}
}()

s.Serve(lis)
}
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module google.golang.org/grpc/examples
go 1.17

require (
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b
github.com/golang/protobuf v1.5.2
golang.org/x/oauth2 v0.4.0
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f
Expand All @@ -16,7 +17,6 @@ require (
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b // indirect
github.com/envoyproxy/go-control-plane v0.10.3 // indirect
github.com/envoyproxy/protoc-gen-validate v0.9.1 // indirect
golang.org/x/net v0.8.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ var (
//
// TODO: Remove this function once the RBAC env var is removed.
UnregisterRBACHTTPFilterForTesting func()

// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions)
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
6 changes: 4 additions & 2 deletions orca/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/internal"
ointernal "google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/status"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
Expand All @@ -33,9 +34,10 @@ import (
)

func init() {
internal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
ointernal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
so.allowAnyMinReportingInterval = true
}
internal.ORCAAllowAnyMinReportingInterval = ointernal.AllowAnyMinReportingInterval
}

// minReportingInterval is the absolute minimum value supported for
Expand Down