diff --git a/stats/opencensus/e2e_test.go b/stats/opencensus/e2e_test.go index 1f68675f0c1d..3b3c2bbbd908 100644 --- a/stats/opencensus/e2e_test.go +++ b/stats/opencensus/e2e_test.go @@ -237,6 +237,36 @@ func distributionDataLatencyCount(vi *viewInformation, countWant int64, wantTags return nil } +// waitForServerCompletedRPCs waits until both Unary and Streaming metric rows +// appear, in two separate rows, for server completed RPC's view. Returns an +// error if the Unary and Streaming metric are not found within the passed +// context's timeout. +func waitForServerCompletedRPCs(ctx context.Context) error { + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + rows, err := view.RetrieveData("grpc.io/server/completed_rpcs") + if err != nil { + continue + } + unaryFound := false + streamingFound := false + for _, row := range rows { + for _, tag := range row.Tags { + if tag.Value == "grpc.testing.TestService/UnaryCall" { + unaryFound = true + break + } else if tag.Value == "grpc.testing.TestService/FullDuplexCall" { + streamingFound = true + break + } + } + if unaryFound && streamingFound { + return nil + } + } + } + return fmt.Errorf("timeout when waiting for Unary and Streaming rows to be present for \"grpc.io/server/completed_rpcs\"") +} + // TestAllMetricsOneFunction tests emitted metrics from gRPC. It registers all // the metrics provided by this package. It then configures a system with a gRPC // Client and gRPC server with the OpenCensus Dial and Server Option configured, @@ -987,10 +1017,13 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { }, }, } - // Unregister all the views. Unregistering a view causes a synchronous - // upload of any collected data for the view to any registered exporters. - // Thus, after this unregister call, the exporter has the data to make - // assertions on immediately. + // Server Side stats.End call happens asynchronously for both Unary and + // Streaming calls with respect to the RPC returning client side. Thus, add + // a sync point at the global view package level for these two rows to be + // recorded, which will be synchronously uploaded to exporters right after. + if err := waitForServerCompletedRPCs(ctx); err != nil { + t.Fatal(err) + } view.Unregister(allViews...) // Assert the expected emissions for each metric match the expected // emissions.