Skip to content

Commit

Permalink
Add DeleteNamespace API (#2645)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Mar 26, 2022
1 parent 752ff50 commit cc51711
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 40 deletions.
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ const (
// of Timeout and if no activity is seen even after that the connection is closed.
KeepAliveTimeout = "frontend.keepAliveTimeout"

// DeleteNamespaceDeleteActivityRPS is RPS per every parallel delete executions activity.
// Total RPS is equal to DeleteNamespaceDeleteActivityRPS * DeleteNamespaceConcurrentDeleteExecutionsActivities.
DeleteNamespaceDeleteActivityRPS = "frontend.deleteNamespaceDeleteActivityRPS"
// DeleteNamespaceConcurrentDeleteExecutionsActivities is a number of concurrent delete executions activities.
// Must be not greater than 256 and number of worker cores in the cluster.
DeleteNamespaceConcurrentDeleteExecutionsActivities = "frontend.deleteNamespaceConcurrentDeleteExecutionsActivities"

// key for matching

// MatchingRPS is request rate per second for each matching host
Expand Down
4 changes: 4 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,3 +892,7 @@ func DeletedExecutionsCount(count int) ZapTag {
func DeletedExecutionsErrorCount(count int) ZapTag {
return NewInt("delete-executions-error-count", count)
}

func Endpoint(endpoint string) ZapTag {
return NewStringTag("endpoint", endpoint)
}
10 changes: 9 additions & 1 deletion common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,8 +792,9 @@ const (
OperatorAddSearchAttributesScope = iota + NumAdminScopes
// OperatorRemoveSearchAttributesScope is the metric scope for operator.RemoveSearchAttributes
OperatorRemoveSearchAttributesScope
// OperatorListSearchAttributesScope is the metric scope for operator.GetSearchAttributes
// OperatorListSearchAttributesScope is the metric scope for operator.ListSearchAttributes
OperatorListSearchAttributesScope
OperatorDeleteNamespaceScope

NumOperatorScopes
)
Expand Down Expand Up @@ -1534,6 +1535,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
OperatorAddSearchAttributesScope: {operation: "OperatorAddSearchAttributes"},
OperatorRemoveSearchAttributesScope: {operation: "OperatorRemoveSearchAttributes"},
OperatorListSearchAttributesScope: {operation: "OperatorListSearchAttributes"},
OperatorDeleteNamespaceScope: {operation: "OperatorDeleteNamespace"},
FrontendStartWorkflowExecutionScope: {operation: "StartWorkflowExecution"},
FrontendPollWorkflowTaskQueueScope: {operation: "PollWorkflowTaskQueue"},
FrontendPollActivityTaskQueueScope: {operation: "PollActivityTaskQueue"},
Expand Down Expand Up @@ -1903,6 +1905,9 @@ const (
ElasticsearchDocumentParseFailuresCount
ElasticsearchDocumentGenerateFailuresCount

DeleteNamespaceWorkflowSuccessCount
DeleteNamespaceWorkflowFailuresCount

NoopImplementationIsUsed

NumCommonMetrics // Needs to be last on this list for iota numbering
Expand Down Expand Up @@ -2350,6 +2355,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
AddSearchAttributesWorkflowSuccessCount: NewCounterDef("add_search_attributes_workflow_success"),
AddSearchAttributesWorkflowFailuresCount: NewCounterDef("add_search_attributes_workflow_failure"),

DeleteNamespaceWorkflowSuccessCount: NewCounterDef("delete_namespace_workflow_success"),
DeleteNamespaceWorkflowFailuresCount: NewCounterDef("delete_namespace_workflow_failure"),

MatchingClientForwardedCounter: NewCounterDef("forwarded"),
MatchingClientInvalidTaskQueueName: NewCounterDef("invalid_task_queue_name"),

Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
go.opentelemetry.io/otel/sdk v1.4.0
go.opentelemetry.io/otel/sdk/export/metric v0.27.0
go.opentelemetry.io/otel/sdk/metric v0.27.0
go.temporal.io/api v1.7.1-0.20220325233305-d1f0ee499b92
go.temporal.io/api v1.7.1-0.20220325235300-690c145d4f09
go.temporal.io/sdk v1.14.0
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.9.0
Expand All @@ -56,8 +56,6 @@ require (
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

replace go.temporal.io/api v1.7.1-0.20220324004000-817724af565a => ../temporal-api-go

require (
cloud.google.com/go v0.100.2 // indirect
cloud.google.com/go/compute v1.2.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ go.opentelemetry.io/otel/trace v1.4.0 h1:4OOUrPZdVFQkbzl/JSdvGCWIdw5ONXXxzHlaLlW
go.opentelemetry.io/otel/trace v1.4.0/go.mod h1:uc3eRsqDfWs9R7b92xbQbU42/eTNz4N+gLP8qJCi4aE=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a/go.mod h1:OnUq5eS+Nyx+irKb3Ws5YB7yjGFf5XmI3WcVRU9COEo=
go.temporal.io/api v1.7.1-0.20220325233305-d1f0ee499b92 h1:dqtD8ZFr6qswwwQLilclkQca6gQiNhWOTgEl3v5M718=
go.temporal.io/api v1.7.1-0.20220325233305-d1f0ee499b92/go.mod h1:IfIURICQ7KeBFaPKzv9opaKC6FvcHR4FS7EQY+hbSiA=
go.temporal.io/api v1.7.1-0.20220325235300-690c145d4f09 h1:eCuoCyjouxhPvVulGS/w0kcljKXCpPxLVlkZH2XsYyk=
go.temporal.io/api v1.7.1-0.20220325235300-690c145d4f09/go.mod h1:IfIURICQ7KeBFaPKzv9opaKC6FvcHR4FS7EQY+hbSiA=
go.temporal.io/sdk v1.14.0 h1:7tJO72gK4xmsZ8W3Xp1rwKYdkwQ/mgnKN5LmROyZTac=
go.temporal.io/sdk v1.14.0/go.mod h1:7rvvSS6oCXp19JSFQtSOhLxCX3wpEQSJZJlyCGleo9M=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down
2 changes: 1 addition & 1 deletion proto/api
2 changes: 2 additions & 0 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ func AdminHandlerProvider(
}

func OperatorHandlerProvider(
config *Config,
esConfig *esclient.Config,
esClient esclient.Client,
logger resource.SnTaggedLogger,
Expand All @@ -431,6 +432,7 @@ func OperatorHandlerProvider(
healthServer *health.Server,
) *OperatorHandlerImpl {
args := NewOperatorHandlerImplArgs{
config,
esConfig,
esClient,
logger,
Expand Down
15 changes: 15 additions & 0 deletions service/frontend/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 65 additions & 8 deletions service/frontend/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ import (

"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/service/worker"
"go.temporal.io/server/service/worker/addsearchattributes"
"go.temporal.io/server/service/worker/deletenamespace"
"go.temporal.io/server/service/worker/deletenamespace/deleteexecutions"

"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
Expand All @@ -58,6 +61,7 @@ type (

healthStatus int32
logger log.Logger
config *Config
esConfig *esclient.Config
esClient esclient.Client
sdkClientFactory sdk.ClientFactory
Expand All @@ -68,6 +72,7 @@ type (
}

NewOperatorHandlerImplArgs struct {
config *Config
EsConfig *esclient.Config
EsClient esclient.Client
Logger log.Logger
Expand All @@ -87,6 +92,7 @@ func NewOperatorHandlerImpl(
handler := &OperatorHandlerImpl{
logger: args.Logger,
status: common.DaemonStatusInitialized,
config: args.config,
esConfig: args.EsConfig,
esClient: args.EsClient,
sdkClientFactory: args.sdkClientFactory,
Expand Down Expand Up @@ -276,6 +282,61 @@ func (h *OperatorHandlerImpl) ListSearchAttributes(ctx context.Context, request
}, nil
}

func (h *OperatorHandlerImpl) DeleteNamespace(ctx context.Context, request *operatorservice.DeleteNamespaceRequest) (_ *operatorservice.DeleteNamespaceResponse, retError error) {
const endpointName = "DeleteNamespace"

defer log.CapturePanic(h.logger, &retError)

scope, sw := h.startRequestProfile(metrics.OperatorDeleteNamespaceScope)
defer sw.Stop()

// validate request
if request == nil {
return nil, h.error(errRequestNotSet, scope, endpointName)
}

if request.GetNamespace() == "" {
return nil, h.error(errNamespaceNotSet, scope, endpointName)
}

// Execute workflow.
wfParams := deletenamespace.DeleteNamespaceWorkflowParams{
Namespace: namespace.Name(request.GetNamespace()),
DeleteExecutionsConfig: deleteexecutions.DeleteExecutionsConfig{
DeleteActivityRPS: h.config.DeleteNamespaceDeleteActivityRPS(),
ConcurrentDeleteExecutionsActivities: h.config.DeleteNamespaceConcurrentDeleteExecutionsActivities(),
},
}

sdkClient := h.sdkClientFactory.GetSystemClient(h.logger)
run, err := sdkClient.ExecuteWorkflow(
ctx,
sdkclient.StartWorkflowOptions{
TaskQueue: worker.DefaultWorkerTaskQueue,
ID: deleteexecutions.WorkflowName,
},
deletenamespace.WorkflowName,
wfParams,
)
if err != nil {
return nil, h.error(serviceerror.NewUnavailable(fmt.Sprintf(errUnableToStartWorkflowMessage, deletenamespace.WorkflowName, err)), scope, endpointName)
}

// Wait for workflow to complete.
var wfResult deletenamespace.DeleteNamespaceWorkflowResult
err = run.Get(ctx, &wfResult)
if err != nil {
scope.IncCounter(metrics.DeleteNamespaceFailuresCount)
execution := &commonpb.WorkflowExecution{WorkflowId: deletenamespace.WorkflowName, RunId: run.GetRunID()}
return nil, h.error(serviceerror.NewSystemWorkflow(execution, err), scope, endpointName)
}
scope.IncCounter(metrics.DeleteNamespaceSuccessCount)

return &operatorservice.DeleteNamespaceResponse{
DeletedNamespace: wfResult.DeletedNamespace.String(),
}, nil
}

// startRequestProfile initiates recording of request metrics
func (h *OperatorHandlerImpl) startRequestProfile(scope int) (metrics.Scope, metrics.Stopwatch) {
metricsScope := h.metricsClient.Scope(scope)
Expand All @@ -287,21 +348,17 @@ func (h *OperatorHandlerImpl) startRequestProfile(scope int) (metrics.Scope, met
func (h *OperatorHandlerImpl) error(err error, scope metrics.Scope, endpointName string) error {
switch err := err.(type) {
case *serviceerror.Unavailable:
h.logger.Error("["+endpointName+"] Unavailable error", tag.Error(err))
h.logger.Error("Unavailable error.", tag.Error(err), tag.Endpoint(endpointName))
scope.IncCounter(metrics.ServiceFailures)
return err
case *serviceerror.InvalidArgument:
scope.IncCounter(metrics.ServiceErrInvalidArgumentCounter)
return err
case *serviceerror.ResourceExhausted:
scope.Tagged(metrics.ResourceExhaustedCauseTag(err.Cause)).IncCounter(metrics.ServiceErrResourceExhaustedCounter)
return err
case *serviceerror.NotFound:
return err
default:
h.logger.Error("Unknown error.", tag.Error(err), tag.Endpoint(endpointName))
scope.IncCounter(metrics.ServiceFailures)
}

h.logger.Error("["+endpointName+"] Unknown error", tag.Error(err))
scope.IncCounter(metrics.ServiceFailures)

return err
}
1 change: 1 addition & 0 deletions service/frontend/operator_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (s *operatorHandlerSuite) SetupTest() {
s.mockResource = resource.NewTest(s.controller, metrics.Frontend)

args := NewOperatorHandlerImplArgs{
nil,
nil,
s.mockResource.ESClient,
s.mockResource.Logger,
Expand Down
10 changes: 10 additions & 0 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ type Config struct {
KeepAliveTime dynamicconfig.DurationPropertyFn
// Wait for the ping ack before assuming the connection is dead.
KeepAliveTimeout dynamicconfig.DurationPropertyFn

// RPS per every parallel delete executions activity.
// Total RPS is equal to DeleteNamespaceDeleteActivityRPS * DeleteNamespaceConcurrentDeleteExecutionsActivities.
DeleteNamespaceDeleteActivityRPS dynamicconfig.IntPropertyFn
// Number of concurrent delete executions activities.
// Must be not greater than 256 and number of worker cores in the cluster.
DeleteNamespaceConcurrentDeleteExecutionsActivities dynamicconfig.IntPropertyFn
}

// NewConfig returns new service config with default values
Expand Down Expand Up @@ -184,6 +191,9 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, esIndexName
KeepAliveMaxConnectionAgeGrace: dc.GetDurationProperty(dynamicconfig.KeepAliveMaxConnectionAgeGrace, 70*time.Second),
KeepAliveTime: dc.GetDurationProperty(dynamicconfig.KeepAliveTime, 1*time.Minute),
KeepAliveTimeout: dc.GetDurationProperty(dynamicconfig.KeepAliveTimeout, 10*time.Second),

DeleteNamespaceDeleteActivityRPS: dc.GetIntProperty(dynamicconfig.DeleteNamespaceDeleteActivityRPS, 100),
DeleteNamespaceConcurrentDeleteExecutionsActivities: dc.GetIntProperty(dynamicconfig.DeleteNamespaceConcurrentDeleteExecutionsActivities, 4),
}
}

Expand Down
4 changes: 2 additions & 2 deletions service/worker/addsearchattributes/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ var (
// AddSearchAttributesWorkflow is the workflow that adds search attributes to the cluster for specific index.
func AddSearchAttributesWorkflow(ctx workflow.Context, params WorkflowParams) error {
logger := workflow.GetLogger(ctx)
logger.Info("Workflow started.", "wf-type", WorkflowName)
logger.Info("Workflow started.", tag.WorkflowType(WorkflowName))

var a *activities
var err error
Expand All @@ -124,7 +124,7 @@ func AddSearchAttributesWorkflow(ctx workflow.Context, params WorkflowParams) er
return fmt.Errorf("%w: UpdateClusterMetadataActivity: %v", ErrUnableToExecuteActivity, err)
}

logger.Info("Workflow finished successfully.", "wf-type", WorkflowName)
logger.Info("Workflow finished successfully.", tag.WorkflowType(WorkflowName))
return nil
}

Expand Down
32 changes: 19 additions & 13 deletions service/worker/deletenamespace/deleteexecutions/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,40 @@ import (
)

const (
deleteActivityRPS = 100
pageSize = 1000
pagesPerExecutionCount = 256
concurrentDeleteExecutionsActivities = 4
maxConcurrentDeleteExecutionsActivities = 256
defaultDeleteActivityRPS = 100
defaultPageSize = 1000
defaultPagesPerExecutionCount = 256
defaultConcurrentDeleteExecutionsActivities = 4
maxConcurrentDeleteExecutionsActivities = 256
)

type (
DeleteExecutionsConfig struct {
DeleteActivityRPS int
PageSize int
PagesPerExecutionCount int // Before doing ContinueAsNew.
ConcurrentDeleteExecutionsActivities int // Must be not greater than PagesPerExecutionCount and number of workers in the cluster.
// RPS per every parallel delete executions activity.
// Total RPS is equal to DeleteActivityRPS * ConcurrentDeleteExecutionsActivities.
DeleteActivityRPS int
// Page size to read executions from visibility.
PageSize int
// Number of pages before returning ContinueAsNew.
PagesPerExecutionCount int
// Number of concurrent delete executions activities.
// Must be not greater than PagesPerExecutionCount and number of worker cores in the cluster.
ConcurrentDeleteExecutionsActivities int
}
)

func (cfg *DeleteExecutionsConfig) ApplyDefaults() {
if cfg.DeleteActivityRPS <= 0 {
cfg.DeleteActivityRPS = deleteActivityRPS
cfg.DeleteActivityRPS = defaultDeleteActivityRPS
}
if cfg.PageSize <= 0 {
cfg.PageSize = pageSize
cfg.PageSize = defaultPageSize
}
if cfg.PagesPerExecutionCount <= 0 {
cfg.PagesPerExecutionCount = pagesPerExecutionCount
cfg.PagesPerExecutionCount = defaultPagesPerExecutionCount
}
if cfg.ConcurrentDeleteExecutionsActivities <= 0 {
cfg.ConcurrentDeleteExecutionsActivities = concurrentDeleteExecutionsActivities
cfg.ConcurrentDeleteExecutionsActivities = defaultConcurrentDeleteExecutionsActivities
}
if cfg.ConcurrentDeleteExecutionsActivities > maxConcurrentDeleteExecutionsActivities {
cfg.ConcurrentDeleteExecutionsActivities = maxConcurrentDeleteExecutionsActivities
Expand Down
Loading

0 comments on commit cc51711

Please sign in to comment.