Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose workflow history size and count to client #5392

Merged
merged 18 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 134 additions & 6 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

72 changes: 68 additions & 4 deletions .gen/go/matching/matching.go

Large diffs are not rendered by default.

72 changes: 68 additions & 4 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

677 changes: 375 additions & 302 deletions .gen/proto/history/v1/service.pb.go

Large diffs are not rendered by default.

828 changes: 415 additions & 413 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

300 changes: 169 additions & 131 deletions .gen/proto/matching/v1/service.pb.go

Large diffs are not rendered by default.

509 changes: 256 additions & 253 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/types/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type GetMutableStateResponse struct {
WorkflowCloseState *int32 `json:"workflowCloseState,omitempty"`
VersionHistories *VersionHistories `json:"versionHistories,omitempty"`
IsStickyTaskListEnabled bool `json:"isStickyTaskListEnabled,omitempty"`
HistorySize int64 `json:"historySize,omitempty"`
}

// GetNextEventID is an internal getter (TBD...)
Expand Down Expand Up @@ -585,6 +586,7 @@ type RecordDecisionTaskStartedResponse struct {
ScheduledTimestamp *int64 `json:"scheduledTimestamp,omitempty"`
StartedTimestamp *int64 `json:"startedTimestamp,omitempty"`
Queries map[string]*WorkflowQuery `json:"queries,omitempty"`
HistorySize int64 `json:"historySize,omitempty"`
}

// GetPreviousStartedEventID is an internal getter (TBD...)
Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2191,6 +2191,7 @@ func FromPollForDecisionTaskResponse(t *types.PollForDecisionTaskResponse) *apiv
StartedTime: unixNanoToTime(t.StartedTimestamp),
Queries: FromWorkflowQueryMap(t.Queries),
NextEventId: t.NextEventID,
TotalHistoryBytes: t.TotalHistoryBytes,
}
}

Expand All @@ -2214,6 +2215,7 @@ func ToPollForDecisionTaskResponse(t *apiv1.PollForDecisionTaskResponse) *types.
StartedTimestamp: timeToUnixNano(t.StartedTime),
Queries: ToWorkflowQueryMap(t.Queries),
NextEventID: t.NextEventId,
TotalHistoryBytes: t.TotalHistoryBytes,
}
}

Expand Down
4 changes: 4 additions & 0 deletions common/types/mapper/proto/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func FromHistoryGetMutableStateResponse(t *types.GetMutableStateResponse) *histo
WorkflowCloseState: FromWorkflowExecutionCloseStatus(workflowCloseState),
VersionHistories: FromVersionHistories(t.VersionHistories),
IsStickyTaskListEnabled: t.IsStickyTaskListEnabled,
HistorySize: t.HistorySize,
}
}

Expand All @@ -363,6 +364,7 @@ func ToHistoryGetMutableStateResponse(t *historyv1.GetMutableStateResponse) *typ
VersionHistories: ToVersionHistories(t.VersionHistories),
IsStickyTaskListEnabled: t.IsStickyTaskListEnabled,
IsWorkflowRunning: t.WorkflowState == sharedv1.WorkflowState_WORKFLOW_STATE_RUNNING,
HistorySize: t.HistorySize,
}
}

Expand Down Expand Up @@ -893,6 +895,7 @@ func FromHistoryRecordDecisionTaskStartedResponse(t *types.RecordDecisionTaskSta
ScheduledTime: unixNanoToTime(t.ScheduledTimestamp),
StartedTime: unixNanoToTime(t.StartedTimestamp),
Queries: FromWorkflowQueryMap(t.Queries),
HistorySize: t.HistorySize,
}
}

