Skip to content

Commit

Permalink
Merge pull request #77 from Jiawei0227/migrated_metric
Browse files Browse the repository at this point in the history
Add Migrated metrics to the connection record metrics client side
  • Loading branch information
k8s-ci-robot authored Feb 4, 2021
2 parents ffd4067 + 996f59d commit e9aafaf
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 93 deletions.
33 changes: 32 additions & 1 deletion connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ type ExtendedCSIMetricsManager struct {
metrics.CSIMetricsManager
}

// AdditionalInfo carries extra per-call information that a caller can attach
// to the request context (under AdditionalInfoKey) so that
// RecordMetricsClientInterceptor can read it when recording metrics.
type AdditionalInfo struct {
// Migrated is the value recorded for the metrics.LabelMigrated label,
// e.g. "true". The interceptor uses "false" when no AdditionalInfo is
// present in the context.
Migrated string
}

// AdditionalInfoKeyType is the dedicated (empty struct) type of the context
// key under which an AdditionalInfo value is stored.
type AdditionalInfoKeyType struct{}

// AdditionalInfoKey is the context key used with context.WithValue and
// ctx.Value to store and look up an AdditionalInfo value.
var AdditionalInfoKey AdditionalInfoKeyType

// RecordMetricsClientInterceptor is a gRPC unary interceptor for recording metrics for CSI operations
// in a gRPC client.
func (cmm ExtendedCSIMetricsManager) RecordMetricsClientInterceptor(
Expand All @@ -203,11 +210,35 @@ func (cmm ExtendedCSIMetricsManager) RecordMetricsClientInterceptor(
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
duration := time.Since(start)
cmm.RecordMetrics(

var cmmBase metrics.CSIMetricsManager
cmmBase = cmm
if cmm.HaveAdditionalLabel(metrics.LabelMigrated) {
// record migration status
additionalInfo := ctx.Value(AdditionalInfoKey)
migrated := "false"
if additionalInfo != nil {
additionalInfoVal, ok := additionalInfo.(AdditionalInfo)
if !ok {
klog.Errorf("Failed to record migrated status, cannot convert additional info %v", additionalInfo)
return err
}
migrated = additionalInfoVal.Migrated
}
cmmv, metricsErr := cmm.WithLabelValues(map[string]string{metrics.LabelMigrated: migrated})
if metricsErr != nil {
klog.Errorf("Failed to record migrated status, error: %v", metricsErr)
} else {
cmmBase = cmmv
}
}
// Record the default metric
cmmBase.RecordMetrics(
method, /* operationName */
err, /* operationErr */
duration, /* operationDuration */
)

return err
}

Expand Down
153 changes: 99 additions & 54 deletions connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,64 +340,109 @@ func TestExplicitReconnect(t *testing.T) {
}

func TestConnectMetrics(t *testing.T) {
tmp := tmpDir(t)
defer os.RemoveAll(tmp)
cmmServer := metrics.NewCSIMetricsManagerForPlugin("fake.csi.driver.io")
// We have to have a real implementation of the gRPC call, otherwise the metrics
// interceptor is not called. The CSI identity service is used because it's simple.
addr, stopServer := startServer(t, tmp, &identityServer{}, nil, cmmServer)
defer stopServer()

cmm := metrics.NewCSIMetricsManager("fake.csi.driver.io")
conn, err := Connect(addr, cmm)
if assert.NoError(t, err, "connect via absolute path") &&
assert.NotNil(t, conn, "got a connection") {
defer conn.Close()
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")

identityClient := csi.NewIdentityClient(conn)
if _, err := identityClient.GetPluginInfo(context.Background(), &csi.GetPluginInfoRequest{}); assert.Error(t, err) {
errStatus, _ := status.FromError(err)
assert.Equal(t, codes.Unimplemented, errStatus.Code(), "not implemented")
}
testCases := []struct {
name string
expectedMetrics string
ctx context.Context
checkServer bool
cmm metrics.CSIMetricsManager
}{
{
name: "Regular connection test",
expectedMetrics: `# HELP csi_sidecar_operations_seconds [ALPHA] Container Storage Interface operation duration with gRPC error code status total
# TYPE csi_sidecar_operations_seconds histogram
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="2.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="10"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="15"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="50"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="120"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="300"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="600"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="+Inf"} 1
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 0
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 1
`,
ctx: context.Background(),
cmm: metrics.NewCSIMetricsManager("fake.csi.driver.io"),
checkServer: true,
},
{
name: "connection test with migrated metrics",
expectedMetrics: `# HELP csi_sidecar_operations_seconds [ALPHA] Container Storage Interface operation duration with gRPC error code status total
# TYPE csi_sidecar_operations_seconds histogram
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="0.1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="0.25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="0.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="2.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="10"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="15"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="50"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="120"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="300"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="600"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true",le="+Inf"} 1
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true"} 0
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",migrated="true"} 1
`,
ctx: context.WithValue(context.Background(), AdditionalInfoKey, AdditionalInfo{
Migrated: "true",
}),
cmm: metrics.NewCSIMetricsManagerWithOptions("fake.csi.driver.io", metrics.WithMigration()),
checkServer: false,
},
}
for _, test := range testCases {
t.Logf("Running testcase %v", test.name)
tmp := tmpDir(t)
defer os.RemoveAll(tmp)
cmmServer := metrics.NewCSIMetricsManagerForPlugin("fake.csi.driver.io")
// We have to have a real implementation of the gRPC call, otherwise the metrics
// interceptor is not called. The CSI identity service is used because it's simple.
addr, stopServer := startServer(t, tmp, &identityServer{}, nil, cmmServer)
defer stopServer()

cmm := test.cmm
conn, err := Connect(addr, cmm)
if assert.NoError(t, err, "connect via absolute path") &&
assert.NotNil(t, conn, "got a connection") {
defer conn.Close()
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")

identityClient := csi.NewIdentityClient(conn)
if _, err := identityClient.GetPluginInfo(test.ctx, &csi.GetPluginInfoRequest{}); assert.Error(t, err) {
errStatus, _ := status.FromError(err)
assert.Equal(t, codes.Unimplemented, errStatus.Code(), "not implemented")
}
}

expectedMetrics := `# HELP csi_sidecar_operations_seconds [ALPHA] Container Storage Interface operation duration with gRPC error code status total
# TYPE csi_sidecar_operations_seconds histogram
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="2.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="10"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="15"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="50"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="120"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="300"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="600"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="+Inf"} 1
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 0
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 1
`

if err := testutil.GatherAndCompare(
cmm.GetRegistry(), strings.NewReader(expectedMetrics), "csi_sidecar_operations_seconds"); err != nil {
// Ignore mismatches on csi_sidecar_operations_seconds_sum metric because execution time will vary from test to test.
err = verifyMetricsError(t, err, "csi_sidecar_operations_seconds_sum")
if err != nil {
t.Errorf("Expected client metrics not found -- %v", err)
if err := testutil.GatherAndCompare(
cmm.GetRegistry(), strings.NewReader(test.expectedMetrics), "csi_sidecar_operations_seconds"); err != nil {
// Ignore mismatches on csi_sidecar_operations_seconds_sum metric because execution time will vary from test to test.
err = verifyMetricsError(t, err, "csi_sidecar_operations_seconds_sum")
if err != nil {
t.Errorf("Expected client metrics not found -- %v", err)
}
}
}

expectedMetrics = strings.Replace(expectedMetrics, "csi_sidecar", metrics.SubsystemPlugin, -1)
if err := testutil.GatherAndCompare(
cmmServer.GetRegistry(), strings.NewReader(expectedMetrics), "csi_plugin_operations_seconds"); err != nil {
// Ignore mismatches on csi_plugin_operations_seconds_sum metric because execution time will vary from test to test.
err = verifyMetricsError(t, err, metrics.SubsystemPlugin+"_operations_seconds_sum")
if err != nil {
t.Errorf("Expected server metrics not found -- %v", err)
if test.checkServer {
expectedMetrics := strings.Replace(test.expectedMetrics, "csi_sidecar", metrics.SubsystemPlugin, -1)
if err := testutil.GatherAndCompare(
cmmServer.GetRegistry(), strings.NewReader(expectedMetrics), "csi_plugin_operations_seconds"); err != nil {
// Ignore mismatches on csi_plugin_operations_seconds_sum metric because execution time will vary from test to test.
err = verifyMetricsError(t, err, metrics.SubsystemPlugin+"_operations_seconds_sum")
if err != nil {
t.Errorf("Expected server metrics not found -- %v", err)
}
}
}
}
}
Expand Down
Loading

0 comments on commit e9aafaf

Please sign in to comment.