Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Implement GetFlyteKitMetrics endpoint #575

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion pkg/manager/impl/metrics_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"

"google.golang.org/protobuf/types/known/timestamppb"

dataInterfaces "github.com/flyteorg/flyteadmin/pkg/data/interfaces"
"github.com/flyteorg/flytestdlib/storage"
)

const (
Expand Down Expand Up @@ -59,6 +62,8 @@ type MetricsManager struct {
nodeExecutionManager interfaces.NodeExecutionInterface
taskExecutionManager interfaces.TaskExecutionInterface
metrics metrics
urlData dataInterfaces.RemoteURLInterface
storageClient *storage.DataStore
}

// createOperationSpan returns a Span defined by the provided arguments.
Expand Down Expand Up @@ -663,13 +668,39 @@ func (m *MetricsManager) GetExecutionMetrics(ctx context.Context,
return &admin.WorkflowExecutionGetMetricsResponse{Span: span}, nil
}

func (m *MetricsManager) GetTaskMetrics(ctx context.Context,
request admin.GetTaskMetricsRequest) (*admin.GetTaskMetricsResponse, error) {

nodeExecution, err := m.nodeExecutionManager.GetNodeExecution(ctx, admin.NodeExecutionGetRequest{
Id: request.Id,
},
)
if err != nil {
return nil, err
}

blob, err := m.urlData.Get(ctx, nodeExecution.Closure.SpanUri)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: m.urlData.Get generates the signed URL. We may not need it.

if err != nil {
return nil, err
}

var flyteKitSpan core.Span
err = m.storageClient.ReadProtobuf(ctx, storage.DataReference(blob.Url), &flyteKitSpan)
if err != nil {
return nil, err
}

return &admin.GetTaskMetricsResponse{Span: &flyteKitSpan}, nil
}

// NewMetricsManager returns a new MetricsManager constructed with the provided arguments.
func NewMetricsManager(
workflowManager interfaces.WorkflowInterface,
executionManager interfaces.ExecutionInterface,
nodeExecutionManager interfaces.NodeExecutionInterface,
taskExecutionManager interfaces.TaskExecutionInterface,
scope promutils.Scope) interfaces.MetricsInterface {
scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface, storageClient *storage.DataStore,
) interfaces.MetricsInterface {
metrics := metrics{
Scope: scope,
}
Expand All @@ -680,5 +711,7 @@ func NewMetricsManager(
nodeExecutionManager: nodeExecutionManager,
taskExecutionManager: taskExecutionManager,
metrics: metrics,
urlData: urlData,
storageClient: storageClient,
}
}
3 changes: 3 additions & 0 deletions pkg/manager/interfaces/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ import (
type MetricsInterface interface {
GetExecutionMetrics(ctx context.Context, request admin.WorkflowExecutionGetMetricsRequest) (
*admin.WorkflowExecutionGetMetricsResponse, error)

GetTaskMetrics(ctx context.Context, request admin.GetTaskMetricsRequest) (
*admin.GetTaskMetricsResponse, error)
}
41 changes: 41 additions & 0 deletions pkg/manager/mocks/metrics_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func addTerminalState(
nodeExecutionModel.ErrorCode = &request.Event.GetError().Code
}
closure.DeckUri = request.Event.DeckUri
closure.SpanUri = request.Event.SpanUri

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi
ProjectManager: manager.NewProjectManager(repo, configuration),
ResourceManager: resources.NewResourceManager(repo, configuration.ApplicationConfiguration()),
MetricsManager: manager.NewMetricsManager(workflowManager, executionManager, nodeExecutionManager,
taskExecutionManager, adminScope.NewSubScope("metrics_manager")),
taskExecutionManager, adminScope.NewSubScope("metrics_manager"), urlData, dataStorageClient),
Metrics: InitMetrics(adminScope),
}
}
18 changes: 18 additions & 0 deletions pkg/rpc/adminservice/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,24 @@ func (m *AdminService) GetExecutionMetrics(
return response, nil
}

func (m *AdminService) GetTaskMetrics(
ctx context.Context, request *admin.GetTaskMetricsRequest) (*admin.GetTaskMetricsResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
var response *admin.GetTaskMetricsResponse
var err error
m.Metrics.executionEndpointMetrics.GetTaskMetrics.Time(func() {
response, err = m.MetricsManager.GetTaskMetrics(ctx, *request)
})
if err != nil {
return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.GetTaskMetrics)
}
m.Metrics.executionEndpointMetrics.GetTaskMetrics.Success()
return response, nil
}

func (m *AdminService) ListExecutions(
ctx context.Context, request *admin.ResourceListRequest) (*admin.ExecutionList, error) {
defer m.interceptPanic(ctx, request)
Expand Down
44 changes: 23 additions & 21 deletions pkg/rpc/adminservice/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (
type executionEndpointMetrics struct {
scope promutils.Scope

create util.RequestMetrics
relaunch util.RequestMetrics
recover util.RequestMetrics
createEvent util.RequestMetrics
get util.RequestMetrics
update util.RequestMetrics
getData util.RequestMetrics
getMetrics util.RequestMetrics
list util.RequestMetrics
terminate util.RequestMetrics
create util.RequestMetrics
relaunch util.RequestMetrics
recover util.RequestMetrics
createEvent util.RequestMetrics
get util.RequestMetrics
update util.RequestMetrics
getData util.RequestMetrics
getMetrics util.RequestMetrics
GetTaskMetrics util.RequestMetrics
list util.RequestMetrics
terminate util.RequestMetrics
}

type launchPlanEndpointMetrics struct {
Expand Down Expand Up @@ -131,17 +132,18 @@ func InitMetrics(adminScope promutils.Scope) AdminMetrics {
"panics encountered while handling requests to the admin service"),

executionEndpointMetrics: executionEndpointMetrics{
scope: adminScope,
create: util.NewRequestMetrics(adminScope, "create_execution"),
relaunch: util.NewRequestMetrics(adminScope, "relaunch_execution"),
recover: util.NewRequestMetrics(adminScope, "recover_execution"),
createEvent: util.NewRequestMetrics(adminScope, "create_execution_event"),
get: util.NewRequestMetrics(adminScope, "get_execution"),
update: util.NewRequestMetrics(adminScope, "update_execution"),
getData: util.NewRequestMetrics(adminScope, "get_execution_data"),
getMetrics: util.NewRequestMetrics(adminScope, "get_execution_metrics"),
list: util.NewRequestMetrics(adminScope, "list_execution"),
terminate: util.NewRequestMetrics(adminScope, "terminate_execution"),
scope: adminScope,
create: util.NewRequestMetrics(adminScope, "create_execution"),
relaunch: util.NewRequestMetrics(adminScope, "relaunch_execution"),
recover: util.NewRequestMetrics(adminScope, "recover_execution"),
createEvent: util.NewRequestMetrics(adminScope, "create_execution_event"),
get: util.NewRequestMetrics(adminScope, "get_execution"),
update: util.NewRequestMetrics(adminScope, "update_execution"),
getData: util.NewRequestMetrics(adminScope, "get_execution_data"),
getMetrics: util.NewRequestMetrics(adminScope, "get_execution_metrics"),
GetTaskMetrics: util.NewRequestMetrics(adminScope, "get_flytekit_metrics"),
list: util.NewRequestMetrics(adminScope, "list_execution"),
terminate: util.NewRequestMetrics(adminScope, "terminate_execution"),
},
launchPlanEndpointMetrics: launchPlanEndpointMetrics{
scope: adminScope,
Expand Down