Expand All @@ -915,6 +918,7 @@ func ToHistoryRecordDecisionTaskStartedResponse(t *historyv1.RecordDecisionTaskS
ScheduledTimestamp: timeToUnixNano(t.ScheduledTime),
StartedTimestamp: timeToUnixNano(t.StartedTime),
Queries: ToWorkflowQueryMap(t.Queries),
HistorySize: t.HistorySize,
}
}

Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/proto/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ func FromMatchingPollForDecisionTaskResponse(t *types.MatchingPollForDecisionTas
ScheduledTime: unixNanoToTime(t.ScheduledTimestamp),
StartedTime: unixNanoToTime(t.StartedTimestamp),
Queries: FromWorkflowQueryMap(t.Queries),
TotalHistoryBytes: t.TotalHistoryBytes,
}
}

Expand All @@ -445,6 +446,7 @@ func ToMatchingPollForDecisionTaskResponse(t *matchingv1.PollForDecisionTaskResp
ScheduledTimestamp: timeToUnixNano(t.ScheduledTime),
StartedTimestamp: timeToUnixNano(t.StartedTime),
Queries: ToWorkflowQueryMap(t.Queries),
TotalHistoryBytes: t.TotalHistoryBytes,
}
}

Expand Down
4 changes: 4 additions & 0 deletions common/types/mapper/thrift/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func FromGetMutableStateResponse(t *types.GetMutableStateResponse) *history.GetM
WorkflowCloseState: t.WorkflowCloseState,
VersionHistories: FromVersionHistories(t.VersionHistories),
IsStickyTaskListEnabled: &t.IsStickyTaskListEnabled,
HistorySize: &t.HistorySize,
}
}

Expand Down Expand Up @@ -233,6 +234,7 @@ func ToGetMutableStateResponse(t *history.GetMutableStateResponse) *types.GetMut
WorkflowCloseState: t.WorkflowCloseState,
VersionHistories: ToVersionHistories(t.VersionHistories),
IsStickyTaskListEnabled: t.GetIsStickyTaskListEnabled(),
HistorySize: t.GetHistorySize(),
}
}

Expand Down Expand Up @@ -628,6 +630,7 @@ func FromRecordDecisionTaskStartedResponse(t *types.RecordDecisionTaskStartedRes
ScheduledTimestamp: t.ScheduledTimestamp,
StartedTimestamp: t.StartedTimestamp,
Queries: FromWorkflowQueryMap(t.Queries),
HistorySize: &t.HistorySize,
}
}

Expand All @@ -651,6 +654,7 @@ func ToRecordDecisionTaskStartedResponse(t *history.RecordDecisionTaskStartedRes
ScheduledTimestamp: t.ScheduledTimestamp,
StartedTimestamp: t.StartedTimestamp,
Queries: ToWorkflowQueryMap(t.Queries),
HistorySize: t.GetHistorySize(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/thrift/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func FromMatchingPollForDecisionTaskResponse(t *types.MatchingPollForDecisionTas
ScheduledTimestamp: t.ScheduledTimestamp,
StartedTimestamp: t.StartedTimestamp,
Queries: FromWorkflowQueryMap(t.Queries),
TotalHistoryBytes: &t.TotalHistoryBytes,
}
}

Expand All @@ -305,6 +306,7 @@ func ToMatchingPollForDecisionTaskResponse(t *matching.PollForDecisionTaskRespon
ScheduledTimestamp: t.ScheduledTimestamp,
StartedTimestamp: t.StartedTimestamp,
Queries: ToWorkflowQueryMap(t.Queries),
TotalHistoryBytes: t.GetTotalHistoryBytes(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -3590,6 +3590,7 @@ func FromPollForDecisionTaskResponse(t *types.PollForDecisionTaskResponse) *shar
StartedTimestamp: t.StartedTimestamp,
Queries: FromWorkflowQueryMap(t.Queries),
NextEventId: &t.NextEventID,
TotalHistoryBytes: &t.TotalHistoryBytes,
}
}

Expand All @@ -3614,6 +3615,7 @@ func ToPollForDecisionTaskResponse(t *shared.PollForDecisionTaskResponse) *types
StartedTimestamp: t.StartedTimestamp,
Queries: ToWorkflowQueryMap(t.Queries),
NextEventID: t.GetNextEventId(),
TotalHistoryBytes: t.GetTotalHistoryBytes(),
}
}

