Skip to content

Commit

Permalink
Scheduled and stuck tasks are now resumed concurrently. (#140)
Browse files Browse the repository at this point in the history
* Scheduled and stuck tasks are now resumed concurrently.
  • Loading branch information
onukristo authored Dec 7, 2021
1 parent eff2fa3 commit a3e732a
Show file tree
Hide file tree
Showing 15 changed files with 258 additions and 65 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

Describes notable changes.

#### 1.30.0 - 2021/12/07
- Scheduled and stuck tasks are now resumed concurrently, by default with the parallelism of 10.
This eliminates a bottleneck for services relying on large volume of scheduled tasks.

#### 1.29.0 - 2021/05/31
- JDK 11+ is a requirement.
- Opensource facelift.
Expand Down
15 changes: 8 additions & 7 deletions build.libraries.gradle
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
ext {
twContextVersion = "0.11.0"
twLeaderSelectorVersion = "1.6.0"
springBootVersion = "2.3.11.RELEASE"
springBootVersion = "2.4.13"


libraries = [
// version defined
springBootDependencies : "org.springframework.boot:spring-boot-dependencies:${springBootVersion}",
apacheCuratorRecipies : "org.apache.curator:curator-recipes:5.1.0",
apacheCuratorRecipies : "org.apache.curator:curator-recipes:5.2.0",
apacheCommonsCollections : "org.apache.commons:commons-collections4:4.4",
commonsIo : "commons-io:commons-io:2.9.0",
guava : "com.google.guava:guava:30.1.1-jre",
commonsIo : "commons-io:commons-io:2.11.0",
guava : "com.google.guava:guava:31.0.1-jre",
semver4j : "com.vdurmont:semver4j:3.1.0",
twGracefulShutdown : "com.transferwise.common:tw-graceful-shutdown:2.3.0",
twGracefulShutdownIntefaces : "com.transferwise.common:tw-graceful-shutdown-interfaces:2.3.0",
Expand All @@ -20,13 +20,14 @@ ext {
twLeaderSelector : "com.transferwise.common:tw-leader-selector:${twLeaderSelectorVersion}",
twLeaderSelectorStarter : "com.transferwise.common:tw-leader-selector-starter:${twLeaderSelectorVersion}",
twBaseUtils : "com.transferwise.common:tw-base-utils:1.5.0",
spotbugsAnnotations : "com.github.spotbugs:spotbugs-annotations:4.2.3",
newRelic : "com.newrelic.agent.java:newrelic-api:6.5.0",
awaitility : 'org.awaitility:awaitility:4.1.0',
spotbugsAnnotations : "com.github.spotbugs:spotbugs-annotations:4.5.0",
newRelic : "com.newrelic.agent.java:newrelic-api:7.4.0",
awaitility : 'org.awaitility:awaitility:4.1.1',
gafferJta : 'com.transferwise.common:tw-gaffer-jta:1.4.0',
twEntryPointsStarter : 'com.transferwise.common:tw-entrypoints-starter:2.5.0',

// versions managed by spring-boot-dependencies platform
flywayCore : "org.flywaydb:flyway-core",
springKafka : "org.springframework.kafka:spring-kafka",
springTx : "org.springframework:spring-tx",
springJdbc : "org.springframework:spring-jdbc",
Expand Down
2 changes: 1 addition & 1 deletion demoapp/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ dependencies {
runtimeOnly libraries.springBootStarterActuator
runtimeOnly libraries.micrometerRegistryPrometheus
runtimeOnly libraries.janino
runtimeOnly libraries.liquibaseCore
runtimeOnly libraries.twEntryPointsStarter
runtimeOnly libraries.flywayCore

// Database drivers
runtimeOnly libraries.mariadbJavaClient
Expand Down
2 changes: 1 addition & 1 deletion demoapp/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: '3.7'
version: '3.3'

# Make sure you disable userland networking, for proper perf tests.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import javax.transaction.UserTransaction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.flyway.FlywayDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
Expand Down Expand Up @@ -61,6 +62,21 @@ public DataSource jtaDataSource() {
return da;
}

@Bean
@FlywayDataSource
public DataSource flywayDataSource() {
HikariDataSource hds = new HikariDataSource();
hds.setPoolName("demoapp_flyway");
hds.setJdbcUrl(env.getProperty("spring.datasource.url"));
hds.setUsername(env.getProperty("spring.flyway.user"));
hds.setPassword(env.getProperty("spring.flyway.password"));

hds.setMinimumIdle(0);
hds.setMaximumPoolSize(1);

return hds;
}

private DataSource dataSource() {
HikariDataSource hds = new HikariDataSource();
hds.setPoolName("demoapp");
Expand Down
16 changes: 10 additions & 6 deletions demoapp/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ spring:
enableAutoCommit: false
aop:
proxy-target-class: true
liquibase:
flyway:
enabled: true
change-log: 'classpath:db/changelog/db.tw-tasks-postgres.xml'
user: postgres
password: example-password-change-me
locations: classpath:db/changelog/postgres

tw-tasks:
core:
Expand All @@ -77,9 +79,11 @@ tw-tasks:
kafka:
bootstrap-servers: localhost:9092
#trigger-same-task-in-all-nodes: true
#tasksHistoryDeletingBatchSize: 500
#tasksCleaningInterval: PT0.5s
#finishedTasksHistoryToKeep: PT1M
tasksHistoryDeletingBatchSize: 500
tasksCleaningInterval: PT0.5s
finishedTasksHistoryToKeep: PT30M
taskGrabbingMaxConcurrency: 500
asyncTaskTriggering: false

zookeeper:
connect-string: localhost:2181
Expand Down Expand Up @@ -136,7 +140,7 @@ spring:
tw-tasks.core:
db-type: MYSQL

spring.liquibase.change-log: 'classpath:db/changelog/db.tw-tasks-mysql.xml'
spring.flyway.locations: classpath:db/changelog/mysql

---

Expand Down
35 changes: 35 additions & 0 deletions demoapp/src/main/resources/db/changelog/mysql/V1.0__initialize.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
CREATE TABLE tw_task
(
id BINARY(16) PRIMARY KEY NOT NULL,
status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED'),
-- Microsecond precision (6) is strongly recommended here to reduce the chance of gap locks deadlocking on tw_task_idx1
next_event_time DATETIME(6) NOT NULL,
state_time DATETIME(3) NOT NULL,
version BIGINT NOT NULL,
priority INT NOT NULL DEFAULT 5,
processing_start_time DATETIME(3) NULL,
processing_tries_count BIGINT NOT NULL,
time_created DATETIME(3) NOT NULL,
time_updated DATETIME(3) NOT NULL,
type VARCHAR(250) CHARACTER SET latin1 NOT NULL,
sub_type VARCHAR(250) CHARACTER SET latin1 NULL,
processing_client_id VARCHAR(250) CHARACTER SET latin1 NULL,
data LONGTEXT NOT NULL
);

CREATE INDEX tw_task_idx1 ON tw_task (status, next_event_time);

CREATE TABLE tw_task_data
(
task_id BINARY(16) PRIMARY KEY NOT NULL,
data_format INT NOT NULL,
data LONGBLOB NOT NULL
);

CREATE TABLE unique_tw_task_key
(
task_id BINARY(16) PRIMARY KEY,
key_hash INT NOT NULL,
`key` VARCHAR(150) CHARACTER SET latin1 NOT NULL,
UNIQUE KEY uidx1 (key_hash, `key`)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
CREATE TABLE tw_task
(
id UUID PRIMARY KEY,
type TEXT NOT NULL,
sub_type TEXT NULL,
status TEXT NOT NULL,
data TEXT NOT NULL,
next_event_time TIMESTAMPTZ(6) NOT NULL,
state_time TIMESTAMPTZ(3) NOT NULL,
processing_client_id TEXT NULL,
processing_start_time TIMESTAMPTZ(3) NULL,
time_created TIMESTAMPTZ(3) NOT NULL,
time_updated TIMESTAMPTZ(3) NOT NULL,
processing_tries_count BIGINT NOT NULL,
version BIGINT NOT NULL,
priority INT NOT NULL DEFAULT 5
);

CREATE INDEX tw_task_idx1 ON tw_task (status, next_event_time);

CREATE TABLE tw_task_data
(
task_id UUID PRIMARY KEY NOT NULL,
data_format INT NOT NULL,
data BYTEA NOT NULL
);

ALTER TABLE tw_task_data ALTER COLUMN data SET STORAGE EXTERNAL;

CREATE TABLE unique_tw_task_key
(
task_id UUID PRIMARY KEY NOT NULL,
key_hash INT NOT NULL,
key TEXT NOT NULL,
unique (key_hash, key)
);
4 changes: 2 additions & 2 deletions docs/db-perf-tests.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ You can verify how long it took to execute all of them with following.

Mysql family:
```mysql
select TIMESTAMPDIFF(SECOND, min(time_created), max(state_time)) from tw_task;
select TIMESTAMPDIFF(SECOND, min(time_created), max(state_time)) from tw_task where type='DB_PERF_TEST';
```

Postgresql:
```postgresql
select EXTRACT(EPOCH FROM max(state_time)) - EXTRACT(EPOCH FROM min(tw_task.time_created)) from tw_task;
select EXTRACT(EPOCH FROM max(state_time)) - EXTRACT(EPOCH FROM min(tw_task.time_created)) from tw_task where type='DB_PERF_TEST';
```

#### Tips and tricks
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=1.29.1
version=1.30.0
org.gradle.internal.http.socketTimeout=120000
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.0.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -75,8 +76,9 @@ void setup() {
kafkaTasksExecutionTriggerer = (KafkaTasksExecutionTriggerer) tasksExecutionTriggerer;
}

@Test
void allUniqueTasksWillGetProcessed() throws Exception {
@ParameterizedTest(name = "allUniqueTasksWillGetProcessed({0})")
@ValueSource(booleans = {false, true})
void allUniqueTasksWillGetProcessed(boolean scheduled) throws Exception {
final int initialProcessingsCount = counterSum("twTasks.tasks.processingsCount");
final int initialProcessedCount = counterSum("twTasks.tasks.processedCount");
final int initialDuplicatesCount = counterSum("twTasks.tasks.duplicatesCount");
Expand All @@ -98,10 +100,16 @@ void allUniqueTasksWillGetProcessed() throws Exception {
final int key = i;
executorService.submit(() -> {
try {
tasksService.addTask(new AddTaskRequest()
var taskRequest = new AddTaskRequest()
.setData(taskDataSerializer.serialize("Hello World! " + key))
.setType("test")
.setUniqueKey(String.valueOf(key)));
.setUniqueKey(String.valueOf(key));

if (scheduled) {
taskRequest.setRunAfterTime(ZonedDateTime.now().plusSeconds(1));
}

tasksService.addTask(taskRequest);
} catch (Throwable t) {
log.error(t.getMessage(), t);
}
Expand Down Expand Up @@ -341,14 +349,14 @@ void taskProcessorHasCorrectContextSet() {
);

Task task = await().until(() -> transactionsHelper.withTransaction().asNew().call(
() -> {
try {
return testTasksService.getFinishedTasks("test", null);
} catch (Throwable t) {
log.error(t.getMessage(), t);
}
return null;
}),
() -> {
try {
return testTasksService.getFinishedTasks("test", null);
} catch (Throwable t) {
log.error(t.getMessage(), t);
}
return null;
}),
res -> {
if (res == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Positive;
import lombok.Data;
import lombok.experimental.Accessors;

Expand Down Expand Up @@ -42,7 +43,7 @@ public class TasksProperties {
* How often do we try to clean very old tasks from the database.
*/
@NotNull
private Duration tasksCleaningInterval = Duration.ofSeconds(15);
private Duration tasksCleaningInterval = Duration.ofSeconds(1);
/**
* How often do we check if any scheduled task should be executed now.
*/
Expand Down Expand Up @@ -335,6 +336,9 @@ public enum DbType {
@Valid
private Triggering triggering = new Triggering();

@Valid
private TasksResumer tasksResumer = new TasksResumer();

@Data
public static class Triggering {

Expand Down Expand Up @@ -466,4 +470,18 @@ public static class Environment {
private String previousVersion;

}

@Data
@Accessors(chain = true)
public static class TasksResumer {

/**
* Specifies how many tasks we are loading from the database in one go to be then resumed concurrently.
*/
@Positive
private int batchSize = 1000;

@Positive
private int concurrency = 10;
}
}
Loading

0 comments on commit a3e732a

Please sign in to comment.