From 5a4672d785042a1ec951275af9a3ad81011b720f Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 4 Aug 2023 10:29:50 -0700 Subject: [PATCH] orca: update example and interop to use StateListener --- examples/features/orca/client/main.go | 16 +++++++++------ interop/orcalb.go | 29 +++++++++++++-------------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/examples/features/orca/client/main.go b/examples/features/orca/client/main.go index f295cfd3866a..40ab5a33ed42 100644 --- a/examples/features/orca/client/main.go +++ b/examples/features/orca/client/main.go @@ -106,7 +106,14 @@ func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error { } // Create one SubConn for the address and connect it. - sc, err := o.cc.NewSubConn(addrs, balancer.NewSubConnOptions{}) + var sc balancer.SubConn + sc, err := o.cc.NewSubConn(addrs, balancer.NewSubConnOptions{ + StateListener: func(scs balancer.SubConnState) { + if scs.ConnectivityState == connectivity.Ready { + o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &picker{sc}}) + } + }, + }) if err != nil { return fmt.Errorf("orcaLB: error creating SubConn: %v", err) } @@ -123,11 +130,8 @@ func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error { func (o *orcaLB) ResolverError(error) {} -func (o *orcaLB) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { - if scs.ConnectivityState == connectivity.Ready { - o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &picker{sc}}) - } -} +// TODO: unused; remove when no longer required. +func (o *orcaLB) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {} func (o *orcaLB) Close() {} diff --git a/interop/orcalb.go b/interop/orcalb.go index 28ea7524d7b7..de87c8828815 100644 --- a/interop/orcalb.go +++ b/interop/orcalb.go @@ -65,7 +65,7 @@ func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error { return fmt.Errorf("resolver produced no addresses") } var err error - o.sc, err = o.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{}) + o.sc, err = o.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{StateListener: o.updateSubConnState}) if err != nil { o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("error creating subconn: %v", err))}) return nil @@ -82,20 +82,20 @@ func (o *orcab) ResolverError(err error) { } } -func (o *orcab) UpdateSubConnState(sc balancer.SubConn, scState balancer.SubConnState) { - if o.sc != sc { - logger.Errorf("received subconn update for unknown subconn: %v vs %v", o.sc, sc) - return - } - switch scState.ConnectivityState { +func (o *orcab) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state) +} + +func (o *orcab) updateSubConnState(state balancer.SubConnState) { + switch state.ConnectivityState { case connectivity.Ready: - o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &scPicker{sc: sc, o: o}}) + o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &orcaPicker{o: o}}) case connectivity.TransientFailure: - o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", scState.ConnectionError))}) + o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", state.ConnectionError))}) case connectivity.Connecting: // Ignore; picker already set to "connecting". case connectivity.Idle: - sc.Connect() + o.sc.Connect() o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)}) case connectivity.Shutdown: // Ignore; we are closing but handle that in Close instead. @@ -113,12 +113,11 @@ func (o *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) { o.report = r } -type scPicker struct { - sc balancer.SubConn - o *orcab +type orcaPicker struct { + o *orcab } -func (p *scPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { +func (p *orcaPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { doneCB := func(di balancer.DoneInfo) { if lr, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport); lr != nil && (lr.CpuUtilization != 0 || lr.MemUtilization != 0 || len(lr.Utilization) > 0 || len(lr.RequestCost) > 0) { @@ -134,7 +133,7 @@ func (p *scPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { } } } - return balancer.PickResult{SubConn: p.sc, Done: doneCB}, nil + return balancer.PickResult{SubConn: p.o.sc, Done: doneCB}, nil } func setContextCMR(ctx context.Context, lr *v3orcapb.OrcaLoadReport) {