Expand Down
9 changes: 9 additions & 0 deletions common/types/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ type MatchingPollForDecisionTaskResponse struct {
ScheduledTimestamp *int64 `json:"scheduledTimestamp,omitempty"`
StartedTimestamp *int64 `json:"startedTimestamp,omitempty"`
Queries map[string]*WorkflowQuery `json:"queries,omitempty"`
TotalHistoryBytes int64 `json:"currentHistorySize,omitempty"`
}

// GetWorkflowExecution is an internal getter (TBD...)
Expand Down Expand Up @@ -455,6 +456,14 @@ func (v *MatchingPollForDecisionTaskResponse) GetBranchToken() (o []byte) {
return
}

// GetTotalHistoryBytes is an internal getter of returning the history size in bytes
func (v *MatchingPollForDecisionTaskResponse) GetTotalHistoryBytes() (o int64) {
if v != nil {
return v.TotalHistoryBytes
}
return
}

// MatchingQueryWorkflowRequest is an internal type (TBD...)
type MatchingQueryWorkflowRequest struct {
DomainUUID string `json:"domainUUID,omitempty"`
Expand Down
10 changes: 9 additions & 1 deletion common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -4029,6 +4029,7 @@ type PollForDecisionTaskResponse struct {
StartedTimestamp *int64 `json:"startedTimestamp,omitempty"`
Queries map[string]*WorkflowQuery `json:"queries,omitempty"`
NextEventID int64 `json:"nextEventId,omitempty"`
TotalHistoryBytes int64 `json:"historySize,omitempty"`
}

// GetTaskToken is an internal getter (TBD...)
Expand Down Expand Up @@ -4079,6 +4080,13 @@ func (v *PollForDecisionTaskResponse) GetNextEventID() (o int64) {
return
}

func (v *PollForDecisionTaskResponse) GetHistorySize() (o int64) {
if v != nil {
return v.TotalHistoryBytes
}
return
}

// PollerInfo is an internal type (TBD...)
type PollerInfo struct {
LastAccessTime *int64 `json:"lastAccessTime,omitempty"`
Expand Down Expand Up @@ -7163,7 +7171,7 @@ type WorkflowExecutionInfo struct {
StartTime *int64 `json:"startTime,omitempty"`
CloseTime *int64 `json:"closeTime,omitempty"`
CloseStatus *WorkflowExecutionCloseStatus `json:"closeStatus,omitempty"`
HistoryLength int64 `json:"historyLength,omitempty"`
HistoryLength int64 `json:"historyLength,omitempty"` //should be history count
ParentDomainID *string `json:"parentDomainId,omitempty"`
ParentDomain *string `json:"parentDomain,omitempty"`
ParentExecution *WorkflowExecution `json:"parentExecution,omitempty"`
Expand Down
25 changes: 13 additions & 12 deletions common/types/testdata/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,19 @@ const (
ClientLibraryVersion = "ClientLibraryVersion"
SupportedVersions = "SupportedVersions"

Attempt = 2
PageSize = 10
HistoryLength = 20
BacklogCountHint = 30
AckLevel = 1001
ReadLevel = 1002
RatePerSecond = 3.14
TaskID = 444
ShardID = 12345
MessageID1 = 50001
MessageID2 = 50002
EventStoreVersion = 333
Attempt = 2
PageSize = 10
HistoryLength = 20
BacklogCountHint = 30
AckLevel = 1001
ReadLevel = 1002
RatePerSecond = 3.14
TaskID = 444
ShardID = 12345
MessageID1 = 50001
MessageID2 = 50002
EventStoreVersion = 333
HistorySizeInBytes = 77779

EventID1 = int64(1)
EventID2 = int64(2)
Expand Down
2 changes: 2 additions & 0 deletions common/types/testdata/service_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ var (
WorkflowCloseState: common.Int32Ptr(persistence.WorkflowCloseStatusTimedOut),
VersionHistories: &VersionHistories,
IsStickyTaskListEnabled: true,
HistorySize: HistorySizeInBytes,
}
HistoryGetReplicationMessagesRequest = AdminGetReplicationMessagesRequest
HistoryGetReplicationMessagesResponse = AdminGetReplicationMessagesResponse
Expand Down Expand Up @@ -171,6 +172,7 @@ var (
ScheduledTimestamp: &Timestamp1,
StartedTimestamp: &Timestamp2,
Queries: WorkflowQueryMap,
HistorySize: HistorySizeInBytes,
}
HistoryRefreshWorkflowTasksRequest = types.HistoryRefreshWorkflowTasksRequest{
DomainUIID: DomainID,
Expand Down
1 change: 1 addition & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ func CreateMatchingPollForDecisionTaskResponse(historyResponse *types.RecordDeci
ScheduledTimestamp: historyResponse.ScheduledTimestamp,
StartedTimestamp: historyResponse.StartedTimestamp,
Queries: historyResponse.Queries,
TotalHistoryBytes: historyResponse.HistorySize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expose historyCount?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the historyCount is equivalent to NextEventID - 1

}
if historyResponse.GetPreviousStartedEventID() != EmptyEventID {
matchingResp.PreviousStartedEventID = historyResponse.PreviousStartedEventID
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.8.1
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/cadence-idl v0.0.0-20230525234945-b6f203573446
github.com/uber/cadence-idl v0.0.0-20230905165949-03586319b849
github.com/uber/ringpop-go v0.8.5
github.com/uber/tchannel-go v1.22.2
github.com/urfave/cli v1.22.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20230525234945-b6f203573446 h1:Z2qOF3Eu3SEKgn9u6HVNkWozcp0MDm3F0P8ZwFKzxvA=
github.com/uber/cadence-idl v0.0.0-20230525234945-b6f203573446/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20230905165949-03586319b849 h1:j3bfADG1t35Lt4rNRpc9AuQ3l2cGw2Ao25Qt6rDamgc=
github.com/uber/cadence-idl v0.0.0-20230905165949-03586319b849/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
2 changes: 2 additions & 0 deletions proto/internal/uber/cadence/history/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ message GetMutableStateResponse {
api.v1.WorkflowExecutionCloseStatus workflow_close_state = 15;
shared.v1.VersionHistories version_histories = 16;
bool is_sticky_task_list_enabled = 17;
int64 history_size = 18;
}

message PollMutableStateRequest {
Expand Down Expand Up @@ -401,6 +402,7 @@ message RecordDecisionTaskStartedResponse {
google.protobuf.Timestamp scheduled_time = 12;
google.protobuf.Timestamp started_time = 13;
map<string, api.v1.WorkflowQuery> queries = 14;
int64 history_size = 15;
}

message RecordActivityTaskStartedRequest {
Expand Down
1 change: 1 addition & 0 deletions proto/internal/uber/cadence/matching/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ message PollForDecisionTaskResponse {
google.protobuf.Timestamp scheduled_time = 15;
google.protobuf.Timestamp started_time = 16;
map<string, api.v1.WorkflowQuery> queries = 17;
int64 total_history_bytes = 18;
}

message PollForActivityTaskRequest {
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/thriftHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestThriftHandler(t *testing.T) {
t.Run("PollForDecisionTask", func(t *testing.T) {
h.EXPECT().PollForDecisionTask(ctx, &types.PollForDecisionTaskRequest{}).Return(&types.PollForDecisionTaskResponse{}, internalErr).Times(1)
resp, err := th.PollForDecisionTask(ctx, &shared.PollForDecisionTaskRequest{})
assert.Equal(t, shared.PollForDecisionTaskResponse{StartedEventId: common.Int64Ptr(0), Attempt: common.Int64Ptr(0), BacklogCountHint: common.Int64Ptr(0), NextEventId: common.Int64Ptr(0)}, *resp)
assert.Equal(t, shared.PollForDecisionTaskResponse{StartedEventId: common.Int64Ptr(0), Attempt: common.Int64Ptr(0), BacklogCountHint: common.Int64Ptr(0), NextEventId: common.Int64Ptr(0), TotalHistoryBytes: common.Int64Ptr(0)}, *resp)
assert.Equal(t, expectedErr, err)
})
t.Run("QueryWorkflow", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4354,6 +4354,7 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
StartedTimestamp: matchingResp.StartedTimestamp,
Queries: matchingResp.Queries,
NextEventID: matchingResp.NextEventID,
TotalHistoryBytes: matchingResp.TotalHistoryBytes,
}

return resp, nil
Expand Down
1 change: 1 addition & 0 deletions service/history/decision/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ func (handler *handlerImpl) createRecordDecisionTaskStartedResponse(
queries[id] = input
}
response.Queries = queries
response.HistorySize = msBuilder.GetHistorySize()
return response, nil
}

Expand Down
4 changes: 4 additions & 0 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ func (c *contextImpl) GetHistorySize() int64 {

func (c *contextImpl) SetHistorySize(size int64) {
c.stats.HistorySize = size
if c.mutableState != nil {
c.mutableState.SetHistorySize(size)
}
}

func (c *contextImpl) LoadExecutionStats(
Expand Down Expand Up @@ -313,6 +316,7 @@ func (c *contextImpl) LoadWorkflowExecutionWithTaskVersion(
Message: "workflowExecutionContext counter flushBeforeReady status after loading mutable state from DB",
}
}

return c.mutableState, nil
}

Expand Down
3 changes: 3 additions & 0 deletions service/history/execution/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,5 +236,8 @@ type (
StartTransaction(entry *cache.DomainCacheEntry, incomingTaskVersion int64) (bool, error)
CloseTransactionAsMutation(now time.Time, transactionPolicy TransactionPolicy) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error)
CloseTransactionAsSnapshot(now time.Time, transactionPolicy TransactionPolicy) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error)

GetHistorySize() int64
SetHistorySize(size int64)
}
)
12 changes: 12 additions & 0 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ type (
logger log.Logger
metricsClient metrics.Client
pendingActivityWarningSent bool

executionStats *persistence.ExecutionStats
}
)

Expand Down Expand Up @@ -241,6 +243,7 @@ func newMutableStateBuilder(
s.taskGenerator = NewMutableStateTaskGenerator(shard.GetClusterMetadata(), shard.GetDomainCache(), s)
s.decisionTaskManager = newMutableStateDecisionTaskManager(s)

s.executionStats = &persistence.ExecutionStats{}
return s
}

Expand Down Expand Up @@ -333,6 +336,7 @@ func (e *mutableStateBuilder) Load(
// TODO: remove this after all 2DC workflows complete
e.replicationState = state.ReplicationState
e.checksum = state.Checksum
e.executionStats = state.ExecutionStats

e.fillForBackwardsCompatibility()

Expand Down Expand Up @@ -4194,6 +4198,14 @@ func (e *mutableStateBuilder) UpdateDuplicatedResource(
e.appliedEvents[id] = struct{}{}
}

func (e *mutableStateBuilder) GetHistorySize() int64 {
return e.executionStats.HistorySize
}

func (e *mutableStateBuilder) SetHistorySize(size int64) {
e.executionStats.HistorySize = size
}

func (e *mutableStateBuilder) prepareCloseTransaction(
transactionPolicy TransactionPolicy,
) error {
Expand Down
Loading