Skip to content

Commit

Permalink
Make matching persistence range queries [inclusive, exclusive) (#2599)
Browse files Browse the repository at this point in the history
- Range for matching GetTasks is now [inclusive, exclusive), instead of (exclusive, inclusive]
- Range for matching CompleteTasksLessThan is now < max TaskID, instead of <=
- Range for admin ListTaskQueueTasks is now [inclusive, exclusive)
  • Loading branch information
yycptt authored Mar 15, 2022
1 parent 9732f77 commit bb7b984
Show file tree
Hide file tree
Showing 25 changed files with 193 additions and 221 deletions.
14 changes: 7 additions & 7 deletions common/persistence/cassandra/matching_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ const (
`and task_queue_name = ? ` +
`and task_queue_type = ? ` +
`and type = ? ` +
`and task_id > ? ` +
`and task_id <= ?`
`and task_id >= ? ` +
`and task_id < ?`

templateCompleteTaskQuery = `DELETE FROM tasks ` +
`WHERE namespace_id = ? ` +
Expand All @@ -69,7 +69,7 @@ const (
`AND task_queue_name = ? ` +
`AND task_queue_type = ? ` +
`AND type = ? ` +
`AND task_id <= ? `
`AND task_id < ? `

templateGetTaskQueueQuery = `SELECT ` +
`range_id, ` +
Expand Down Expand Up @@ -388,8 +388,8 @@ func (d *MatchingTaskStore) GetTasks(
request.TaskQueue,
request.TaskType,
rowTypeTask,
request.MinTaskIDExclusive,
request.MaxTaskIDInclusive,
request.InclusiveMinTaskID,
request.ExclusiveMaxTaskID,
)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()

Expand Down Expand Up @@ -455,14 +455,14 @@ func (d *MatchingTaskStore) CompleteTask(
return nil
}

// CompleteTasksLessThan deletes all tasks less than or equal to the given task id. This API ignores the
// CompleteTasksLessThan deletes all tasks less than the given task id. This API ignores the
// Limit request parameter i.e. either all tasks leq the task_id will be deleted or an error will
// be returned to the caller
func (d *MatchingTaskStore) CompleteTasksLessThan(
request *p.CompleteTasksLessThanRequest,
) (int, error) {
query := d.Session.Query(templateCompleteTasksLessThanQuery,
request.NamespaceID, request.TaskQueueName, request.TaskType, rowTypeTask, request.TaskID)
request.NamespaceID, request.TaskQueueName, request.TaskType, rowTypeTask, request.ExclusiveMaxTaskID)
err := query.Exec()
if err != nil {
return 0, gocql.ConvertError("CompleteTasksLessThan", err)
Expand Down
14 changes: 7 additions & 7 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,8 @@ type (
NamespaceID string
TaskQueue string
TaskType enumspb.TaskQueueType
MinTaskIDExclusive int64 // exclusive
MaxTaskIDInclusive int64 // inclusive
InclusiveMinTaskID int64
ExclusiveMaxTaskID int64
PageSize int
NextPageToken []byte
}
Expand All @@ -585,11 +585,11 @@ type (

// CompleteTasksLessThanRequest contains the request params needed to invoke CompleteTasksLessThan API
CompleteTasksLessThanRequest struct {
NamespaceID string
TaskQueueName string
TaskType enumspb.TaskQueueType
TaskID int64 // Tasks less than or equal to this ID will be completed
Limit int // Limit on the max number of tasks that can be completed. Required param
NamespaceID string
TaskQueueName string
TaskType enumspb.TaskQueueType
ExclusiveMaxTaskID int64 // Tasks less than this ID will be completed
Limit int // Limit on the max number of tasks that can be completed. Required param
}

// CreateNamespaceRequest is used to create the namespace
Expand Down
29 changes: 0 additions & 29 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,35 +1241,6 @@ func (s *TestBase) RangeCompleteTimerTask(inclusiveBeginTimestamp time.Time, exc
})
}

// GetTasks is a utility method to get tasks from persistence
func (s *TestBase) GetTasks(namespaceID string, taskQueue string, taskType enumspb.TaskQueueType, batchSize int) (*persistence.GetTasksResponse, error) {
response, err := s.TaskMgr.GetTasks(&persistence.GetTasksRequest{
NamespaceID: namespaceID,
TaskQueue: taskQueue,
TaskType: taskType,
PageSize: batchSize,
MaxTaskIDInclusive: math.MaxInt64,
})

if err != nil {
return nil, err
}

return &persistence.GetTasksResponse{Tasks: response.Tasks}, nil
}

// CompleteTask is a utility method to complete a task
func (s *TestBase) CompleteTask(namespaceID string, taskQueue string, taskType enumspb.TaskQueueType, taskID int64) error {
return s.TaskMgr.CompleteTask(&persistence.CompleteTaskRequest{
TaskQueue: &persistence.TaskQueueKey{
NamespaceID: namespaceID,
TaskQueueType: taskType,
TaskQueueName: taskQueue,
},
TaskID: taskID,
})
}

// TearDownWorkflowStore to cleanup
func (s *TestBase) TearDownWorkflowStore() {
s.TaskMgr.Close()
Expand Down
21 changes: 10 additions & 11 deletions common/persistence/sql/sqlplugin/matching_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,28 @@ type (
// TasksFilter contains the column names within tasks table that
// can be used to filter results through a WHERE clause
TasksFilter struct {
RangeHash uint32
TaskQueueID []byte
TaskID *int64
MinTaskID *int64
MaxTaskID *int64
TaskIDLessThanEquals *int64
Limit *int
PageSize *int
RangeHash uint32
TaskQueueID []byte
TaskID *int64
InclusiveMinTaskID *int64
ExclusiveMaxTaskID *int64
Limit *int
PageSize *int
}

// MatchingTask is the SQL persistence interface for matching tasks
MatchingTask interface {
InsertIntoTasks(ctx context.Context, rows []TasksRow) (sql.Result, error)
// SelectFromTasks retrieves one or more rows from the tasks table
// Required filter params - {namespaceID, taskqueueName, taskType, minTaskID, maxTaskID, pageSize}
// Required filter params - {namespaceID, taskqueueName, taskType, inclusiveMinTaskID, exclusiveMaxTaskID, pageSize}
SelectFromTasks(ctx context.Context, filter TasksFilter) ([]TasksRow, error)
// DeleteFromTasks deletes a row from tasks table
// Required filter params:
// to delete single row
// - {namespaceID, taskqueueName, taskType, taskID}
// to delete multiple rows
// - {namespaceID, taskqueueName, taskType, taskIDLessThanEquals, limit }
// - this will delete upto limit number of tasks less than or equal to the given task id
// - {namespaceID, taskqueueName, taskType, exclusiveMaxTaskID, limit }
// - this will delete upto limit number of tasks less than the given max task id
DeleteFromTasks(ctx context.Context, filter TasksFilter) (sql.Result, error)
}
)
18 changes: 9 additions & 9 deletions common/persistence/sql/sqlplugin/mysql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ task_queue_id = :task_queue_id
// *** Tasks Below ***
getTaskMinMaxQry = `SELECT task_id, data, data_encoding ` +
`FROM tasks ` +
`WHERE range_hash = ? AND task_queue_id = ? AND task_id > ? AND task_id <= ? ` +
`WHERE range_hash = ? AND task_queue_id = ? AND task_id >= ? AND task_id < ? ` +
` ORDER BY task_id LIMIT ?`

getTaskMinQry = `SELECT task_id, data, data_encoding ` +
`FROM tasks ` +
`WHERE range_hash = ? AND task_queue_id = ? AND task_id > ? ORDER BY task_id LIMIT ?`
`WHERE range_hash = ? AND task_queue_id = ? AND task_id >= ? ORDER BY task_id LIMIT ?`

createTaskQry = `INSERT INTO ` +
`tasks(range_hash, task_queue_id, task_id, data, data_encoding) ` +
Expand All @@ -85,7 +85,7 @@ task_queue_id = :task_queue_id
`WHERE range_hash = ? AND task_queue_id = ? AND task_id = ?`

rangeDeleteTaskQry = `DELETE FROM tasks ` +
`WHERE range_hash = ? AND task_queue_id = ? AND task_id <= ? ` +
`WHERE range_hash = ? AND task_queue_id = ? AND task_id < ? ` +
`ORDER BY task_queue_id,task_id LIMIT ?`
)

Expand All @@ -108,21 +108,21 @@ func (mdb *db) SelectFromTasks(
var err error
var rows []sqlplugin.TasksRow
switch {
case filter.MaxTaskID != nil:
case filter.ExclusiveMaxTaskID != nil:
err = mdb.conn.SelectContext(ctx,
&rows, getTaskMinMaxQry,
filter.RangeHash,
filter.TaskQueueID,
*filter.MinTaskID,
*filter.MaxTaskID,
*filter.InclusiveMinTaskID,
*filter.ExclusiveMaxTaskID,
*filter.PageSize,
)
default:
err = mdb.conn.SelectContext(ctx,
&rows, getTaskMinQry,
filter.RangeHash,
filter.TaskQueueID,
*filter.MinTaskID,
*filter.InclusiveMinTaskID,
*filter.PageSize,
)
}
Expand All @@ -137,15 +137,15 @@ func (mdb *db) DeleteFromTasks(
ctx context.Context,
filter sqlplugin.TasksFilter,
) (sql.Result, error) {
if filter.TaskIDLessThanEquals != nil {
if filter.ExclusiveMaxTaskID != nil {
if filter.Limit == nil || *filter.Limit == 0 {
return nil, fmt.Errorf("missing limit parameter")
}
return mdb.conn.ExecContext(ctx,
rangeDeleteTaskQry,
filter.RangeHash,
filter.TaskQueueID,
*filter.TaskIDLessThanEquals,
*filter.ExclusiveMaxTaskID,
*filter.Limit,
)
}
Expand Down
18 changes: 9 additions & 9 deletions common/persistence/sql/sqlplugin/postgresql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ task_queue_id = :task_queue_id
// *** Tasks Below ***
getTaskMinMaxQry = `SELECT task_id, data, data_encoding ` +
`FROM tasks ` +
`WHERE range_hash = $1 AND task_queue_id=$2 AND task_id > $3 AND task_id <= $4 ` +
`WHERE range_hash = $1 AND task_queue_id=$2 AND task_id >= $3 AND task_id < $4 ` +
`ORDER BY task_id LIMIT $5`

getTaskMinQry = `SELECT task_id, data, data_encoding ` +
`FROM tasks ` +
`WHERE range_hash = $1 AND task_queue_id = $2 AND task_id > $3 ORDER BY task_id LIMIT $4`
`WHERE range_hash = $1 AND task_queue_id = $2 AND task_id >= $3 ORDER BY task_id LIMIT $4`

createTaskQry = `INSERT INTO ` +
`tasks(range_hash, task_queue_id, task_id, data, data_encoding) ` +
Expand All @@ -86,7 +86,7 @@ task_queue_id = :task_queue_id

rangeDeleteTaskQry = `DELETE FROM tasks ` +
`WHERE range_hash = $1 AND task_queue_id = $2 AND task_id IN (SELECT task_id FROM
tasks WHERE range_hash = $1 AND task_queue_id = $2 AND task_id <= $3 ` +
tasks WHERE range_hash = $1 AND task_queue_id = $2 AND task_id < $3 ` +
`ORDER BY task_queue_id,task_id LIMIT $4 )`
)

Expand All @@ -109,14 +109,14 @@ func (pdb *db) SelectFromTasks(
var err error
var rows []sqlplugin.TasksRow
switch {
case filter.MaxTaskID != nil:
case filter.ExclusiveMaxTaskID != nil:
err = pdb.conn.SelectContext(ctx,
&rows,
getTaskMinMaxQry,
filter.RangeHash,
filter.TaskQueueID,
*filter.MinTaskID,
*filter.MaxTaskID,
*filter.InclusiveMinTaskID,
*filter.ExclusiveMaxTaskID,
*filter.PageSize,
)
default:
Expand All @@ -125,7 +125,7 @@ func (pdb *db) SelectFromTasks(
getTaskMinQry,
filter.RangeHash,
filter.TaskQueueID,
*filter.MinTaskID,
*filter.InclusiveMinTaskID,
*filter.PageSize,
)
}
Expand All @@ -137,15 +137,15 @@ func (pdb *db) DeleteFromTasks(
ctx context.Context,
filter sqlplugin.TasksFilter,
) (sql.Result, error) {
if filter.TaskIDLessThanEquals != nil {
if filter.ExclusiveMaxTaskID != nil {
if filter.Limit == nil || *filter.Limit == 0 {
return nil, fmt.Errorf("missing limit parameter")
}
return pdb.conn.ExecContext(ctx,
rangeDeleteTaskQry,
filter.RangeHash,
filter.TaskQueueID,
*filter.TaskIDLessThanEquals,
*filter.ExclusiveMaxTaskID,
*filter.Limit,
)
}
Expand Down
18 changes: 9 additions & 9 deletions common/persistence/sql/sqlplugin/sqlite/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ task_queue_id = :task_queue_id
// *** Tasks Below ***
getTaskMinMaxQry = `SELECT task_id, data, data_encoding ` +
`FROM tasks ` +
`WHERE range_hash = ? AND task_queue_id = ? AND task_id > ? AND task_id <= ? ` +
`WHERE range_hash = ? AND task_queue_id = ? AND task_id >= ? AND task_id < ? ` +
` ORDER BY task_id LIMIT ?`

getTaskMinQry = `SELECT task_id, data, data_encoding ` +
`FROM tasks ` +
`WHERE range_hash = ? AND task_queue_id = ? AND task_id > ? ORDER BY task_id LIMIT ?`
`WHERE range_hash = ? AND task_queue_id = ? AND task_id >= ? ORDER BY task_id LIMIT ?`

createTaskQry = `INSERT INTO ` +
`tasks(range_hash, task_queue_id, task_id, data, data_encoding) ` +
Expand All @@ -90,7 +90,7 @@ task_queue_id = :task_queue_id

rangeDeleteTaskQry = `DELETE FROM tasks ` +
`WHERE range_hash = ? AND task_queue_id = ? AND task_id IN (SELECT task_id FROM
tasks WHERE range_hash = ? AND task_queue_id = ? AND task_id <= ? ` +
tasks WHERE range_hash = ? AND task_queue_id = ? AND task_id < ? ` +
`ORDER BY task_queue_id,task_id LIMIT ? ) `
)

Expand All @@ -113,21 +113,21 @@ func (mdb *db) SelectFromTasks(
var err error
var rows []sqlplugin.TasksRow
switch {
case filter.MaxTaskID != nil:
case filter.ExclusiveMaxTaskID != nil:
err = mdb.conn.SelectContext(ctx,
&rows, getTaskMinMaxQry,
filter.RangeHash,
filter.TaskQueueID,
*filter.MinTaskID,
*filter.MaxTaskID,
*filter.InclusiveMinTaskID,
*filter.ExclusiveMaxTaskID,
*filter.PageSize,
)
default:
err = mdb.conn.SelectContext(ctx,
&rows, getTaskMinQry,
filter.RangeHash,
filter.TaskQueueID,
*filter.MinTaskID,
*filter.ExclusiveMaxTaskID,
*filter.PageSize,
)
}
Expand All @@ -142,7 +142,7 @@ func (mdb *db) DeleteFromTasks(
ctx context.Context,
filter sqlplugin.TasksFilter,
) (sql.Result, error) {
if filter.TaskIDLessThanEquals != nil {
if filter.ExclusiveMaxTaskID != nil {
if filter.Limit == nil || *filter.Limit == 0 {
return nil, fmt.Errorf("missing limit parameter")
}
Expand All @@ -152,7 +152,7 @@ func (mdb *db) DeleteFromTasks(
filter.TaskQueueID,
filter.RangeHash,
filter.TaskQueueID,
*filter.TaskIDLessThanEquals,
*filter.ExclusiveMaxTaskID,
*filter.Limit,
)
}
Expand Down
Loading

0 comments on commit bb7b984

Please sign in to comment.