Commit

Corrections for MDC and entry points handling. (#67)
onukristo authored Nov 2, 2020
1 parent 2afc5e0 commit 410d81c
Showing 57 changed files with 1,425 additions and 863 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -2,13 +2,32 @@

Describes notable changes.

#### 1.18.0 - 2020/11/01
- MDC corrections.
The following MDC keys are now set for tasks under processing:
* `twTaskId`
* `twTaskVersion`
* `twTaskType`
* `twTaskSubType`

`twTaskVersionId` is not set anymore.
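
These keys can be surfaced in log output, for example with a Logback pattern. A minimal sketch — the appender and pattern layout are illustrative; only the `twTask*` MDC key names come from this release:

```xml
<!-- logback.xml sketch: print the tw-tasks MDC keys with every log line. -->
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <!-- %X{key} expands to the MDC value, or an empty string when the key is unset. -->
      <pattern>%d{ISO8601} %-5level [%X{twTaskType}/%X{twTaskSubType}] task=%X{twTaskId} v=%X{twTaskVersion} %msg%n</pattern>
    </encoder>
  </appender>
  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>
```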

- Task can now define its TwContext criticality and owner.

- Many corrections around entry point creation.

#### 1.17.0 - 2020/10/31
- Optimization and configuration options for fetching approximate task and unique key counts by the cluster-wide tasks state monitor.
Consult `com.transferwise.tasks.TasksProperties.ClusterWideTasksStateMonitor` for the added configuration options.

- Minor external libraries upgrades.
- Minor testsuite optimizations.

Some transactions now use the isolation level READ_UNCOMMITTED. If you are using a JTA transaction manager, you may have to do
two things:
1. Wrap your data source in `org.springframework.jdbc.datasource.IsolationLevelDataSourceAdapter`.
2. Set `org.springframework.transaction.jta.JtaTransactionManager.setAllowCustomIsolationLevels` to `true`.
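
The two steps above can be sketched as Spring configuration. This mirrors the changes made to the demoapp configs in this commit; the class and method names here are illustrative, not prescribed:

```java
import javax.sql.DataSource;
import org.springframework.jdbc.datasource.IsolationLevelDataSourceAdapter;
import org.springframework.transaction.jta.JtaTransactionManager;

public class JtaIsolationConfiguration {

  // 1. Wrap the actual data source, so that per-transaction isolation
  //    levels requested by Spring are applied to the JDBC connection.
  public DataSource jtaDataSource(DataSource actualDataSource) {
    IsolationLevelDataSourceAdapter adapter = new IsolationLevelDataSourceAdapter();
    adapter.setTargetDataSource(actualDataSource);
    return adapter;
  }

  // 2. Tell the JTA transaction manager to accept non-default isolation levels.
  public JtaTransactionManager transactionManager(JtaTransactionManager jtaTransactionManager) {
    jtaTransactionManager.setAllowCustomIsolationLevels(true);
    return jtaTransactionManager;
  }
}
```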

#### 1.16.0 - 2020/10/28
Use separate DAOs for Core/Test/Management.
- ITaskDao - data access operations used by the core and extensions.
15 changes: 1 addition & 14 deletions TODO.md
@@ -1,7 +1,5 @@
1. Investigate the feasibility of completely moving from ZonedDateTime to Instant.

2. Go open source.

3. Some Strings are repeated. For example Task's type, bucket's id, Kafka topics.
In order to conserve memory or allow bigger buffers, we could String.intern() them as soon as they are loaded into memory.
Needs performance testing.
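
The interning idea can be sketched in plain Java; the string value is hypothetical, only the `String.intern()` behaviour is being demonstrated:

```java
public class InternDemo {
  public static void main(String[] args) {
    // Two equal strings loaded e.g. from Kafka are usually distinct heap objects.
    String a = new String("PAYOUT");
    String b = new String("PAYOUT");
    System.out.println(a == b);                   // false: separate instances

    // intern() returns the single pooled instance, so the duplicate copies
    // become garbage collectable and only one copy stays in memory.
    System.out.println(a.intern() == b.intern()); // true: same pooled instance
  }
}
```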
@@ -16,15 +14,6 @@ It can be especially useful for test environments.
to specify the concurrency N/nodeCount. Also test how fast it is, because the current task triggering table algorithm assumes
that booking a processing slot is very fast (mostly based on atomic numbers).

7. Refactor everything into smaller layers/components. This is a precondition for going open source.
Layers and components could look like following (lowest to highest)

- Commons and Utils
- Test Helpers
- Zookeeper Helpers, Kafka Helpers, MetricsHelper
- Tw Tasks Engine, Tw Tasks Test Helpers (TestTasksService)
- ToKafka messages, FromKafka messages, Periodic Tasks (Cron)

8. Rename buckets and processing buckets to shards.
A much more understandable term. Not many people are aware that in a hashmap (where the term bucket comes from), the shards are called buckets.

@@ -44,10 +33,8 @@ and in the end it may not make things cleaner.
17. Rename property `tw-tasks.zookeeper-connect-string` to `tw-tasks.kafka.zookeeper.connect-string`, because it is only used for Kafka
topics configuration. Could refactor the properties into a more hierarchical structure.

18. Move everything to taskVersionId, instead of just separate id and version. Also always log both out and set them both in MDC.
18. Move everything to taskVersionId, instead of just separate id and version.

20. Add metric for how long it takes a task from adding to processing. Or scheduling time to processing.

23. Start using Avro or another binary format for triggering queue messages. The current JSON approach is expensive.

24. Try to remove id,version index and see if there is any perf hit at all.
2 changes: 1 addition & 1 deletion build.gradle
@@ -14,7 +14,7 @@ buildscript {
plugins {
id "com.github.spotbugs" version "4.5.1" apply false
id "idea"
id 'org.springframework.boot' version '2.3.4.RELEASE' apply false
id 'org.springframework.boot' version '2.3.5.RELEASE' apply false
}

idea.project {
4 changes: 2 additions & 2 deletions build.libraries.gradle
@@ -1,7 +1,7 @@
ext {
twContextVersion = "0.6.1"
twContextVersion = "0.7.0"
twLeaderSelectorVersion = "1.4.0"
springBootVersion = "2.3.4.RELEASE"
springBootVersion = "2.3.5.RELEASE"


libraries = [
1 change: 1 addition & 0 deletions demoapp/build.gradle
@@ -21,6 +21,7 @@ dependencies {
implementation libraries.twBaseUtils
implementation libraries.twGracefulShutdown
implementation libraries.twGracefulShutdownIntefaces
implementation libraries.twContext

compileOnly libraries.newRelic

Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.datasource.IsolationLevelDataSourceAdapter;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

@@ -38,6 +39,7 @@ public TransactionManager gafferTransactionManager() {
public JtaTransactionManager transactionManager() {
ServiceRegistry serviceRegistry = ServiceRegistryHolder.getServiceRegistry();
JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(gafferUserTransaction(), gafferTransactionManager());
jtaTransactionManager.setAllowCustomIsolationLevels(true);
jtaTransactionManager.setTransactionSynchronizationRegistry(serviceRegistry.getTransactionSynchronizationRegistry());
return jtaTransactionManager;
}
@@ -51,7 +53,9 @@ public DataSource jtaDataSource() {
dataSourceImpl.setValidationTimeoutSeconds(10);
dataSourceImpl.setRegisterAsMBean(false);

return dataSourceImpl;
IsolationLevelDataSourceAdapter da = new IsolationLevelDataSourceAdapter();
da.setTargetDataSource(dataSourceImpl);
return da;
}

private DataSource dataSource() {
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.IsolationLevelDataSourceAdapter;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

@@ -38,6 +39,7 @@ public JtaTransactionManager transactionManager() {
ServiceRegistry serviceRegistry = ServiceRegistryHolder.getServiceRegistry();
JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(gafferUserTransaction(), gafferTransactionManager());
jtaTransactionManager.setTransactionSynchronizationRegistry(serviceRegistry.getTransactionSynchronizationRegistry());
jtaTransactionManager.setAllowCustomIsolationLevels(true);
return jtaTransactionManager;
}

@@ -67,7 +69,9 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
dataSourceImpl.setRegisterAsMBean(false);
dataSourceImpl.init();

return dataSourceImpl;
IsolationLevelDataSourceAdapter da = new IsolationLevelDataSourceAdapter();
da.setTargetDataSource(dataSourceImpl);
return da;
}
return bean;
}
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.transferwise.common.baseutils.ExceptionUtils;
import com.transferwise.common.context.UnitOfWorkManager;
import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy;
import com.transferwise.tasks.ITasksService;
import com.transferwise.tasks.demoapp.payout.PayoutInstruction;
@@ -36,6 +37,8 @@ public class CoreKafkaListener implements GracefulShutdownStrategy {
private IMeterHelper meterHelper;
@Autowired
private IErrorLoggingThrottler errorLoggingThrottler;
@Autowired
private UnitOfWorkManager unitOfWorkManager;

private ExecutorService executorService;

@@ -75,6 +78,7 @@ public void poll() {
})
.setMeterHelper(meterHelper)
.setErrorLoggingThrottler(errorLoggingThrottler)
.setUnitOfWorkManager(unitOfWorkManager)
.consume();
}

2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,2 +1,2 @@
version=1.17.0
version=1.18.0
org.gradle.internal.http.socketTimeout=120000
6 changes: 0 additions & 6 deletions integration-tests/build.gradle
@@ -46,10 +46,6 @@ if (!Boolean.parseBoolean(System.getenv('CI'))) {
dockerCompose.isRequiredBy(test)
}

ServerSocket ss = new ServerSocket(0)
def freePort = ss.getLocalPort()
ss.close()

dockerCompose {
useComposeFiles = ["src/test/resources/docker-compose.yml"]
// Create some flakiness on slower comps
@@ -58,6 +54,4 @@ dockerCompose {
// Set to true if you have anomalies
stopContainers = true
removeContainers = true

environment.put "KAFKA_RANDOM_PORT", "${freePort}"
}
Original file line number Diff line number Diff line change
@@ -15,20 +15,17 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootContextLoader;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.web.servlet.MockMvc;

@ActiveProfiles(profiles = {"test", "mysql"}, resolver = SystemPropertyActiveProfilesResolver.class)
@SpringBootTest(classes = {TestConfiguration.class, TestApplication.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ContextConfiguration(loader = SpringBootContextLoader.class)
@Slf4j
@AutoConfigureMockMvc
public abstract class BaseIntTest {

static {
@@ -53,9 +50,6 @@ void setApplicationContext(ApplicationContext applicationContext) {
TestApplicationContextHolder.setApplicationContext(applicationContext);
}

@Autowired
protected MockMvc mockMvc; // use for testing secured endpoints

private long startTimeMs = System.currentTimeMillis();

@BeforeEach
Original file line number Diff line number Diff line change
@@ -86,9 +86,9 @@ void allWorks() throws Exception {

/**
* Test is more suitable for testing any kind of database limits.
*
*
* <p>So far best results have been observed for 1mln tasks: 280s postgres, 450s mariadb.
*
*
* <p>Postgres has much more favourable architecture for tw-task in general.
*/
@Test
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.transferwise.common.context.UnitOfWorkManager;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.config.TwTasksKafkaConfiguration;
import com.transferwise.tasks.helpers.kafka.ConsistentKafkaConsumer;
@@ -23,6 +24,8 @@ class ConsistentKafkaConsumerIntTest extends BaseIntTest {
private IToKafkaTestHelper toKafkaTestHelper;
@Autowired
private TwTasksKafkaConfiguration kafkaConfiguration;
@Autowired
private UnitOfWorkManager unitOfWorkManager;

@Test
void allMessagesWillBeReceivedOnceOnRebalancing() throws Exception {
@@ -43,6 +46,7 @@ void allMessagesWillBeReceivedOnceOnRebalancing() throws Exception {
.setTopics(Collections.singletonList(testTopic))
.setShouldFinishPredicate(shouldFinish::get)
.setShouldPollPredicate(() -> !shouldFinish.get())
.setUnitOfWorkManager(unitOfWorkManager)
.setRecordConsumer(consumerRecord -> log
.info("Received message '{}': {}.", consumerRecord.value(), messagesReceivedCounts.get(consumerRecord.value()).incrementAndGet()));

Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.common.context.UnitOfWorkManager;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.config.TwTasksKafkaConfiguration;
import com.transferwise.tasks.helpers.kafka.ConsistentKafkaConsumer;
@@ -31,6 +32,8 @@ class KafkaIntTest extends BaseIntTest {
private IToKafkaSenderService toKafkaSenderService;
@Autowired
private ITransactionsHelper transactionsHelper;
@Autowired
private UnitOfWorkManager unitOfWorkManager;

private final Duration delayTimeout = Duration.ofMillis(5);

@@ -68,6 +71,7 @@ void sendingAMessageToKafkaWorks() {
messagesReceivedCount.incrementAndGet();
}
})
.setUnitOfWorkManager(unitOfWorkManager)
.consume();

assertEquals(1, messagesReceivedCount.get());
@@ -114,6 +118,7 @@ void sendingBatchMessagesToKafkaWorks(int iteration) {
messagesMap.get(record.value()).incrementAndGet();
}
})
.setUnitOfWorkManager(unitOfWorkManager)
.consume();

await().until(() ->
@@ -167,6 +172,7 @@ void sendingBatchMessagesToKafkaWorksWith5Partitions(int iteration) {
}
partitionsMap.computeIfAbsent(record.partition(), k -> new AtomicInteger()).incrementAndGet();
})
.setUnitOfWorkManager(unitOfWorkManager)
.consume();

await().until(() ->
@@ -219,6 +225,7 @@ void flakyMessagesAccepterWillNotStopTheProcessing(int iteration) {
messagesMap.get(record.value()).incrementAndGet();
}
})
.setUnitOfWorkManager(unitOfWorkManager)
.consume();

await().until(() ->
