diff --git a/api/experimental/arrow/v1/arrow_service.pb.go b/api/experimental/arrow/v1/arrow_service.pb.go index 1ca843ff..7ed8263e 100644 --- a/api/experimental/arrow/v1/arrow_service.pb.go +++ b/api/experimental/arrow/v1/arrow_service.pb.go @@ -23,7 +23,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.12 +// protoc v3.19.4 // source: opentelemetry/proto/experimental/arrow/v1/arrow_service.proto package v1 @@ -176,6 +176,7 @@ const ( StatusCode_OK StatusCode = 0 StatusCode_UNAVAILABLE StatusCode = 1 StatusCode_INVALID_ARGUMENT StatusCode = 2 + StatusCode_STREAM_SHUTDOWN StatusCode = 3 ) // Enum value maps for StatusCode. @@ -184,11 +185,13 @@ var ( 0: "OK", 1: "UNAVAILABLE", 2: "INVALID_ARGUMENT", + 3: "STREAM_SHUTDOWN", } StatusCode_value = map[string]int32{ "OK": 0, "UNAVAILABLE": 1, "INVALID_ARGUMENT": 2, + "STREAM_SHUTDOWN": 3, } ) @@ -506,43 +509,34 @@ var file_opentelemetry_proto_experimental_arrow_v1_arrow_service_proto_rawDesc = 0x4c, 0x49, 0x4e, 0x4b, 0x53, 0x10, 0x2b, 0x12, 0x14, 0x0a, 0x10, 0x53, 0x50, 0x41, 0x4e, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x41, 0x54, 0x54, 0x52, 0x53, 0x10, 0x2c, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x50, 0x41, 0x4e, 0x5f, 0x4c, 0x49, 0x4e, 0x4b, 0x5f, 0x41, 0x54, 0x54, 0x52, 0x53, - 0x10, 0x2d, 0x2a, 0x3b, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, + 0x10, 0x2d, 0x2a, 0x50, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x41, 0x56, 0x41, 0x49, 0x4c, 0x41, 0x42, 0x4c, 0x45, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x49, 0x4e, 0x56, - 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x41, 0x52, 0x47, 0x55, 0x4d, 0x45, 0x4e, 0x54, 0x10, 0x02, 0x32, - 0xa0, 0x01, 0x0a, 0x12, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x0b, 0x41, 0x72, 0x72, 0x6f, 0x77, - 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x3c, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, - 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, - 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, - 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x52, 0x65, 0x63, - 0x6f, 0x72, 0x64, 0x73, 0x1a, 0x36, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, - 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, - 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x76, 0x31, - 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, 0x28, 0x01, - 0x30, 0x01, 0x32, 0xa0, 0x01, 0x0a, 0x12, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x54, 0x72, 0x61, 0x63, - 0x65, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x0b, 0x41, 0x72, - 0x72, 0x6f, 0x77, 0x54, 0x72, 0x61, 0x63, 0x65, 0x73, 0x12, 0x3c, 0x2e, 0x6f, 0x70, 0x65, 0x6e, + 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x41, 0x52, 0x47, 0x55, 0x4d, 0x45, 0x4e, 0x54, 0x10, 0x02, 0x12, + 0x13, 0x0a, 0x0f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x53, 0x48, 0x55, 0x54, 0x44, 0x4f, + 0x57, 0x4e, 0x10, 0x03, 0x32, 0xa0, 0x01, 0x0a, 0x12, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x0b, + 0x41, 0x72, 0x72, 0x6f, 0x77, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x3c, 0x2e, 0x6f, 0x70, + 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, 0x61, + 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x41, 0x72, 0x72, + 0x6f, 0x77, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x1a, 0x36, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, 0x61, 0x72, 0x72, - 0x6f, 0x77, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x41, 0x72, 0x72, 0x6f, 0x77, - 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x1a, 0x36, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, - 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, - 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, - 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, - 0x00, 0x28, 0x01, 0x30, 0x01, 0x32, 0x9c, 0x01, 0x0a, 0x10, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x4c, - 0x6f, 0x67, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x87, 0x01, 0x0a, 0x09, 0x41, - 0x72, 0x72, 0x6f, 0x77, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x3c, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, - 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, - 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, - 0x77, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x52, - 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x1a, 0x36, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, - 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, - 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, - 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, - 0x28, 0x01, 0x30, 0x01, 0x32, 0xa2, 0x01, 0x0a, 0x13, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x8a, 0x01, 0x0a, - 0x0c, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x3c, 0x2e, + 0x6f, 0x77, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x32, 0xa0, 0x01, 0x0a, 0x12, 0x41, 0x72, 0x72, 0x6f, + 0x77, 0x54, 0x72, 0x61, 0x63, 0x65, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, + 0x01, 0x0a, 0x0b, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x54, 0x72, 0x61, 0x63, 0x65, 0x73, 0x12, 0x3c, + 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, + 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, + 0x41, 0x72, 0x72, 0x6f, 0x77, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x1a, 0x36, 0x2e, 0x6f, + 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, + 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x32, 0x9c, 0x01, 0x0a, 0x10, 0x41, + 0x72, 0x72, 0x6f, 0x77, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, + 0x87, 0x01, 0x0a, 0x09, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x3c, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x41, @@ -550,16 +544,26 @@ var file_opentelemetry_proto_experimental_arrow_v1_arrow_service_proto_rawDesc = 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x83, 0x01, 0x0a, 0x2c, 0x69, 0x6f, - 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, - 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x76, 0x31, 0x42, 0x11, 0x41, 0x72, 0x72, 0x6f, - 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, - 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x70, 0x65, 0x6e, - 0x2d, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2f, 0x6f, 0x74, 0x65, 0x6c, 0x2d, - 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, - 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2f, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2f, 0x76, 0x31, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x75, 0x73, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x32, 0xa2, 0x01, 0x0a, 0x13, 0x41, 0x72, + 0x72, 0x6f, 0x77, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x12, 0x8a, 0x01, 0x0a, 0x0c, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x4d, 0x65, 0x74, 0x72, 0x69, + 0x63, 0x73, 0x12, 0x3c, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, + 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, + 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x76, 0x31, 0x2e, 0x42, + 0x61, 0x74, 0x63, 0x68, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, + 0x1a, 0x36, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, + 0x74, 0x61, 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, + 0x63, 0x68, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x83, + 0x01, 0x0a, 0x2c, 0x69, 0x6f, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, + 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, + 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x76, 0x31, 0x42, + 0x11, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x50, 0x72, 0x6f, + 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x2d, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2f, + 0x6f, 0x74, 0x65, 0x6c, 0x2d, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x65, + 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x2f, 0x61, 0x72, 0x72, 0x6f, + 0x77, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/api/experimental/arrow/v1/arrow_service_grpc.pb.go b/api/experimental/arrow/v1/arrow_service_grpc.pb.go index 3cd5a2a1..5e96f80b 100644 --- a/api/experimental/arrow/v1/arrow_service_grpc.pb.go +++ b/api/experimental/arrow/v1/arrow_service_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.12 +// - protoc v3.19.4 // source: opentelemetry/proto/experimental/arrow/v1/arrow_service.proto package v1 diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go index 9c29d836..63f79742 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go @@ -153,6 +153,13 @@ func statusOKFor(id int64) *arrowpb.BatchStatus { } } +func statusStreamShutdownFor(id int64) *arrowpb.BatchStatus { + return &arrowpb.BatchStatus{ + BatchId: id, + StatusCode: arrowpb.StatusCode_STREAM_SHUTDOWN, + } +} + func statusUnavailableFor(id int64) *arrowpb.BatchStatus { return &arrowpb.BatchStatus{ BatchId: id, diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream.go b/collector/exporter/otelarrowexporter/internal/arrow/stream.go index 9fd3b9d2..38e743d6 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream.go @@ -355,20 +355,20 @@ func (s *Stream) read(_ context.Context) error { // timeout. TODO: possibly, improve to wait for no outstanding requests and then stop reading. resp, err := s.client.Recv() if err != nil { - // Once the send direction of stream is closed the server should return - // an error that mentions an EOF. The expected error code is codes.Unknown. - status, ok := status.FromError(err) - if ok && status.Message() == "EOF" && status.Code() == codes.Unknown { - return nil - } // Note: do not wrap, contains a Status. return err } + // This indicates the server received EOF from client shutdown. + // This is not an error because this is an expected shutdown + // initiated by the client by setting max_stream_lifetime. + if resp.StatusCode == arrowpb.StatusCode_STREAM_SHUTDOWN { + return nil + } + if err = s.processBatchStatus(resp); err != nil { return fmt.Errorf("process: %w", err) } - } } diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go b/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go index 5e33321e..3bf58267 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go @@ -140,11 +140,17 @@ func TestStreamGracefulShutdown(t *testing.T) { defer wg.Done() batch := <-channel.sent channel.recv <- statusOKFor(batch.BatchId) + + // mimick the server which will send a batchID + // of 0 after max_stream_lifetime elapses. + time.Sleep(maxStreamLifetime) + channel.recv <- statusStreamShutdownFor(0) }() err := tc.get().SendAndWait(tc.bgctx, twoTraces) require.NoError(t, err) - // let stream get closed and send again. + + // need to sleep so CloseSend will be called. time.Sleep(maxStreamLifetime) err = tc.get().SendAndWait(tc.bgctx, twoTraces) require.Error(t, err) diff --git a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go index 9fee6793..f2eef9de 100644 --- a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -298,6 +298,18 @@ func (r *Receiver) anyStream(serverStream anyStreamServer) (retErr error) { req, err := serverStream.Recv() if err != nil { + // client called CloseSend() + if err == io.EOF { + status := &arrowpb.BatchStatus{} + status.StatusCode = arrowpb.StatusCode_STREAM_SHUTDOWN + err = serverStream.Send(status) + if err != nil { + r.logStreamError(err) + return err + } + return nil + } + r.logStreamError(err) return err } diff --git a/proto/opentelemetry/proto/experimental/arrow/v1/arrow_service.proto b/proto/opentelemetry/proto/experimental/arrow/v1/arrow_service.proto index 4d76dd7f..c25bdc6a 100644 --- a/proto/opentelemetry/proto/experimental/arrow/v1/arrow_service.proto +++ b/proto/opentelemetry/proto/experimental/arrow/v1/arrow_service.proto @@ -146,4 +146,5 @@ enum StatusCode { OK = 0; UNAVAILABLE = 1; INVALID_ARGUMENT = 2; + STREAM_SHUTDOWN = 3; }