Skip to content

Commit

Permalink
Unhooking core from Spring Kafka. (#98)
Browse files Browse the repository at this point in the history
* Unhooking core from Spring Kafka.
  • Loading branch information
onukristo authored Feb 19, 2021
1 parent 723f2c7 commit a8e7807
Show file tree
Hide file tree
Showing 29 changed files with 260 additions and 152 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

Describes notable changes.

#### 1.24.0 - 2021/02/18
- Core and tasks triggering system does not depend on Spring Kafka, nor it's configuration.
Services have to now specify `tw-tasks.core.triggering.kafka.bootstrap-servers` parameter.
- Tasks triggering system has its own ObjectMapper instance.

#### 1.23.0 - 2021/02/17
- Node's tasks are resumed on startup by the same logic we resume other stuck tasks.
On the startup, the current node tasks in `PROCESSING` state will be marked to `ERROR` now.
Expand Down
9 changes: 5 additions & 4 deletions build.common.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
compileOnly platform(libraries.springBootDependencies)

annotationProcessor libraries.lombok
annotationProcessor libraries.springBootConfigurationProcessor

compileOnly libraries.lombok
compileOnly libraries.spotbugsAnnotations
Expand Down Expand Up @@ -92,16 +93,16 @@ compileJava {
}

compileTestJava {
sourceCompatibility = JavaVersion.VERSION_13
targetCompatibility = JavaVersion.VERSION_13
sourceCompatibility = JavaVersion.VERSION_15
targetCompatibility = JavaVersion.VERSION_15
javaCompiler = javaToolchains.compilerFor {
languageVersion = JavaLanguageVersion.of(13)
languageVersion = JavaLanguageVersion.of(15)
}
}

test {
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.of(13)
languageVersion = JavaLanguageVersion.of(15)
}

testLogging {
Expand Down
116 changes: 59 additions & 57 deletions build.libraries.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,64 +6,66 @@ ext {

libraries = [
// version defined
springBootDependencies : "org.springframework.boot:spring-boot-dependencies:${springBootVersion}",
apacheCuratorRecipies : "org.apache.curator:curator-recipes:5.1.0",
apacheCommonsCollections : "org.apache.commons:commons-collections4:4.4",
commonsIo : "commons-io:commons-io:2.8.0",
guava : "com.google.guava:guava:30.1-jre",
semver4j : "com.vdurmont:semver4j:3.1.0",
twGracefulShutdown : "com.transferwise.common:tw-graceful-shutdown:2.1.0",
twGracefulShutdownIntefaces : "com.transferwise.common:tw-graceful-shutdown-interfaces:1.3.2",
twContext : "com.transferwise.common:tw-context:${twContextVersion}",
twContextStarter : "com.transferwise.common:tw-context-starter:${twContextVersion}",
twIncidents : 'com.transferwise.common:tw-incidents:1.0.1',
twLeaderSelector : "com.transferwise.common:tw-leader-selector:${twLeaderSelectorVersion}",
twLeaderSelectorStarter : "com.transferwise.common:tw-leader-selector-starter:${twLeaderSelectorVersion}",
twBaseUtils : "com.transferwise.common:tw-base-utils:1.3.2",
spotbugsAnnotations : "com.github.spotbugs:spotbugs-annotations:4.2.1",
newRelic : "com.newrelic.agent.java:newrelic-api:6.4.1",
awaitility : 'org.awaitility:awaitility:4.0.3',
gafferJta : 'com.transferwise.common:tw-gaffer-jta:1.3.1',
twEntryPointsStarter : 'com.transferwise.common:tw-entrypoints-starter:2.3.0',
springBootDependencies : "org.springframework.boot:spring-boot-dependencies:${springBootVersion}",
apacheCuratorRecipies : "org.apache.curator:curator-recipes:5.1.0",
apacheCommonsCollections : "org.apache.commons:commons-collections4:4.4",
commonsIo : "commons-io:commons-io:2.8.0",
guava : "com.google.guava:guava:30.1-jre",
semver4j : "com.vdurmont:semver4j:3.1.0",
twGracefulShutdown : "com.transferwise.common:tw-graceful-shutdown:2.1.0",
twGracefulShutdownIntefaces : "com.transferwise.common:tw-graceful-shutdown-interfaces:1.3.2",
twContext : "com.transferwise.common:tw-context:${twContextVersion}",
twContextStarter : "com.transferwise.common:tw-context-starter:${twContextVersion}",
twIncidents : 'com.transferwise.common:tw-incidents:1.0.1',
twLeaderSelector : "com.transferwise.common:tw-leader-selector:${twLeaderSelectorVersion}",
twLeaderSelectorStarter : "com.transferwise.common:tw-leader-selector-starter:${twLeaderSelectorVersion}",
twBaseUtils : "com.transferwise.common:tw-base-utils:1.3.2",
spotbugsAnnotations : "com.github.spotbugs:spotbugs-annotations:4.2.1",
newRelic : "com.newrelic.agent.java:newrelic-api:6.4.1",
awaitility : 'org.awaitility:awaitility:4.0.3',
gafferJta : 'com.transferwise.common:tw-gaffer-jta:1.3.1',
twEntryPointsStarter : 'com.transferwise.common:tw-entrypoints-starter:2.3.0',

// versions managed by spring-boot-dependencies platform
springKafka : "org.springframework.kafka:spring-kafka",
springTx : "org.springframework:spring-tx",
springJdbc : "org.springframework:spring-jdbc",
springBeans : "org.springframework:spring-beans",
springCore : "org.springframework:spring-core",
springContext : "org.springframework:spring-context",
springWeb : "org.springframework:spring-web",
springSecurityWeb : "org.springframework.security:spring-security-web",
springBoot : "org.springframework.boot:spring-boot",
springBootStarterActuator : "org.springframework.boot:spring-boot-starter-actuator",
springBootStarterValidator : 'org.springframework.boot:spring-boot-starter-validation',
springBootStarter : "org.springframework.boot:spring-boot-starter",
springBootAutoconfigure : "org.springframework.boot:spring-boot-autoconfigure",
springBootStarterTest : 'org.springframework.boot:spring-boot-starter-test',
springBootStarterJdbc : 'org.springframework.boot:spring-boot-starter-jdbc',
springBootStarterWeb : 'org.springframework.boot:spring-boot-starter-web',
springSecurityConfig : 'org.springframework.security:spring-security-config',
springSecurityTest : 'org.springframework.security:spring-security-test',
apacheCommonsLang : "org.apache.commons:commons-lang3",
micrometerCore : "io.micrometer:micrometer-core",
micrometerRegistryPrometheus: "io.micrometer:micrometer-registry-prometheus",
jacksonAnnotations : "com.fasterxml.jackson.core:jackson-annotations",
jacksonDatabind : "com.fasterxml.jackson.core:jackson-databind",
javaxAnnotationApi : "javax.annotation:javax.annotation-api",
jakartaValidationApi : "jakarta.validation:jakarta.validation-api",
lombok : "org.projectlombok:lombok",
slf4j : "org.slf4j:slf4j-api",
junitApi : 'org.junit.jupiter:junit-jupiter-api',
junitEngine : 'org.junit.jupiter:junit-jupiter-engine',
junitParams : 'org.junit.jupiter:junit-jupiter-params',
junitMockito : 'org.mockito:mockito-junit-jupiter',
assertJCore : 'org.assertj:assertj-core',
mariadbJavaClient : 'org.mariadb.jdbc:mariadb-java-client',
liquibaseCore : 'org.liquibase:liquibase-core',
tomcatJdbc : 'org.apache.tomcat:tomcat-jdbc',
janino : 'org.codehaus.janino:janino',
postgresql : 'org.postgresql:postgresql',
mysqlConnectorJava : 'mysql:mysql-connector-java',
springKafka : "org.springframework.kafka:spring-kafka",
springTx : "org.springframework:spring-tx",
springJdbc : "org.springframework:spring-jdbc",
springBeans : "org.springframework:spring-beans",
springCore : "org.springframework:spring-core",
springContext : "org.springframework:spring-context",
springBootConfigurationProcessor: 'org.springframework.boot:spring-boot-configuration-processor',
springWeb : "org.springframework:spring-web",
springSecurityWeb : "org.springframework.security:spring-security-web",
springBoot : "org.springframework.boot:spring-boot",
springBootStarterActuator : "org.springframework.boot:spring-boot-starter-actuator",
springBootStarterValidator : 'org.springframework.boot:spring-boot-starter-validation',
springBootStarter : "org.springframework.boot:spring-boot-starter",
springBootAutoconfigure : "org.springframework.boot:spring-boot-autoconfigure",
springBootStarterTest : 'org.springframework.boot:spring-boot-starter-test',
springBootStarterJdbc : 'org.springframework.boot:spring-boot-starter-jdbc',
springBootStarterWeb : 'org.springframework.boot:spring-boot-starter-web',
springSecurityConfig : 'org.springframework.security:spring-security-config',
springSecurityTest : 'org.springframework.security:spring-security-test',
apacheCommonsLang : "org.apache.commons:commons-lang3",
micrometerCore : "io.micrometer:micrometer-core",
micrometerRegistryPrometheus : "io.micrometer:micrometer-registry-prometheus",
jacksonAnnotations : "com.fasterxml.jackson.core:jackson-annotations",
jacksonDatabind : "com.fasterxml.jackson.core:jackson-databind",
javaxAnnotationApi : "javax.annotation:javax.annotation-api",
jakartaValidationApi : "jakarta.validation:jakarta.validation-api",
kafkaClients : "org.apache.kafka:kafka-clients",
lombok : "org.projectlombok:lombok",
slf4j : "org.slf4j:slf4j-api",
junitApi : 'org.junit.jupiter:junit-jupiter-api',
junitEngine : 'org.junit.jupiter:junit-jupiter-engine',
junitParams : 'org.junit.jupiter:junit-jupiter-params',
junitMockito : 'org.mockito:mockito-junit-jupiter',
assertJCore : 'org.assertj:assertj-core',
mariadbJavaClient : 'org.mariadb.jdbc:mariadb-java-client',
liquibaseCore : 'org.liquibase:liquibase-core',
tomcatJdbc : 'org.apache.tomcat:tomcat-jdbc',
janino : 'org.codehaus.janino:janino',
postgresql : 'org.postgresql:postgresql',
mysqlConnectorJava : 'mysql:mysql-connector-java',
]
}
3 changes: 3 additions & 0 deletions demoapp/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ tw-tasks:
- emails
environment:
previousVersion: "1.21.1"
triggering:
kafka:
bootstrap-servers: localhost:9092
#trigger-same-task-in-all-nodes: true
#tasksHistoryDeletingBatchSize: 500
#tasksCleaningInterval: PT0.5s
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=1.23.0
version=1.24.0
org.gradle.internal.http.socketTimeout=120000
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,13 @@ void cleanupBaseTest(TestInfo testInfo) {
log.info("Cleaning up for '{}' It took {} ms", name, System.currentTimeMillis() - startTimeMs)
);
}

protected void cleanWithoutException(Runnable runnable) {
try {
runnable.run();
} catch (Throwable t) {
log.error(t.getMessage(), t);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.transferwise.tasks.management.dao.IManagementTaskDao.DaoTask1;
import com.transferwise.tasks.management.dao.IManagementTaskDao.DaoTask2;
import com.transferwise.tasks.management.dao.IManagementTaskDao.DaoTask3;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package com.transferwise.tasks.ext.management.dao;

public class MySqlManagementTaskDaoIntTest extends ManagementTaskDaoIntTest {

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@

@ActiveProfiles("postgres")
public class PostgresManagementTaskDaoIntTest extends ManagementTaskDaoIntTest {

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.config.TwTasksKafkaConfiguration;
import com.transferwise.tasks.helpers.kafka.ITopicPartitionsManager;
import com.transferwise.tasks.helpers.kafka.NoOpTopicPartitionsManager;
import java.lang.management.ManagementFactory;
Expand All @@ -20,13 +19,11 @@ public class TopicPartitionsManagerIntTest extends BaseIntTest {

@Autowired
private ITopicPartitionsManager noOpTopicPartitionsManager;
@Autowired
private TwTasksKafkaConfiguration kafkaConfiguration;

@Test
public void testAdminClientMbeanId() throws MalformedObjectNameException {
assertTrue(noOpTopicPartitionsManager instanceof NoOpTopicPartitionsManager);
try (AdminClient adminClient = noOpTopicPartitionsManager.createKafkaAdminClient(kafkaConfiguration.getKafkaProperties())) {
try (AdminClient ignored = noOpTopicPartitionsManager.createKafkaAdminClient()) {
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
String objectName = "kafka.admin.client:type=app-info,id=adminclient-*";
Set<ObjectInstance> objectInstances = mbeanServer.queryMBeans(ObjectNameManager.getInstance(objectName), null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package com.transferwise.tasks.test.dao;

public class MySqlTestTaskDaoIntTest extends TestTaskDaoIntTest {

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@

@ActiveProfiles("postgres")
public class PostgresTestTaskDaoIntTest extends TestTaskDaoIntTest {

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.transferwise.tasks.domain.FullTaskRecord;
import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.domain.TaskStatus;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

import com.transferwise.common.context.UnitOfWorkManager;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.config.TwTasksKafkaConfiguration;
import com.transferwise.tasks.helpers.kafka.ConsistentKafkaConsumer;
import com.transferwise.tasks.helpers.kafka.configuration.TwTasksKafkaConfiguration;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import com.transferwise.tasks.TasksProperties;
import com.transferwise.tasks.buckets.BucketProperties;
import com.transferwise.tasks.buckets.IBucketsManager;
import com.transferwise.tasks.config.TwTasksKafkaConfiguration;
import com.transferwise.tasks.dao.ITaskDaoDataSerializer;
import com.transferwise.tasks.domain.ITask;
import com.transferwise.tasks.ext.kafkalistener.KafkaListenerExtTestConfiguration;
import com.transferwise.tasks.helpers.kafka.configuration.TwTasksKafkaConfiguration;
import com.transferwise.tasks.helpers.kafka.messagetotask.IKafkaMessageHandler;
import com.transferwise.tasks.impl.jobs.interfaces.IJob;
import com.transferwise.tasks.impl.jobs.test.JobsTestConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import org.assertj.core.data.TemporalUnitWithinOffset;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -163,7 +165,7 @@ void settingToBeRetriedSetsTheCorrectRetryTime() {
assertEquals(1, task.getVersion());
assertEquals(0, task.getProcessingTriesCount());
assertNotNull(task.getStateTime());
assertEquals(retryTime.toInstant(), task.getNextEventTime().toInstant());
assertThat(retryTime).isCloseTo(task.getNextEventTime(), new TemporalUnitWithinOffset(1, ChronoUnit.MICROS));
}

@Test
Expand All @@ -181,7 +183,7 @@ void grabbingForProcessingIncrementsTheProcessingTriesCount() {
assertEquals(1, returnedTask.getProcessingTriesCount());

FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class);
assertEquals(processingDeadline, fullTaskRecord.getNextEventTime().toInstant());
assertThat(processingDeadline).isCloseTo(fullTaskRecord.getNextEventTime().toInstant(), new TemporalUnitWithinOffset(1, ChronoUnit.MICROS));
assertEquals(nodeId, fullTaskRecord.getProcessingClientId());
}

Expand All @@ -202,7 +204,7 @@ void settingToBeRetriedResetTriesCountIfSpecified() {
assertEquals(2, finalTask.getVersion());
assertEquals(0, finalTask.getProcessingTriesCount());
assertNotNull(finalTask.getStateTime());
assertEquals(retryTime.toInstant(), finalTask.getNextEventTime().toInstant());
assertThat(finalTask.getNextEventTime()).isCloseTo(retryTime.toString(), new TemporalUnitWithinOffset(1, ChronoUnit.MICROS));
}

@Test
Expand Down Expand Up @@ -271,7 +273,7 @@ void markAsSubmittedAndSetNextEventTimePutsTheTaskInSubmittedStateAndUpdatesNext
FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class);
assertEquals("SUBMITTED", fullTaskRecord.getStatus());
assertEquals(1, fullTaskRecord.getVersion());
assertEquals(maxStuckTime.toInstant(), fullTaskRecord.getNextEventTime().toInstant());
assertThat(maxStuckTime).isCloseTo(fullTaskRecord.getNextEventTime(), new TemporalUnitWithinOffset(1, ChronoUnit.MICROS));
}

@Test
Expand All @@ -291,7 +293,7 @@ void preparingStuckOnProcessingTasksForResumingPutsTheTaskInSubmittedState() {

FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class);
assertEquals("SUBMITTED", fullTaskRecord.getStatus());
assertEquals(processingDeadline, fullTaskRecord.getNextEventTime().toInstant());
assertThat(processingDeadline).isCloseTo(fullTaskRecord.getNextEventTime().toInstant(), new TemporalUnitWithinOffset(1, ChronoUnit.MICROS));
}

@Test
Expand All @@ -304,7 +306,7 @@ void markingAsSubmittedUpdatesTheStateCorrectly() {
assertTrue(result);
FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class);
assertEquals("SUBMITTED", fullTaskRecord.getStatus());
assertEquals(maxStuckTime.toInstant(), fullTaskRecord.getNextEventTime().toInstant());
assertThat(maxStuckTime.toInstant()).isCloseTo(fullTaskRecord.getNextEventTime().toInstant(), new TemporalUnitWithinOffset(1, ChronoUnit.MICROS));
}

@Test
Expand Down Expand Up @@ -504,6 +506,7 @@ public void testUuids() {
clock.tick(Duration.ofMillis(2));
}
List<Task> tasks = testTaskDao.findTasksByTypeSubTypeAndStatus(taskType, null, TaskStatus.DONE);
tasks.sort(Comparator.comparing(Task::getId));

List<Integer> resultInts = tasks.stream().map(t -> Integer.parseInt(new String(t.getData(), StandardCharsets.UTF_8)))
.collect(Collectors.toList());
Expand Down
Loading

0 comments on commit a8e7807

Please sign in to comment.