Skip to content

Commit

Permalink
Using a separate TwContextClockHolder for tw tasks and its tests. (#39)
Browse files Browse the repository at this point in the history
* Using a separate TwContextClockHolder for tw tasks and it's tests.
  • Loading branch information
onukristo authored Jun 30, 2020
1 parent c4a2aa4 commit 56f708b
Show file tree
Hide file tree
Showing 23 changed files with 86 additions and 62 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

Describes notable changes.

#### 1.7.5 - 2020/06/30
- Moving away from global ClockHolder to mock the time in tests.
In that way we will create less surprises and flakiness for services also needing to mock that global time
for other reasons.

#### 1.7.4 - 2020/06/29
- Reducing jobs logs spam in applications test suite.

Expand Down
2 changes: 1 addition & 1 deletion build.libraries.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ext {
twContextVersion = "0.1.2"
twContextVersion = "0.3.2"
springBootVersion = "2.2.4.RELEASE"

libraries = [
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=1.7.4
version=1.7.5
org.gradle.internal.http.socketTimeout=120000
1 change: 1 addition & 0 deletions integration-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
testImplementation libraries.commonsIo
testImplementation libraries.apacheCuratorRecipies
testImplementation libraries.twLeaderSelector
testImplementation libraries.twContext
testImplementation libraries.springTx
testImplementation 'org.springframework.boot:spring-boot'
testImplementation 'org.springframework.boot:spring-boot-autoconfigure'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.transferwise.tasks;

import com.transferwise.common.baseutils.clock.ClockHolder;
import com.transferwise.common.baseutils.clock.TestClock;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.common.context.TwContextClockHolder;
import com.transferwise.tasks.test.ITestTasksService;
import com.transferwise.tasks.testapp.IResultRegisteringSyncTaskProcessor;
import com.transferwise.tasks.testapp.TestTaskHandler;
Expand Down Expand Up @@ -58,7 +57,7 @@ void setApplicationContext(ApplicationContext applicationContext) {
@Autowired
protected MockMvc mockMvc; // use for testing secured endpoints

private long startTimeMs = ClockHolder.getClock().millis();
private long startTimeMs = System.currentTimeMillis();

@BeforeEach
void setupBaseTest(TestInfo testInfo) {
Expand All @@ -68,7 +67,7 @@ void setupBaseTest(TestInfo testInfo) {

@AfterEach
void cleanupBaseTest(TestInfo testInfo) {
TestClock.reset();
TwContextClockHolder.reset();
transactionsHelper.withTransaction().asNew().call(() -> {
testTasksService.reset();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.transferwise.common.baseutils.clock.ClockHolder;
import com.transferwise.common.baseutils.clock.TestClock;
import com.transferwise.common.context.TwContextClockHolder;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.domain.ITask;
Expand All @@ -16,7 +16,6 @@
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.concurrent.ThreadLocalRandom;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -116,7 +115,7 @@ static class JobA implements IJob {

@Override
public ZonedDateTime getNextRunTime() {
return ZonedDateTime.now(ClockHolder.getClock());
return ZonedDateTime.now(TwContextClockHolder.getClock());
}

@Override
Expand Down Expand Up @@ -151,7 +150,7 @@ public ZonedDateTime getRetryTime(ITask task, Throwable t) {
}
retryCode = 'F';
log.info("Next runtime will be after 1s");
return ZonedDateTime.now(ClockHolder.getClock()).plusSeconds(1);
return ZonedDateTime.now(TwContextClockHolder.getClock()).plusSeconds(1);
}
};
}
Expand All @@ -160,7 +159,7 @@ public ZonedDateTime getRetryTime(ITask task, Throwable t) {
public ZonedDateTime getNextRunTime() {
retryCode = 'R';
log.info("Next runtime will be after 60s");
return ZonedDateTime.now(ClockHolder.getClock()).plusSeconds(60);
return ZonedDateTime.now(TwContextClockHolder.getClock()).plusSeconds(60);
}

@Override
Expand All @@ -186,7 +185,7 @@ static class JobC implements IJob {

@Override
public ZonedDateTime getNextRunTime() {
return ZonedDateTime.now(ClockHolder.getClock()).plusYears(100);
return ZonedDateTime.now(TwContextClockHolder.getClock()).plusYears(100);
}

@Override
Expand All @@ -196,12 +195,15 @@ public ProcessResult process(ITask task) {
}
}

@RequiredArgsConstructor
private class IntermediateRetriesTestSetup {

final TestClock clock = TestClock.createAndRegister();
final TestClock clock = new TestClock();
final JobB job = new JobB();

public IntermediateRetriesTestSetup() {
TwContextClockHolder.setClock(clock);
}

void waitForExecutionCount(int expectedCount) {
await().until(
() -> job.executionsCount == expectedCount && testTasksService.getWaitingTasks("TaskJob|MyJobB", null).size() > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.transferwise.common.baseutils.clock.TestClock;
import com.transferwise.common.context.TwContextClockHolder;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.ITasksService;
import com.transferwise.tasks.ITasksService.TasksProcessingState;
Expand Down Expand Up @@ -37,7 +38,7 @@ class ManualStartIntTest extends BaseIntTest {

@BeforeEach
void setup() {
testClock = TestClock.createAndRegister();
TwContextClockHolder.setClock(testClock = new TestClock());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.transferwise.common.baseutils.clock.ClockHolder;
import com.transferwise.common.baseutils.clock.TestClock;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.common.context.TwContextClockHolder;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.ITasksService;
import com.transferwise.tasks.domain.IBaseTask;
Expand Down Expand Up @@ -151,7 +152,9 @@ void after5RetriesAsynchronousTasksGoToErrorState() {

@Test
void exponentialRetriesWorkEachRetryShouldTakeLonger() {
TestClock clock = TestClock.createAndRegister();
TestClock clock = new TestClock();
TwContextClockHolder.setClock(clock);

AtomicInteger processingCount = new AtomicInteger();
int n = 5;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.transferwise.common.baseutils.clock.ClockHolder;
import com.transferwise.common.context.TwContextClockHolder;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.ITasksService;
import com.transferwise.tasks.TaskTestBuilder;
Expand Down Expand Up @@ -203,7 +204,7 @@ void immediatelyResumingATaskWorks() {
FullTaskRecord task = taskDao.getTask(task0Id, FullTaskRecord.class);
assertEquals(TaskStatus.WAITING.name(), task.getStatus());
// decrementing 1 because of database rounding error.
assertFalse(task.getNextEventTime().isAfter(ZonedDateTime.now(ClockHolder.getClock()).plusSeconds(1)));
assertFalse(task.getNextEventTime().isAfter(ZonedDateTime.now(TwContextClockHolder.getClock()).plusSeconds(1)));
}

@Test
Expand All @@ -227,7 +228,7 @@ void immediatelyResumingAllTasksWorks() {
assertTrue(response.getBody().getResults().get(task0Id).isSuccess());
FullTaskRecord task = taskDao.getTask(task0Id, FullTaskRecord.class);
// decrementing 1 because of database rounding error
assertFalse(task.getNextEventTime().isAfter(ZonedDateTime.now(ClockHolder.getClock()).plusSeconds(1)));
assertFalse(task.getNextEventTime().isAfter(ZonedDateTime.now(TwContextClockHolder.getClock()).plusSeconds(1)));
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.transferwise.tasks.testapp.config;

import com.transferwise.common.baseutils.clock.ClockHolder;
import com.transferwise.common.context.TwContextClockHolder;
import com.transferwise.tasks.buckets.BucketProperties;
import com.transferwise.tasks.buckets.IBucketsManager;
import com.transferwise.tasks.config.TwTasksKafkaConfiguration;
Expand Down Expand Up @@ -111,7 +112,7 @@ public IJob myFancyJobA() {
return new IJob() {
@Override
public ZonedDateTime getNextRunTime() {
return ZonedDateTime.now(ClockHolder.getClock()).plusYears(20);
return ZonedDateTime.now(TwContextClockHolder.getClock()).plusYears(20);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.ImmutableSet;
import com.transferwise.common.baseutils.clock.ClockHolder;
import com.transferwise.common.baseutils.clock.TestClock;
import com.transferwise.common.context.TwContextClockHolder;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.dao.ITaskDao.DaoTask1;
Expand Down Expand Up @@ -232,7 +233,8 @@ void schedulingTaskForImmediateExecutionPutsTheTaskInWaitingState() {

@Test
void getStuckTasksReturnsAllTasksToRetry() {
final TestClock testClock = TestClock.createAndRegister();
final TestClock testClock = new TestClock();
TwContextClockHolder.setClock(testClock);
final UUID taskId = UUID.randomUUID();
addTask(taskId, TaskStatus.SUBMITTED);
final ZonedDateTime oldNextEventTime = taskDao.getTask(taskId, FullTaskRecord.class).getNextEventTime();
Expand Down Expand Up @@ -399,7 +401,8 @@ void gettingStuckTaskCountConsidersTheCorrectStatus() {

@Test
void gettingStuckTaskCountConsidersTheLimitTime() {
TestClock testClock = TestClock.createAndRegister();
final TestClock testClock = new TestClock();
TwContextClockHolder.setClock(testClock);
addRandomTask(TaskStatus.PROCESSING);
addRandomTask(TaskStatus.PROCESSING);
ZonedDateTime limitTime = ZonedDateTime.now(testClock).plus(1, ChronoUnit.MILLIS);
Expand Down Expand Up @@ -471,12 +474,13 @@ void deletingTasksByTypeAndStatusDeletesTheCorrectTasks() {

@Test
void deletingOldTasksIsHappeningInBatches() {
TestClock clock = TestClock.createAndRegister();
final TestClock testClock = new TestClock();
TwContextClockHolder.setClock(testClock);

addRandomTask();
addRandomTask();

clock.tick(Duration.ofMinutes(11));
testClock.tick(Duration.ofMinutes(11));
DeleteFinishedOldTasksResult result = taskDao.deleteOldTasks(TaskStatus.DONE, Duration.ofMinutes(10), 1);

assertEquals(1, taskDao.getTasksCountInStatus(10, TaskStatus.DONE));
Expand Down Expand Up @@ -654,7 +658,7 @@ private InsertTaskResponse addTask(
return taskDao.insertTask(
new ITaskDao.InsertTaskRequest()
.setData(data)
.setMaxStuckTime(ZonedDateTime.now(ClockHolder.getClock()))
.setMaxStuckTime(ZonedDateTime.now(TwContextClockHolder.getClock()))
.setTaskId(id)
.setPriority(5)
.setStatus(taskStatus)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.transferwise.common.baseutils.clock.ClockHolder;
import com.transferwise.common.baseutils.tracing.IWithXRequestId;
import com.transferwise.common.baseutils.tracing.IXRequestIdHolder;
import com.transferwise.common.context.TwContextClockHolder;
import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.domain.BaseTask;
Expand Down Expand Up @@ -75,7 +75,7 @@ public void init() {
public AddTaskResponse addTask(AddTaskRequest request) {
return MdcContext.with(() -> {
MdcContext.put(tasksProperties.getTwTaskVersionIdMdcKey(), new TaskVersionId(request.getTaskId(), 0));
ZonedDateTime now = ZonedDateTime.now(ClockHolder.getClock());
ZonedDateTime now = ZonedDateTime.now(TwContextClockHolder.getClock());
TaskStatus status = request.getRunAfterTime() == null || !request.getRunAfterTime().isAfter(now) ? TaskStatus.SUBMITTED : TaskStatus.WAITING;

int priority = priorityManager.normalize(request.getPriority());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import static com.transferwise.tasks.utils.TimeUtils.toZonedDateTime;
import static com.transferwise.tasks.utils.UuidUtils.toUuid;

import com.transferwise.common.baseutils.clock.ClockHolder;
import com.transferwise.common.context.TwContextClockHolder;
import com.transferwise.tasks.TasksProperties;
import com.transferwise.tasks.domain.BaseTask;
import com.transferwise.tasks.domain.BaseTask1;
Expand Down Expand Up @@ -162,7 +162,7 @@ public void init() {
@Override
@Transactional(rollbackFor = Exception.class)
public InsertTaskResponse insertTask(InsertTaskRequest request) {
Timestamp now = Timestamp.from(Instant.now(ClockHolder.getClock()));
Timestamp now = Timestamp.from(Instant.now(TwContextClockHolder.getClock()));
ZonedDateTime nextEventTime = request.getRunAfterTime() == null ? request.getMaxStuckTime() : request.getRunAfterTime();

boolean uuidProvided = request.getTaskId() != null;
Expand Down Expand Up @@ -193,7 +193,7 @@ public InsertTaskResponse insertTask(InsertTaskRequest request) {
@Override
@Transactional(rollbackFor = Exception.class)
public boolean setToBeRetried(UUID taskId, ZonedDateTime retryTime, long version, boolean resetTriesCount) {
Timestamp now = Timestamp.from(Instant.now(ClockHolder.getClock()));
Timestamp now = Timestamp.from(Instant.now(TwContextClockHolder.getClock()));

int updatedCount;
if (resetTriesCount) {
Expand All @@ -209,7 +209,7 @@ public boolean setToBeRetried(UUID taskId, ZonedDateTime retryTime, long version
@Override
@Transactional(rollbackFor = Exception.class)
public Task grabForProcessing(BaseTask task, String clientId, Instant maxProcessingEndTime) {
Timestamp now = Timestamp.from(Instant.now(ClockHolder.getClock()));
Timestamp now = Timestamp.from(Instant.now(TwContextClockHolder.getClock()));

int updatedCount = jdbcTemplate.update(grabForProcessingSql, args(clientId, TaskStatus.PROCESSING, now,
maxProcessingEndTime, now, now, task.getVersion() + 1, task.getId(), task.getVersion(), TaskStatus.SUBMITTED));
Expand All @@ -222,23 +222,23 @@ public Task grabForProcessing(BaseTask task, String clientId, Instant maxProcess
@Override
@Transactional(rollbackFor = Exception.class)
public boolean setStatus(UUID taskId, TaskStatus status, long version) {
Timestamp now = Timestamp.from(Instant.now(ClockHolder.getClock()));
Timestamp now = Timestamp.from(Instant.now(TwContextClockHolder.getClock()));
int updatedCount = jdbcTemplate.update(setStatusSql, args(status, now, now, version + 1, taskId, version));
return updatedCount == 1;
}

@Override
@Transactional(rollbackFor = Exception.class)
public boolean scheduleTaskForImmediateExecution(UUID taskId, long version) {
Timestamp now = Timestamp.from(Instant.now(ClockHolder.getClock()));
Timestamp now = Timestamp.from(Instant.now(TwContextClockHolder.getClock()));
int updatedCount = jdbcTemplate.update(scheduleTaskForImmediateExecutionSql, args(TaskStatus.WAITING,
now, now, now, version + 1, taskId, version));
return updatedCount == 1;
}

@Override
public GetStuckTasksResponse getStuckTasks(int batchSize, TaskStatus... statuses) {
Timestamp now = Timestamp.from(ZonedDateTime.now(ClockHolder.getClock()).toInstant());
Timestamp now = Timestamp.from(ZonedDateTime.now(TwContextClockHolder.getClock()).toInstant());

String sql = cachedSql(sqlKey("getStuckTasksSql", statuses.length), () ->
getExpandedSql(getStuckTasksSql, statuses.length));
Expand All @@ -260,7 +260,7 @@ public GetStuckTasksResponse getStuckTasks(int batchSize, TaskStatus... statuses
//TODO: Will not perform well on MySQL. See #getTasksInProcessingOrWaitingStatus
//TODO: Should do UNION all here for better performance.
public List<DaoTask2> getStuckTasks(int maxCount, Duration delta) {
Timestamp timeThreshold = Timestamp.from(ZonedDateTime.now(ClockHolder.getClock()).toInstant().minus(delta));
Timestamp timeThreshold = Timestamp.from(ZonedDateTime.now(TwContextClockHolder.getClock()).toInstant().minus(delta));
return jdbcTemplate.query(getStuckTasksSql1, args(timeThreshold, maxCount), (rs, rowNum) -> new DaoTask2()
.setId(toUuid(rs.getObject(1))).setVersion(rs.getLong(2))
.setNextEventTime(TimeUtils.toZonedDateTime(rs.getTimestamp(3))));
Expand All @@ -275,7 +275,7 @@ public ZonedDateTime getEarliestTaskNextEventTime(TaskStatus status) {
@Override
@Transactional(rollbackFor = Exception.class)
public List<StuckTask> prepareStuckOnProcessingTasksForResuming(String clientId, ZonedDateTime maxStuckTime) {
Timestamp now = Timestamp.from(ZonedDateTime.now(ClockHolder.getClock()).toInstant());
Timestamp now = Timestamp.from(ZonedDateTime.now(TwContextClockHolder.getClock()).toInstant());
List<StuckTask> result = new ArrayList<>();

jdbcTemplate.query(prepareStuckOnProcessingTaskForResumingSql, args(TaskStatus.PROCESSING, clientId),
Expand All @@ -297,7 +297,7 @@ public List<StuckTask> prepareStuckOnProcessingTasksForResuming(String clientId,
@Override
@Transactional(rollbackFor = Exception.class)
public boolean markAsSubmitted(UUID taskId, long version, ZonedDateTime maxStuckTime) {
Timestamp now = Timestamp.from(Instant.now(ClockHolder.getClock()));
Timestamp now = Timestamp.from(Instant.now(TwContextClockHolder.getClock()));

int updatedCount = jdbcTemplate.update(setStatusSql1, args(TaskStatus.SUBMITTED, maxStuckTime,
now, now, version + 1, taskId, version));
Expand Down Expand Up @@ -478,7 +478,7 @@ public int getBatchSize() {
@Transactional(rollbackFor = Exception.class)
public DeleteFinishedOldTasksResult deleteOldTasks(TaskStatus taskStatus, Duration age, int batchSize) {
DeleteFinishedOldTasksResult result = new DeleteFinishedOldTasksResult();
Timestamp deletedBeforeTime = Timestamp.from(Instant.now(ClockHolder.getClock()).minus(age));
Timestamp deletedBeforeTime = Timestamp.from(Instant.now(TwContextClockHolder.getClock()).minus(age));

List<Pair<Object, Long>> taskVersionIds = jdbcTemplate.query(deleteFinishedOldTasksSql,
args(taskStatus.name(), deletedBeforeTime, batchSize),
Expand Down Expand Up @@ -572,7 +572,7 @@ public List<DaoTask3> getTasksInProcessingOrWaitingStatus(int maxCount) {
@Override
@Transactional(rollbackFor = Exception.class)
public boolean clearPayloadAndMarkDone(UUID taskId, long version) {
Timestamp now = Timestamp.from(Instant.now(ClockHolder.getClock()));
Timestamp now = Timestamp.from(Instant.now(TwContextClockHolder.getClock()));
int updatedCount = jdbcTemplate.update(clearPayloadAndMarkDoneSql, args(TaskStatus.DONE, now, now, version + 1,
taskId, version));

Expand Down
Loading

0 comments on commit 56f708b

Please sign in to comment.