Skip to content

Commit

Permalink
Unify history task persistence range query behavior (#2547)
Browse files Browse the repository at this point in the history
- Make range based on taskID and task fire time both [inclusive, exclusive)
  • Loading branch information
yycptt authored Feb 28, 2022
1 parent b431869 commit c3e7dbe
Show file tree
Hide file tree
Showing 36 changed files with 642 additions and 636 deletions.
95 changes: 94 additions & 1 deletion common/persistence/cassandra/matching_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,99 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
)

const (
templateCreateTaskQuery = `INSERT INTO tasks (` +
`namespace_id, task_queue_name, task_queue_type, type, task_id, task, task_encoding) ` +
`VALUES(?, ?, ?, ?, ?, ?, ?)`

templateCreateTaskWithTTLQuery = `INSERT INTO tasks (` +
`namespace_id, task_queue_name, task_queue_type, type, task_id, task, task_encoding) ` +
`VALUES(?, ?, ?, ?, ?, ?, ?) USING TTL ?`

templateGetTasksQuery = `SELECT task_id, task, task_encoding ` +
`FROM tasks ` +
`WHERE namespace_id = ? ` +
`and task_queue_name = ? ` +
`and task_queue_type = ? ` +
`and type = ? ` +
`and task_id > ? ` +
`and task_id <= ?`

templateCompleteTaskQuery = `DELETE FROM tasks ` +
`WHERE namespace_id = ? ` +
`and task_queue_name = ? ` +
`and task_queue_type = ? ` +
`and type = ? ` +
`and task_id = ?`

templateCompleteTasksLessThanQuery = `DELETE FROM tasks ` +
`WHERE namespace_id = ? ` +
`AND task_queue_name = ? ` +
`AND task_queue_type = ? ` +
`AND type = ? ` +
`AND task_id <= ? `

templateGetTaskQueueQuery = `SELECT ` +
`range_id, ` +
`task_queue, ` +
`task_queue_encoding ` +
`FROM tasks ` +
`WHERE namespace_id = ? ` +
`and task_queue_name = ? ` +
`and task_queue_type = ? ` +
`and type = ? ` +
`and task_id = ?`

templateInsertTaskQueueQuery = `INSERT INTO tasks (` +
`namespace_id, ` +
`task_queue_name, ` +
`task_queue_type, ` +
`type, ` +
`task_id, ` +
`range_id, ` +
`task_queue, ` +
`task_queue_encoding ` +
`) VALUES (?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS`

templateUpdateTaskQueueQuery = `UPDATE tasks SET ` +
`range_id = ?, ` +
`task_queue = ?, ` +
`task_queue_encoding = ? ` +
`WHERE namespace_id = ? ` +
`and task_queue_name = ? ` +
`and task_queue_type = ? ` +
`and type = ? ` +
`and task_id = ? ` +
`IF range_id = ?`

templateUpdateTaskQueueQueryWithTTLPart1 = `INSERT INTO tasks (` +
`namespace_id, ` +
`task_queue_name, ` +
`task_queue_type, ` +
`type, ` +
`task_id ` +
`) VALUES (?, ?, ?, ?, ?) USING TTL ?`

templateUpdateTaskQueueQueryWithTTLPart2 = `UPDATE tasks USING TTL ? SET ` +
`range_id = ?, ` +
`task_queue = ?, ` +
`task_queue_encoding = ? ` +
`WHERE namespace_id = ? ` +
`and task_queue_name = ? ` +
`and task_queue_type = ? ` +
`and type = ? ` +
`and task_id = ? ` +
`IF range_id = ?`

templateDeleteTaskQueueQuery = `DELETE FROM tasks ` +
`WHERE namespace_id = ? ` +
`AND task_queue_name = ? ` +
`AND task_queue_type = ? ` +
`AND type = ? ` +
`AND task_id = ? ` +
`IF range_id = ?`
)

type (
MatchingTaskStore struct {
Session gocql.Session
Expand Down Expand Up @@ -90,7 +183,7 @@ func (d *MatchingTaskStore) CreateTaskQueue(
func (d *MatchingTaskStore) GetTaskQueue(
request *p.InternalGetTaskQueueRequest,
) (*p.InternalGetTaskQueueResponse, error) {
query := d.Session.Query(templateGetTaskQueue,
query := d.Session.Query(templateGetTaskQueueQuery,
request.NamespaceID,
request.TaskQueue,
request.TaskType,
Expand Down
Loading

0 comments on commit c3e7dbe

Please sign in to comment.