From edf8e47bc8eac22e3657e3b066446dc6240907fe Mon Sep 17 00:00:00 2001 From: Chris Richardson Date: Wed, 3 Aug 2022 17:12:16 +0900 Subject: [PATCH] #128 Enhance PollingDao to use a thread-per-channel Also, removed unused bean dependencies in order to simplify testing --- build.gradle | 16 +- docker-compose-cdc-mysql-polling.yml | 30 +++ docker-compose-cdc-unified-polling-mysql.yml | 30 +++ .../local/common/BinlogEntryHandler.java | 4 + .../local/common/BinlogEntryReader.java | 3 - .../common/BinlogEntryToEventConverter.java | 2 + .../BinlogEntryToPublishedEventConverter.java | 5 + .../EventuateConfigurationProperties.java | 10 +- .../local/db/log/common/DbLogClient.java | 9 +- .../polling/ParallelPollingChannels.java | 41 +++++ .../eventuate/local/polling/PollingDao.java | 86 ++++++--- .../local/polling/spec/PollingSpec.java | 18 ++ .../spec/PollingSpecExcludingChannels.java | 24 +++ .../local/polling/spec/PollingSpecForAll.java | 17 ++ .../polling/spec/PollingSpecForChannel.java | 21 +++ .../local/polling/spec/SqlFragment.java | 23 +++ .../AbstractPollingDaoIntegrationTest.java | 143 +++++++++++++++ .../ParallelPollingDaoIntegrationTest.java | 52 ++++++ ...ryReaderMessageTableTestConfiguration.java | 2 +- .../polling/PollingDaoIntegrationTest.java | 172 ++++-------------- .../PollingIntegrationTestConfiguration.java | 2 +- .../local/postgres/wal/PostgresWalClient.java | 9 +- .../PipelineConfigPropertiesProvider.java | 98 ++++++++++ .../cdc/pipeline/UnifiedCdcConfigurator.java | 47 ++--- .../common/BinlogEntryReaderProvider.java | 3 +- .../CommonCdcPipelineReaderFactory.java | 11 -- ...lBinlogCdcPipelineReaderConfiguration.java | 10 - .../MySqlBinlogCdcPipelineReaderFactory.java | 6 - ...gresWalCdcPipelineReaderConfiguration.java | 10 - .../PostgresWalCdcPipelineReaderFactory.java | 6 - ...PollingCdcPipelineReaderConfiguration.java | 15 +- .../PollingCdcPipelineReaderFactory.java | 9 +- .../PollingPipelineReaderProperties.java | 20 ++ .../PipelineConfigPropertiesProviderTest.java | 79 ++++++++ .../factory/DataSourceCreationTest.java | 2 +- ...ipelineReaderPropertiesConversionTest.java | 26 +++ ...gPipelineReaderPropertyValidationTest.java | 7 +- .../sample-pipeline-config.properties | 14 ++ .../BinlogEntryToMessageConverter.java | 5 + scripts/mysql-cli8.sh | 10 + 40 files changed, 828 insertions(+), 269 deletions(-) create mode 100644 docker-compose-cdc-mysql-polling.yml create mode 100644 docker-compose-cdc-unified-polling-mysql.yml create mode 100644 eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/ParallelPollingChannels.java create mode 100644 eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpec.java create mode 100644 eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecExcludingChannels.java create mode 100644 eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecForAll.java create mode 100644 eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecForChannel.java create mode 100644 eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/SqlFragment.java create mode 100644 eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/AbstractPollingDaoIntegrationTest.java create mode 100644 eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/ParallelPollingDaoIntegrationTest.java create mode 100644 eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProvider.java create mode 100644 eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProviderTest.java create mode 100644 eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/polling/PollingPipelineReaderPropertiesConversionTest.java create mode 100644 eventuate-local-java-cdc-connector-unified/src/test/resources/sample-pipeline-config.properties create mode 100755 scripts/mysql-cli8.sh diff --git a/build.gradle b/build.gradle index b4c448fe..e37acc0a 100644 --- a/build.gradle +++ b/build.gradle @@ -104,6 +104,20 @@ dockerCompose { removeContainers = project.ext.removeContainers } + mysql8cdcpolling { + projectName = null + useComposeFiles = ["docker-compose-mysql8.yml", "docker-compose-cdc-mysql-polling.yml"] + removeContainers = project.ext.removeContainers + startedServices = ["eventuate-cdc-service"] + } + + mysql8cdcunifiedpolling { + projectName = null + useComposeFiles = ["docker-compose-mysql8.yml", "docker-compose-cdc-unified-polling-mysql.yml"] + removeContainers = project.ext.removeContainers + startedServices = ["eventuate-cdc-service"] + } + mysqlonly { projectName = null useComposeFiles = ["docker-compose-mysql.yml"] @@ -116,7 +130,7 @@ dockerCompose { projectName = null useComposeFiles = ["docker-compose-mysql8.yml"] removeContainers = project.ext.removeContainers - startedServices = ["mysql8"] + startedServices = ["mysql"] removeOrphans = false } diff --git a/docker-compose-cdc-mysql-polling.yml b/docker-compose-cdc-mysql-polling.yml new file mode 100644 index 00000000..876d496f --- /dev/null +++ b/docker-compose-cdc-mysql-polling.yml @@ -0,0 +1,30 @@ +version: '3' +services: + eventuate-cdc-service: + env_file: + - ${DOCKER_ENV_FILE:-docker-compose-env-files/empty.env} + build: + context: ./eventuate-cdc-service/ + image: eventuateio/eventuate-cdc-service:${DOCKER_IMAGE_TAG:-latest} + ports: + - "8099:8080" + links: + - mysql + - kafka + - zookeeper + environment: + SPRING_DATASOURCE_URL: jdbc:mysql://mysql/eventuate + SPRING_DATASOURCE_USERNAME: mysqluser + SPRING_DATASOURCE_PASSWORD: mysqlpw + SPRING_DATASOURCE_DRIVER_CLASS_NAME: com.mysql.cj.jdbc.Driver + EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181 + EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:29092 + EVENTUATELOCAL_CDC_DB_USER_NAME: root + EVENTUATELOCAL_CDC_DB_PASSWORD: rootpassword + EVENTUATELOCAL_CDC_READER_NAME: MySqlReader + EVENTUATE_OUTBOX_ID: 1 + EVENTUATELOCAL_CDC_MYSQL_BINLOG_CLIENT_UNIQUE_ID: 1234567890 + EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC: "false" + EVENTUATE_CDC_TYPE: ${EVENTUATE_CDC_TYPE} + SPRING_PROFILES_ACTIVE: EventuatePolling + EVENTUATELOCAL_CDC_POLLING_PARALLEL_CHANNELS: parallel_channel_1 diff --git a/docker-compose-cdc-unified-polling-mysql.yml b/docker-compose-cdc-unified-polling-mysql.yml new file mode 100644 index 00000000..6273ab95 --- /dev/null +++ b/docker-compose-cdc-unified-polling-mysql.yml @@ -0,0 +1,30 @@ +version: '3' +services: + eventuate-cdc-service: + build: ./eventuate-cdc-service + ports: + - "8099:8080" + links: + - mysql + - kafka + - zookeeper + environment: + EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:29092 + EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181 + SPRING_PROFILES_ACTIVE: ${SPRING_PROFILES_ACTIVE} + + EVENTUATE_CDC_READER_MYSQLREADER_TYPE: polling + EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEURL: jdbc:mysql://mysql:3306/eventuate + EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEUSERNAME: mysqluser + EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEPASSWORD: mysqlpw + EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEDRIVERCLASSNAME: com.mysql.cj.jdbc.Driver + EVENTUATE_CDC_READER_MYSQLREADER_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/pipeline/1 + EVENTUATE_CDC_READER_MYSQLREADER_OUTBOXID: 1 + EVENTUATE_CDC_READER_MYSQLREADER_POLLINGPARALLELCHANNELNAMES: parallel_channel_1 + + EVENTUATE_CDC_PIPELINE_P1_TYPE: eventuate-local + EVENTUATE_CDC_PIPELINE_P1_READER: MYSQLREADER + + EVENTUATE_CDC_PIPELINE_P4_TYPE: eventuate-tram + EVENTUATE_CDC_PIPELINE_P4_READER: MYSQLREADER + diff --git a/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryHandler.java b/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryHandler.java index f84f2488..726c26aa 100644 --- a/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryHandler.java +++ b/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryHandler.java @@ -38,4 +38,8 @@ public CompletableFuture publish(BinlogEntry binlogEntry) { .map(eventPublisher::apply) .orElse(CompletableFuture.completedFuture(null)); } + + public String getDestinationColumn() { + return binlogEntryToEventConverter.getDestinationColumn(); + } } diff --git a/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryReader.java b/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryReader.java index ca792676..7c45ee0d 100644 --- a/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryReader.java +++ b/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryReader.java @@ -22,7 +22,6 @@ public abstract class BinlogEntryReader { protected List binlogEntryHandlers = new CopyOnWriteArrayList<>(); protected AtomicBoolean running = new AtomicBoolean(false); protected CountDownLatch stopCountDownLatch; - protected String dataSourceUrl; protected DataSource dataSource; protected String readerName; protected Long outboxId; @@ -34,13 +33,11 @@ public abstract class BinlogEntryReader { protected Optional restartCallback = Optional.empty(); public BinlogEntryReader(MeterRegistry meterRegistry, - String dataSourceUrl, DataSource dataSource, String readerName, Long outboxId) { this.meterRegistry = meterRegistry; - this.dataSourceUrl = dataSourceUrl; this.dataSource = dataSource; this.readerName = readerName; this.outboxId = outboxId; diff --git a/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryToEventConverter.java b/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryToEventConverter.java index 0e4bcd48..22fed1e3 100644 --- a/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryToEventConverter.java +++ b/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryToEventConverter.java @@ -4,4 +4,6 @@ public interface BinlogEntryToEventConverter { Optional convert(BinlogEntry binlogEntry); + + String getDestinationColumn(); } diff --git a/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryToPublishedEventConverter.java b/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryToPublishedEventConverter.java index 070eac4c..e1e47b53 100644 --- a/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryToPublishedEventConverter.java +++ b/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/BinlogEntryToPublishedEventConverter.java @@ -40,4 +40,9 @@ public Optional convert(BinlogEntry binlogEntry) { return Optional.of(publishedEvent); } + + @Override + public String getDestinationColumn() { + return "entity_type"; + } } \ No newline at end of file diff --git a/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/EventuateConfigurationProperties.java b/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/EventuateConfigurationProperties.java index 85ce482c..3f8a6c61 100644 --- a/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/EventuateConfigurationProperties.java +++ b/eventuate-local-java-cdc-connector-common/src/main/java/io/eventuate/local/common/EventuateConfigurationProperties.java @@ -2,8 +2,6 @@ import org.springframework.beans.factory.annotation.Value; -import java.util.concurrent.TimeUnit; - public class EventuateConfigurationProperties { @Value("${eventuatelocal.cdc.db.user.name:#{null}}") @@ -36,6 +34,10 @@ public class EventuateConfigurationProperties { @Value("${eventuatelocal.cdc.polling.retry.interval.in.milleseconds:#{500}}") private int pollingRetryIntervalInMilliseconds; + + @Value("${eventuatelocal.cdc.polling.parallel.channels:}") + private String[] pollingParallelChannels; + @Value("${eventuatelocal.cdc.leadership.lock.path:#{\"/eventuatelocal/cdc/leader\"}}") private String leadershipLockPath; @@ -207,4 +209,8 @@ public boolean isEnableBatchProcessing() { public int getMaxBatchSize() { return maxBatchSize; } + + public String[] getPollingParallelChannels() { + return pollingParallelChannels; + } } diff --git a/eventuate-local-java-cdc-connector-db-log-common/src/main/java/io/eventuate/local/db/log/common/DbLogClient.java b/eventuate-local-java-cdc-connector-db-log-common/src/main/java/io/eventuate/local/db/log/common/DbLogClient.java index aaa6a840..9cfa1619 100644 --- a/eventuate-local-java-cdc-connector-db-log-common/src/main/java/io/eventuate/local/db/log/common/DbLogClient.java +++ b/eventuate-local-java-cdc-connector-db-log-common/src/main/java/io/eventuate/local/db/log/common/DbLogClient.java @@ -4,7 +4,8 @@ import io.eventuate.common.jdbc.EventuateSchema; import io.eventuate.common.jdbc.JdbcUrl; import io.eventuate.common.jdbc.JdbcUrlParser; -import io.eventuate.local.common.*; +import io.eventuate.local.common.BinlogEntryReader; +import io.eventuate.local.common.CdcMonitoringDao; import io.micrometer.core.instrument.MeterRegistry; import javax.sql.DataSource; @@ -16,9 +17,9 @@ public abstract class DbLogClient extends BinlogEntryReader { protected String dbPassword; protected String host; protected int port; - protected String defaultDatabase; protected DbLogMetrics dbLogMetrics; - private boolean checkEntriesForDuplicates; + protected String dataSourceUrl; + private boolean checkEntriesForDuplicates; protected volatile boolean connected; protected CdcMonitoringDao cdcMonitoringDao; @@ -35,7 +36,6 @@ public DbLogClient(MeterRegistry meterRegistry, Long outboxId) { super(meterRegistry, - dataSourceUrl, dataSource, readerName, outboxId); @@ -57,7 +57,6 @@ public DbLogClient(MeterRegistry meterRegistry, JdbcUrl jdbcUrl = JdbcUrlParser.parse(dataSourceUrl); host = jdbcUrl.getHost(); port = jdbcUrl.getPort(); - defaultDatabase = jdbcUrl.getDatabase(); } public boolean isConnected() { diff --git a/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/ParallelPollingChannels.java b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/ParallelPollingChannels.java new file mode 100644 index 00000000..141c3587 --- /dev/null +++ b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/ParallelPollingChannels.java @@ -0,0 +1,41 @@ +package io.eventuate.local.polling; + +import io.eventuate.local.polling.spec.PollingSpec; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Collections.singletonList; + +public class ParallelPollingChannels { + + private Set channels; + + @Override + public String toString() { + return "ParallelPollingChannels{" + + "channels=" + channels + + '}'; + } + + public ParallelPollingChannels(Set channels) { + this.channels = channels; + } + + public static ParallelPollingChannels make(String[] channels) { + return new ParallelPollingChannels(new HashSet<>(Arrays.asList(channels))); + } + + public int size() { + return channels.size(); + } + + public List makePollingSpecs() { + return channels.isEmpty() ? singletonList(PollingSpec.ALL) : Stream.concat(Stream.of(PollingSpec.excludingChannels(channels)), channels.stream().map(PollingSpec::forChannel)).collect(Collectors.toList()); + } + +} diff --git a/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/PollingDao.java b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/PollingDao.java index 4eaf7a89..80aecd96 100644 --- a/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/PollingDao.java +++ b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/PollingDao.java @@ -1,28 +1,37 @@ package io.eventuate.local.polling; import com.google.common.collect.ImmutableMap; -import io.eventuate.common.spring.jdbc.EventuateSpringJdbcStatementExecutor; import io.eventuate.common.eventuate.local.BinLogEvent; import io.eventuate.common.eventuate.local.BinlogFileOffset; import io.eventuate.common.jdbc.EventuateJdbcStatementExecutor; import io.eventuate.common.jdbc.EventuateSchema; import io.eventuate.common.jdbc.SchemaAndTable; import io.eventuate.common.jdbc.sqldialect.EventuateSqlDialect; +import io.eventuate.common.spring.jdbc.EventuateSpringJdbcStatementExecutor; import io.eventuate.local.common.*; +import io.eventuate.local.polling.spec.PollingSpec; +import io.eventuate.local.polling.spec.SqlFragment; import io.micrometer.core.instrument.MeterRegistry; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.jdbc.support.rowset.SqlRowSet; import javax.sql.DataSource; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Function; import java.util.stream.Collectors; public class PollingDao extends BinlogEntryReader { private static final String PUBLISHED_FIELD = "published"; + private final String dataSourceUrl; + private final ParallelPollingChannels pollingParallelChannels; private DataSource dataSource; private NamedParameterJdbcTemplate namedParameterJdbcTemplate; @@ -44,10 +53,10 @@ public PollingDao(MeterRegistry meterRegistry, int pollingIntervalInMilliseconds, String readerName, EventuateSqlDialect eventuateSqlDialect, - Long outboxId) { + Long outboxId, + ParallelPollingChannels pollingParallelChannels) { super(meterRegistry, - dataSourceUrl, dataSource, readerName, outboxId); @@ -56,6 +65,7 @@ public PollingDao(MeterRegistry meterRegistry, throw new IllegalArgumentException("Max events per polling parameter should be greater than 0."); } + this.dataSourceUrl = dataSourceUrl; this.dataSource = dataSource; this.pollingIntervalInMilliseconds = pollingIntervalInMilliseconds; this.namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource); @@ -64,8 +74,8 @@ public PollingDao(MeterRegistry meterRegistry, this.maxAttemptsForPolling = maxAttemptsForPolling; this.pollingRetryIntervalInMilliseconds = pollingRetryIntervalInMilliseconds; this.eventuateSqlDialect = eventuateSqlDialect; - pollingProcessingStatusService = new PollingProcessingStatusService(dataSource, PUBLISHED_FIELD, eventuateSqlDialect); + this.pollingParallelChannels = pollingParallelChannels; } @Override @@ -83,47 +93,69 @@ public BinlogEntryHandler addBinlogEntryHandler(Even return binlogEntryHandler; } + @Override public void start() { - logger.info("Starting PollingDao"); + logger.info("Starting {} {}", readerName, pollingParallelChannels); super.start(); - stopCountDownLatch = new CountDownLatch(1); + stopCountDownLatch = new CountDownLatch(1 + pollingParallelChannels.size()); running.set(true); - while (running.get()) { - int processedEvents = 0; - try { - processedEvents = binlogEntryHandlers.stream().map(this::processEvents).reduce(0, (a, b) -> a + b); - } catch (Exception e) { - handleProcessingFailException(e); - } + pollingParallelChannels.makePollingSpecs().forEach(this::startPollingThread); + + logger.info("startup completed {}", readerName); + } + + + private ExecutorService executor = Executors.newCachedThreadPool(); - try { - if (processedEvents == 0) { - Thread.sleep(pollingIntervalInMilliseconds); + public void startPollingThread(PollingSpec pollingSpec) { + logger.info("Starting polling thread for {}", pollingSpec); + executor.submit(() -> { + logger.info("Started polling thread for {}", pollingSpec); + while (running.get()) { + int processedEvents = 0; + try { + processedEvents = binlogEntryHandlers.stream().map(handler -> processEvents(handler, pollingSpec)).reduce(0, (a, b) -> a + b); + } catch (Exception e) { + handleProcessingFailException(e); } - } catch (InterruptedException e) { - handleProcessingFailException(e); - } - } - stopCountDownLatch.countDown(); - logger.info("PollingDao finished processing"); + try { + if (processedEvents == 0) { + Thread.sleep(pollingIntervalInMilliseconds); + } + } catch (InterruptedException e) { + handleProcessingFailException(e); + } + } + logger.info("Stopped polling thread for {}", pollingSpec); + stopCountDownLatch.countDown(); + }); } - public int processEvents(BinlogEntryHandler handler) { + public int processEvents(BinlogEntryHandler handler, PollingSpec pollingSpec) { String pk = getPrimaryKey(handler); - String findEventsQuery = eventuateSqlDialect.addLimitToSql(String.format("SELECT * FROM %s WHERE %s = 0 ORDER BY %s ASC", - handler.getQualifiedTable(), PUBLISHED_FIELD, pk), ":limit"); + SqlFragment sqlFragment = pollingSpec.addToWhere(handler.getDestinationColumn()); + + String findEventsQuery = eventuateSqlDialect.addLimitToSql(String.format("SELECT * FROM %s WHERE %s = 0 %s ORDER BY %s ASC", + handler.getQualifiedTable(), PUBLISHED_FIELD, sqlFragment.sql, pk), ":limit"); + + logger.debug("Polling with query {}", findEventsQuery); + + Map params = new HashMap<>(); + params.put("limit", maxEventsPerPolling); + params.putAll(sqlFragment.params); SqlRowSet sqlRowSet = DaoUtils.handleConnectionLost(maxAttemptsForPolling, pollingRetryIntervalInMilliseconds, - () -> namedParameterJdbcTemplate.queryForRowSet(findEventsQuery, ImmutableMap.of("limit", maxEventsPerPolling)), + () -> namedParameterJdbcTemplate.queryForRowSet(findEventsQuery, params), this::onInterrupted, running); + List> ids = new ArrayList<>(); while (sqlRowSet.next()) { diff --git a/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpec.java b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpec.java new file mode 100644 index 00000000..f6d32ca9 --- /dev/null +++ b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpec.java @@ -0,0 +1,18 @@ +package io.eventuate.local.polling.spec; + +import java.util.Set; + +public abstract class PollingSpec { + public static final PollingSpec ALL = new PollingSpecForAll(); + + public static PollingSpec excludingChannels(Set exclusions) { + return new PollingSpecExcludingChannels(exclusions); + } + + public static PollingSpec forChannel(String channel) { + return new PollingSpecForChannel(channel); + } + + public abstract SqlFragment addToWhere(String destination); + +} diff --git a/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecExcludingChannels.java b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecExcludingChannels.java new file mode 100644 index 00000000..cdc7ce11 --- /dev/null +++ b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecExcludingChannels.java @@ -0,0 +1,24 @@ +package io.eventuate.local.polling.spec; + +import java.util.Set; + +public class PollingSpecExcludingChannels extends PollingSpec { + + private final Set exclusions; + + @Override + public String toString() { + return "PollingSpecExcludingChannels{" + + "exclusions=" + exclusions + + '}'; + } + + public PollingSpecExcludingChannels(Set exclusions) { + this.exclusions = exclusions; + } + + @Override + public SqlFragment addToWhere(String destination) { + return SqlFragment.make("AND %s NOT IN (%s)", destination, "channels", exclusions); + } +} diff --git a/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecForAll.java b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecForAll.java new file mode 100644 index 00000000..9dca3029 --- /dev/null +++ b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecForAll.java @@ -0,0 +1,17 @@ +package io.eventuate.local.polling.spec; + +public class PollingSpecForAll extends PollingSpec { + + public PollingSpecForAll() { + } + + @Override + public String toString() { + return "PollingSpecForAll{}"; + } + + @Override + public SqlFragment addToWhere(String destination) { + return SqlFragment.EMPTY; + } +} diff --git a/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecForChannel.java b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecForChannel.java new file mode 100644 index 00000000..9f2600a4 --- /dev/null +++ b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/PollingSpecForChannel.java @@ -0,0 +1,21 @@ +package io.eventuate.local.polling.spec; + +public class PollingSpecForChannel extends PollingSpec { + private final String channel; + + public PollingSpecForChannel(String channel) { + this.channel = channel; + } + + @Override + public String toString() { + return "PollingSpecForChannel{" + + "channel='" + channel + '\'' + + '}'; + } + + @Override + public SqlFragment addToWhere(String destination) { + return SqlFragment.make("AND %s = %s", destination, "channel", channel); + } +} diff --git a/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/SqlFragment.java b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/SqlFragment.java new file mode 100644 index 00000000..52468eda --- /dev/null +++ b/eventuate-local-java-cdc-connector-polling/src/main/java/io/eventuate/local/polling/spec/SqlFragment.java @@ -0,0 +1,23 @@ +package io.eventuate.local.polling.spec; + +import java.util.Collections; +import java.util.Map; + +public class SqlFragment { + public final String sql; + public final Map params; + public static final SqlFragment EMPTY = new SqlFragment("", Collections.emptyMap()); + + public SqlFragment(String sql, String placeholderName, Object value) { + this(sql, Collections.singletonMap(placeholderName, value)); + } + + public SqlFragment(String sql, Map params) { + this.sql = sql; + this.params = params; + } + + public static SqlFragment make(String sqlFormat, String column, String placeholderName, Object value) { + return new SqlFragment(String.format(sqlFormat , column, ":" + placeholderName), placeholderName, value); + } +} diff --git a/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/AbstractPollingDaoIntegrationTest.java b/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/AbstractPollingDaoIntegrationTest.java new file mode 100644 index 00000000..c10a21bd --- /dev/null +++ b/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/AbstractPollingDaoIntegrationTest.java @@ -0,0 +1,143 @@ +package io.eventuate.local.polling; + +import io.eventuate.common.id.IdGenerator; +import io.eventuate.common.jdbc.EventuateSchema; +import io.eventuate.common.jdbc.sqldialect.SqlDialectSelector; +import io.eventuate.common.spring.id.IdGeneratorConfiguration; +import io.eventuate.common.spring.jdbc.EventuateSchemaConfiguration; +import io.eventuate.common.spring.jdbc.sqldialect.SqlDialectConfiguration; +import io.eventuate.local.common.BinlogEntryHandler; +import io.eventuate.local.common.BinlogEntryToPublishedEventConverter; +import io.eventuate.local.common.EventuateConfigurationProperties; +import io.eventuate.local.test.util.TestHelper; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import org.junit.Assert; +import org.junit.Before; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +public class AbstractPollingDaoIntegrationTest { + + + @Configuration + @Import({EventuateSchemaConfiguration.class, SqlDialectConfiguration.class, IdGeneratorConfiguration.class}) + public static class Config { + @Bean + public EventuateConfigurationProperties eventuateConfigurationProperties() { + return new EventuateConfigurationProperties(); + } + + @Bean + public TestHelper testHelper() { + return new TestHelper(); + } + + + } + + protected static final int EVENTS_PER_POLLING_ITERATION = 3; + protected static final int NUMBER_OF_EVENTS_TO_PUBLISH = 10; + + @Autowired + private IdGenerator idGenerator; + + @Value("${spring.datasource.url}") + private String dataSourceURL; + + @Value("${spring.datasource.driver-class-name}") + private String driver; + + @Autowired + private EventuateSchema eventuateSchema; + + + @Autowired + private DataSource dataSource; + + @Autowired + private JdbcTemplate jdbcTemplate; + + @Autowired + private SqlDialectSelector sqlDialectSelector; + + @Autowired + private TestHelper testHelper; + + @Autowired + protected EventuateConfigurationProperties eventuateConfigurationProperties; + + protected AtomicInteger processedEvents; + + protected PollingDao pollingDao; + + @Before + public void init() { + processedEvents = new AtomicInteger(0); + pollingDao = createPollingDao(); + markAllEventsAsPublished(); + } + + + protected List saveEvents() { + List eventIds = new ArrayList<>(); + + for (int i = 0; i < NUMBER_OF_EVENTS_TO_PUBLISH; i++) { + eventIds.add(testHelper.saveRandomEvent().getEventId()); + } + + return eventIds; + } + + private PollingDao createPollingDao() { + MeterRegistry meterRegistry = Mockito.mock(MeterRegistry.class); + Mockito.when(meterRegistry.counter(Mockito.anyString(), Mockito.anyCollection())).thenReturn(Mockito.mock(Counter.class)); + + return new PollingDao(meterRegistry, + dataSourceURL, + dataSource, + EVENTS_PER_POLLING_ITERATION, + 10, + 100, + 1000, + testHelper.generateId(), + sqlDialectSelector.getDialect(driver), + eventuateConfigurationProperties.getOutboxId(), + new ParallelPollingChannels(new HashSet<>(Arrays.asList(eventuateConfigurationProperties.getPollingParallelChannels())))); + } + + protected void assertEventsArePublished(List eventIds) { + eventIds.forEach(this::assertEventIsPublished); + } + + private void assertEventIsPublished(String id) { + Map event = jdbcTemplate.queryForMap(String.format("select * from %s where event_id = ?", eventuateSchema.qualifyTable("events")), id); + Assert.assertEquals(1, ((Number)event.get("published")).intValue()); + } + + protected BinlogEntryHandler prepareBinlogEntryHandler(CompletableFuture resultWhenEventConsumed) { + return pollingDao.addBinlogEntryHandler(eventuateSchema, + "events", + new BinlogEntryToPublishedEventConverter(idGenerator), + event -> { + processedEvents.incrementAndGet(); + return resultWhenEventConsumed; + }); + } + + private void markAllEventsAsPublished() { + jdbcTemplate.execute(String.format("update %s set published = 1", eventuateSchema.qualifyTable("events"))); + } + + +} diff --git a/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/ParallelPollingDaoIntegrationTest.java b/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/ParallelPollingDaoIntegrationTest.java new file mode 100644 index 00000000..aa521483 --- /dev/null +++ b/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/ParallelPollingDaoIntegrationTest.java @@ -0,0 +1,52 @@ +package io.eventuate.local.polling; + +import io.eventuate.local.common.BinlogEntryHandler; +import io.eventuate.local.testutil.DefaultAndPollingProfilesResolver; +import io.eventuate.util.test.async.Eventually; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.util.List; +import java.util.concurrent.*; + +import static org.junit.Assert.assertEquals; + +@ActiveProfiles(resolver = DefaultAndPollingProfilesResolver.class) +@RunWith(SpringJUnit4ClassRunner.class) +@SpringBootTest(classes = AbstractPollingDaoIntegrationTest.Config.class, properties = "eventuatelocal.cdc.polling.parallel.channels=x,y") +@EnableAutoConfiguration +public class ParallelPollingDaoIntegrationTest extends AbstractPollingDaoIntegrationTest { + + private ExecutorService executor = Executors.newCachedThreadPool(); + + @Test + public void testParallelPolling() throws ExecutionException, InterruptedException, TimeoutException { + BinlogEntryHandler binlogEntryHandler = prepareBinlogEntryHandler(CompletableFuture.completedFuture(null)); + + List eventIds = saveEvents(); + + Future f = executor.submit(() -> pollingDao.start()); + + Eventually.eventually(() -> { + assertEquals(NUMBER_OF_EVENTS_TO_PUBLISH, processedEvents.get()); + assertEventsArePublished(eventIds); + }); + + pollingDao.stop(); + f.get(1, TimeUnit.SECONDS); + + } + + @Test + public void shouldHaveNonEmptyListOfParallelChannels() { + String[] pollingParallelChannels = eventuateConfigurationProperties.getPollingParallelChannels(); + assertEquals(2, pollingParallelChannels.length); + assertEquals("x", pollingParallelChannels[0]); + assertEquals("y", pollingParallelChannels[1]); + } + +} diff --git a/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingBinlogEntryReaderMessageTableTestConfiguration.java b/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingBinlogEntryReaderMessageTableTestConfiguration.java index 4d18ae07..4ed381bc 100644 --- a/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingBinlogEntryReaderMessageTableTestConfiguration.java +++ b/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingBinlogEntryReaderMessageTableTestConfiguration.java @@ -56,7 +56,7 @@ public PollingDao pollingDao(@Autowired(required = false) MeterRegistry meterReg eventuateConfigurationProperties.getPollingIntervalInMilliseconds(), eventuateConfigurationProperties.getReaderName(), sqlDialectSelector.getDialect(driver), - eventuateConfigurationProperties.getOutboxId()); + eventuateConfigurationProperties.getOutboxId(), ParallelPollingChannels.make(eventuateConfigurationProperties.getPollingParallelChannels())); } @Bean diff --git a/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingDaoIntegrationTest.java b/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingDaoIntegrationTest.java index 27631758..fe7c2721 100644 --- a/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingDaoIntegrationTest.java +++ b/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingDaoIntegrationTest.java @@ -1,172 +1,68 @@ package io.eventuate.local.polling; -import io.eventuate.common.id.IdGenerator; -import io.eventuate.common.jdbc.EventuateSchema; -import io.eventuate.common.jdbc.sqldialect.SqlDialectSelector; import io.eventuate.local.common.BinlogEntryHandler; -import io.eventuate.local.common.BinlogEntryToPublishedEventConverter; -import io.eventuate.local.common.EventuateConfigurationProperties; -import io.eventuate.local.test.util.SourceTableNameSupplier; -import io.eventuate.local.test.util.TestHelper; +import io.eventuate.local.polling.spec.PollingSpec; import io.eventuate.local.testutil.DefaultAndPollingProfilesResolver; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.MeterRegistry; -import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import javax.sql.DataSource; -import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; @ActiveProfiles(resolver = DefaultAndPollingProfilesResolver.class) @RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = PollingIntegrationTestConfiguration.class) +@SpringBootTest(classes = AbstractPollingDaoIntegrationTest.Config.class) @EnableAutoConfiguration -public class PollingDaoIntegrationTest { - - private static final int EVENTS_PER_POLLING_ITERATION = 3; - private static final int EVENTS = 10; - - - @Value("${spring.datasource.url}") - private String dataSourceURL; - - @Value("${spring.datasource.driver-class-name}") - private String driver; - - @Autowired - private EventuateSchema eventuateSchema; - - @Autowired - private SourceTableNameSupplier sourceTableNameSupplier; - - @Autowired - private DataSource dataSource; - - @Autowired - private JdbcTemplate jdbcTemplate; - - @Autowired - private SqlDialectSelector sqlDialectSelector; +public class PollingDaoIntegrationTest extends AbstractPollingDaoIntegrationTest { - @Autowired - private TestHelper testHelper; + @Test + public void testThatPollingEventCountAreLimited() { + BinlogEntryHandler binlogEntryHandler = prepareBinlogEntryHandler(CompletableFuture.completedFuture(null)); - @Autowired - private EventuateConfigurationProperties eventuateConfigurationProperties; + List eventIds = saveEvents(); - private AtomicInteger processedEvents; + pollingDao.processEvents(binlogEntryHandler, PollingSpec.ALL); - private PollingDao pollingDao; + assertEquals(EVENTS_PER_POLLING_ITERATION, processedEvents.get()); - @Autowired - protected IdGenerator idGenerator; - - @Before - public void init() { - processedEvents = new AtomicInteger(0); - pollingDao = createPollingDao(); - clearEventsTable(); - } - - @Test - public void testThatPollingEventCountAreLimited() { - BinlogEntryHandler binlogEntryHandler = prepareBinlogEntryHandler(CompletableFuture.completedFuture(null)); - - List eventIds = saveEvents(); - - pollingDao.processEvents(binlogEntryHandler); - - Assert.assertEquals(EVENTS_PER_POLLING_ITERATION, processedEvents.get()); - - assertEventsArePublished(eventIds.subList(0, EVENTS_PER_POLLING_ITERATION)); - } - - @Test - public void testMessagesAreNotProcessedTwice() throws InterruptedException { - CompletableFuture completableFuture = new CompletableFuture(); + assertEventsArePublished(eventIds.subList(0, EVENTS_PER_POLLING_ITERATION)); + } - BinlogEntryHandler binlogEntryHandler = prepareBinlogEntryHandler(completableFuture); + @Test + public void testMessagesAreNotProcessedTwice() throws InterruptedException { + CompletableFuture completableFuture = new CompletableFuture(); - saveEvents(); + BinlogEntryHandler binlogEntryHandler = prepareBinlogEntryHandler(completableFuture); - CountDownLatch allIterationsComplete = new CountDownLatch(1); + saveEvents(); - CompletableFuture.supplyAsync(() -> { - for (int i = 0; i < (EVENTS / EVENTS_PER_POLLING_ITERATION) * 2; i++) { - pollingDao.processEvents(binlogEntryHandler); - } - allIterationsComplete.countDown(); - return null; - }); + CountDownLatch allIterationsComplete = new CountDownLatch(1); - Thread.sleep(3000); - completableFuture.complete(null); - allIterationsComplete.await(); + CompletableFuture.supplyAsync(() -> { + for (int i = 0; i < (NUMBER_OF_EVENTS_TO_PUBLISH / EVENTS_PER_POLLING_ITERATION) * 2; i++) { + pollingDao.processEvents(binlogEntryHandler, PollingSpec.ALL); + } + allIterationsComplete.countDown(); + return null; + }); - Assert.assertEquals(EVENTS, processedEvents.get()); - } + Thread.sleep(3000); + completableFuture.complete(null); + allIterationsComplete.await(); - private List saveEvents() { - List eventIds = new ArrayList<>(); + assertEquals(NUMBER_OF_EVENTS_TO_PUBLISH, processedEvents.get()); + } - for (int i = 0; i < EVENTS; i++) { - eventIds.add(testHelper.saveRandomEvent().getEventId()); + @Test + public void shouldHaveEmptyListOfParallelChannels() { + assertEquals(0, eventuateConfigurationProperties.getPollingParallelChannels().length); } - return eventIds; - } - - private PollingDao createPollingDao() { - MeterRegistry meterRegistry = Mockito.mock(MeterRegistry.class); - Mockito.when(meterRegistry.counter(Mockito.anyString(), Mockito.anyCollection())).thenReturn(Mockito.mock(Counter.class)); - - return new PollingDao(meterRegistry, - dataSourceURL, - dataSource, - EVENTS_PER_POLLING_ITERATION, - 10, - 100, - 1000, - testHelper.generateId(), - sqlDialectSelector.getDialect(driver), - eventuateConfigurationProperties.getOutboxId()); - } - - private void assertEventsArePublished(List eventIds) { - eventIds.forEach(this::assertEventIsPublished); - } - - private void assertEventIsPublished(String id) { - Map event = jdbcTemplate.queryForMap(String.format("select * from %s where event_id = ?", eventuateSchema.qualifyTable("events")), id); - Assert.assertEquals(1, ((Number)event.get("published")).intValue()); - } - - private BinlogEntryHandler prepareBinlogEntryHandler(CompletableFuture resultWhenEventConsumed) { - return pollingDao.addBinlogEntryHandler(eventuateSchema, - sourceTableNameSupplier.getSourceTableName(), - new BinlogEntryToPublishedEventConverter(idGenerator), - event -> { - processedEvents.incrementAndGet(); - return resultWhenEventConsumed; - }); - } - - private void clearEventsTable() { - jdbcTemplate.execute(String.format("update %s set published = 1", eventuateSchema.qualifyTable("events"))); - } } diff --git a/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingIntegrationTestConfiguration.java b/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingIntegrationTestConfiguration.java index ce8571ca..4af75ddb 100644 --- a/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingIntegrationTestConfiguration.java +++ b/eventuate-local-java-cdc-connector-polling/src/test/java/io/eventuate/local/polling/PollingIntegrationTestConfiguration.java @@ -100,7 +100,7 @@ public BinlogEntryReader pollingDao(@Autowired(required = false) MeterRegistry m eventuateConfigurationProperties.getPollingIntervalInMilliseconds(), eventuateConfigurationProperties.getReaderName(), sqlDialectSelector.getDialect(driver), - eventuateConfigurationProperties.getOutboxId()); + eventuateConfigurationProperties.getOutboxId(), ParallelPollingChannels.make(eventuateConfigurationProperties.getPollingParallelChannels())); } @Bean diff --git a/eventuate-local-java-cdc-connector-postgres-wal/src/main/java/io/eventuate/local/postgres/wal/PostgresWalClient.java b/eventuate-local-java-cdc-connector-postgres-wal/src/main/java/io/eventuate/local/postgres/wal/PostgresWalClient.java index e03ea975..c87647d7 100644 --- a/eventuate-local-java-cdc-connector-postgres-wal/src/main/java/io/eventuate/local/postgres/wal/PostgresWalClient.java +++ b/eventuate-local-java-cdc-connector-postgres-wal/src/main/java/io/eventuate/local/postgres/wal/PostgresWalClient.java @@ -3,7 +3,10 @@ import io.eventuate.common.jdbc.EventuateSchema; import io.eventuate.common.jdbc.SchemaAndTable; import io.eventuate.common.json.mapper.JSonMapper; -import io.eventuate.local.common.*; +import io.eventuate.local.common.BinlogEntry; +import io.eventuate.local.common.BinlogEntryHandler; +import io.eventuate.local.common.CdcProcessingStatusService; +import io.eventuate.local.common.OffsetProcessor; import io.eventuate.local.db.log.common.DbLogClient; import io.micrometer.core.instrument.MeterRegistry; import org.postgresql.PGConnection; @@ -38,7 +41,7 @@ public class PostgresWalClient extends DbLogClient { private OffsetProcessor offsetProcessor; public PostgresWalClient(MeterRegistry meterRegistry, - String url, + String dataSourceUrl, String user, String password, int walIntervalInMilliseconds, @@ -59,7 +62,7 @@ public PostgresWalClient(MeterRegistry meterRegistry, super(meterRegistry, user, password, - url, + dataSourceUrl, dataSource, readerName, replicationLagMeasuringIntervalInMilliseconds, diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProvider.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProvider.java new file mode 100644 index 00000000..2a438b97 --- /dev/null +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProvider.java @@ -0,0 +1,98 @@ +package io.eventuate.local.unified.cdc.pipeline; + +import io.eventuate.local.common.BinlogEntryReader; +import io.eventuate.local.unified.cdc.pipeline.common.PropertyReader; +import io.eventuate.local.unified.cdc.pipeline.common.factory.CdcPipelineReaderFactory; +import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineProperties; +import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineReaderProperties; +import io.eventuate.local.unified.cdc.pipeline.common.properties.RawUnifiedCdcProperties; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +public class PipelineConfigPropertiesProvider { + + private PropertyReader propertyReader = new PropertyReader(); + + private RawUnifiedCdcProperties rawUnifiedCdcProperties; + private Collection cdcPipelineReaderFactories; + + public PipelineConfigPropertiesProvider(RawUnifiedCdcProperties rawUnifiedCdcProperties, Collection cdcPipelineReaderFactories) { + this.rawUnifiedCdcProperties = rawUnifiedCdcProperties; + this.cdcPipelineReaderFactories = cdcPipelineReaderFactories; + } + + public Optional> pipelineReaderProperties() { + if (rawUnifiedCdcProperties.isReaderPropertiesDeclared()) { + return Optional.of(makeFromProperties(rawUnifiedCdcProperties.getReader(), this::createCdcPipelineReaderProperties)); + } else + return Optional.empty(); + } + + public Optional> pipelineProperties() { + if (rawUnifiedCdcProperties.isPipelinePropertiesDeclared()) { + return Optional.of(makeFromProperties(rawUnifiedCdcProperties.getPipeline(), this::createPipelineProperties)); + } else + return Optional.empty(); + } + + private List makeFromProperties(Map> properties, BiFunction, T> creator) { + return properties.entrySet().stream().map(entry -> creator.apply(entry.getKey(), entry.getValue())).collect(Collectors.toList()); + } + + private CdcPipelineReaderProperties createCdcPipelineReaderProperties(String name, Map properties) { + + String readerType = getReaderType(name, properties); + + CdcPipelineReaderFactory cdcPipelineReaderFactory = + findCdcPipelineReaderFactory(readerType); + + return makeReaderProperties(name, properties, cdcPipelineReaderFactory.propertyClass()); + } + + private String getReaderType(String name, Map properties) { + CdcPipelineReaderProperties cdcPipelineReaderProperties = propertyReader + .convertMapToPropertyClass(properties, CdcPipelineReaderProperties.class); + cdcPipelineReaderProperties.setReaderName(name); + cdcPipelineReaderProperties.validate(); + return cdcPipelineReaderProperties.getType(); + } + + private CdcPipelineReaderProperties makeReaderProperties(String name, Map properties, Class propertyClass) { + propertyReader.checkForUnknownProperties(properties, propertyClass); + + CdcPipelineReaderProperties exactCdcPipelineReaderProperties = propertyReader + .convertMapToPropertyClass(properties, propertyClass); + exactCdcPipelineReaderProperties.setReaderName(name); + exactCdcPipelineReaderProperties.validate(); + return exactCdcPipelineReaderProperties; + } + + private CdcPipelineReaderFactory findCdcPipelineReaderFactory(String type) { + return cdcPipelineReaderFactories + .stream() + .filter(factory -> factory.supports(type)) + .findAny() + .orElseThrow(() -> + new RuntimeException(String.format("reader factory not found for type %s", + type))); + } + + private CdcPipelineProperties createPipelineProperties(String name, Map properties) { + + propertyReader.checkForUnknownProperties(properties, CdcPipelineProperties.class); + + CdcPipelineProperties cdcPipelineProperties = propertyReader + .convertMapToPropertyClass(properties, CdcPipelineProperties.class); + + cdcPipelineProperties.validate(); + + return cdcPipelineProperties; + } + + +} diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/UnifiedCdcConfigurator.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/UnifiedCdcConfigurator.java index 7dcf0615..d68ebfd6 100644 --- a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/UnifiedCdcConfigurator.java +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/UnifiedCdcConfigurator.java @@ -1,10 +1,8 @@ package io.eventuate.local.unified.cdc.pipeline; -import io.eventuate.common.eventuate.local.PublishedEvent; import io.eventuate.coordination.leadership.LeaderSelectorFactory; import io.eventuate.local.common.BinlogEntryReader; import io.eventuate.local.common.BinlogEntryReaderLeadership; -import io.eventuate.local.common.ConnectionPoolConfigurationProperties; import io.eventuate.local.mysql.binlog.MySqlBinaryLogClient; import io.eventuate.local.unified.cdc.pipeline.common.BinlogEntryReaderProvider; import io.eventuate.local.unified.cdc.pipeline.common.CdcPipeline; @@ -12,7 +10,6 @@ import io.eventuate.local.unified.cdc.pipeline.common.PropertyReader; import io.eventuate.local.unified.cdc.pipeline.common.factory.CdcPipelineFactory; import io.eventuate.local.unified.cdc.pipeline.common.factory.CdcPipelineReaderFactory; -import io.eventuate.local.unified.cdc.pipeline.common.factory.DataSourceFactory; import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineProperties; import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineReaderProperties; import io.eventuate.local.unified.cdc.pipeline.common.properties.RawUnifiedCdcProperties; @@ -24,7 +21,6 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.sql.DataSource; import java.util.*; import java.util.stream.Collectors; @@ -107,9 +103,7 @@ private void start() { if (rawUnifiedCdcProperties.isPipelinePropertiesDeclared()) { rawUnifiedCdcProperties .getPipeline() - .keySet() - .forEach(pipeline -> createStartSaveCdcPipeline(pipeline, - rawUnifiedCdcProperties.getPipeline().get(pipeline))); + .forEach(this::createStartSaveCdcPipeline); } else { createStartSaveCdcDefaultPipeline(defaultCdcPipelineProperties); } @@ -186,7 +180,7 @@ private void createStartSaveCdcDefaultPipelineReader(CdcPipelineReaderProperties leaderSelectorFactory, binlogEntryReader); - binlogEntryReaderProvider.add(cdcDefaultPipelineReaderProperties.getReaderName(), binlogEntryReaderLeadership, cdcDefaultPipelineReaderProperties); + binlogEntryReaderProvider.add(cdcDefaultPipelineReaderProperties.getReaderName(), binlogEntryReaderLeadership); } private CdcPipeline createCdcPipeline(CdcPipelineProperties properties) { @@ -197,20 +191,12 @@ private CdcPipeline createCdcPipeline(CdcPipelineProperties properties) { private void createCdcPipelineReader(String name, Map properties) { - CdcPipelineReaderProperties cdcPipelineReaderProperties = propertyReader - .convertMapToPropertyClass(properties, CdcPipelineReaderProperties.class); - cdcPipelineReaderProperties.setReaderName(name); - cdcPipelineReaderProperties.validate(); + String readerType = getReaderType(name, properties); CdcPipelineReaderFactory cdcPipelineReaderFactory = - findCdcPipelineReaderFactory(cdcPipelineReaderProperties.getType()); - - propertyReader.checkForUnknownProperties(properties, cdcPipelineReaderFactory.propertyClass()); + findCdcPipelineReaderFactory(readerType); - CdcPipelineReaderProperties exactCdcPipelineReaderProperties = propertyReader - .convertMapToPropertyClass(properties, cdcPipelineReaderFactory.propertyClass()); - exactCdcPipelineReaderProperties.setReaderName(name); - exactCdcPipelineReaderProperties.validate(); + CdcPipelineReaderProperties exactCdcPipelineReaderProperties = makeReaderProperties(name, properties, cdcPipelineReaderFactory); pipelineReaderProperties.put(name.toLowerCase(), exactCdcPipelineReaderProperties); @@ -220,10 +206,29 @@ private void createCdcPipelineReader(String name, Map properties leaderSelectorFactory, binlogEntryReader); - binlogEntryReaderProvider.add(name, binlogEntryReaderLeadership, cdcPipelineReaderProperties); + binlogEntryReaderProvider.add(name, binlogEntryReaderLeadership); + } + + private CdcPipelineReaderProperties makeReaderProperties(String name, Map properties, CdcPipelineReaderFactory cdcPipelineReaderFactory) { + propertyReader.checkForUnknownProperties(properties, cdcPipelineReaderFactory.propertyClass()); + + CdcPipelineReaderProperties exactCdcPipelineReaderProperties = propertyReader + .convertMapToPropertyClass(properties, cdcPipelineReaderFactory.propertyClass()); + exactCdcPipelineReaderProperties.setReaderName(name); + exactCdcPipelineReaderProperties.validate(); + return exactCdcPipelineReaderProperties; + } + + private String getReaderType(String name, Map properties) { + CdcPipelineReaderProperties cdcPipelineReaderProperties = propertyReader + .convertMapToPropertyClass(properties, CdcPipelineReaderProperties.class); + cdcPipelineReaderProperties.setReaderName(name); + cdcPipelineReaderProperties.validate(); + String readType = cdcPipelineReaderProperties.getType(); + return readType; } - private CdcPipelineFactory findCdcPipelineFactory(String type) { + private CdcPipelineFactory findCdcPipelineFactory(String type) { return cdcPipelineFactories .stream() .filter(factory -> factory.supports(type)) diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/common/BinlogEntryReaderProvider.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/common/BinlogEntryReaderProvider.java index 1fe114a3..dd712203 100644 --- a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/common/BinlogEntryReaderProvider.java +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/common/BinlogEntryReaderProvider.java @@ -1,7 +1,6 @@ package io.eventuate.local.unified.cdc.pipeline.common; import io.eventuate.local.common.BinlogEntryReaderLeadership; -import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineReaderProperties; import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; @@ -16,7 +15,7 @@ Filled and all readers started from single thread (CdcPipelineConfigurator.start */ private ConcurrentMap clients = new ConcurrentHashMap<>(); - public void add(String name, BinlogEntryReaderLeadership binlogEntryReaderLeadership, CdcPipelineReaderProperties properties) { + public void add(String name, BinlogEntryReaderLeadership binlogEntryReaderLeadership) { clients.put(name.toLowerCase(), binlogEntryReaderLeadership); } diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/common/factory/CommonCdcPipelineReaderFactory.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/common/factory/CommonCdcPipelineReaderFactory.java index a31bd239..e2a81ea5 100644 --- a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/common/factory/CommonCdcPipelineReaderFactory.java +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/common/factory/CommonCdcPipelineReaderFactory.java @@ -1,33 +1,22 @@ package io.eventuate.local.unified.cdc.pipeline.common.factory; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; -import io.eventuate.coordination.leadership.LeaderSelectorFactory; import io.eventuate.local.common.BinlogEntryReader; import io.eventuate.local.common.ConnectionPoolConfigurationProperties; -import io.eventuate.local.unified.cdc.pipeline.common.BinlogEntryReaderProvider; import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineReaderProperties; import io.micrometer.core.instrument.MeterRegistry; import javax.sql.DataSource; -import java.util.Properties; abstract public class CommonCdcPipelineReaderFactory implements CdcPipelineReaderFactory { protected MeterRegistry meterRegistry; - protected LeaderSelectorFactory leaderSelectorFactory; - protected BinlogEntryReaderProvider binlogEntryReaderProvider; protected ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties; public CommonCdcPipelineReaderFactory(MeterRegistry meterRegistry, - LeaderSelectorFactory leaderSelectorFactory, - BinlogEntryReaderProvider binlogEntryReaderProvider, ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties) { this.meterRegistry = meterRegistry; - this.leaderSelectorFactory = leaderSelectorFactory; - this.binlogEntryReaderProvider = binlogEntryReaderProvider; this.connectionPoolConfigurationProperties = connectionPoolConfigurationProperties; } diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/mysqlbinlog/configuration/MySqlBinlogCdcPipelineReaderConfiguration.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/mysqlbinlog/configuration/MySqlBinlogCdcPipelineReaderConfiguration.java index b509d295..45599963 100644 --- a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/mysqlbinlog/configuration/MySqlBinlogCdcPipelineReaderConfiguration.java +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/mysqlbinlog/configuration/MySqlBinlogCdcPipelineReaderConfiguration.java @@ -1,8 +1,6 @@ package io.eventuate.local.unified.cdc.pipeline.dblog.mysqlbinlog.configuration; -import io.eventuate.coordination.leadership.LeaderSelectorFactory; import io.eventuate.local.common.ConnectionPoolConfigurationProperties; -import io.eventuate.local.unified.cdc.pipeline.common.BinlogEntryReaderProvider; import io.eventuate.local.unified.cdc.pipeline.common.factory.CdcPipelineReaderFactory; import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineReaderProperties; import io.eventuate.local.unified.cdc.pipeline.dblog.common.configuration.CommonDbLogCdcDefaultPipelineReaderConfiguration; @@ -20,15 +18,11 @@ public class MySqlBinlogCdcPipelineReaderConfiguration extends CommonDbLogCdcDef @Bean("eventuateLocalMySqlBinlogCdcPipelineReaderFactory") public CdcPipelineReaderFactory mySqlBinlogCdcPipelineReaderFactory(MeterRegistry meterRegistry, - LeaderSelectorFactory leaderSelectorFactory, - BinlogEntryReaderProvider binlogEntryReaderProvider, OffsetStoreFactory offsetStoreFactory, DebeziumOffsetStoreFactory debeziumOffsetStoreFactory, ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties) { return new MySqlBinlogCdcPipelineReaderFactory(meterRegistry, - leaderSelectorFactory, - binlogEntryReaderProvider, offsetStoreFactory, debeziumOffsetStoreFactory, connectionPoolConfigurationProperties); @@ -37,15 +31,11 @@ public CdcPipelineReaderFactory mySqlBinlogCdcPipelineReaderFactory(MeterRegistr @Conditional(MySqlBinlogCondition.class) @Bean("defaultCdcPipelineReaderFactory") public CdcPipelineReaderFactory defaultMySqlBinlogCdcPipelineFactory(MeterRegistry meterRegistry, - LeaderSelectorFactory leaderSelectorFactory, - BinlogEntryReaderProvider binlogEntryReaderProvider, OffsetStoreFactory offsetStoreFactory, DebeziumOffsetStoreFactory debeziumOffsetStoreFactory, ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties) { return new MySqlBinlogCdcPipelineReaderFactory(meterRegistry, - leaderSelectorFactory, - binlogEntryReaderProvider, offsetStoreFactory, debeziumOffsetStoreFactory, connectionPoolConfigurationProperties); diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/mysqlbinlog/factory/MySqlBinlogCdcPipelineReaderFactory.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/mysqlbinlog/factory/MySqlBinlogCdcPipelineReaderFactory.java index 7ed27a60..7aff33fe 100644 --- a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/mysqlbinlog/factory/MySqlBinlogCdcPipelineReaderFactory.java +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/mysqlbinlog/factory/MySqlBinlogCdcPipelineReaderFactory.java @@ -1,12 +1,10 @@ package io.eventuate.local.unified.cdc.pipeline.dblog.mysqlbinlog.factory; -import io.eventuate.coordination.leadership.LeaderSelectorFactory; import io.eventuate.common.jdbc.EventuateSchema; import io.eventuate.local.common.ConnectionPoolConfigurationProperties; import io.eventuate.local.db.log.common.OffsetStore; import io.eventuate.local.mysql.binlog.DebeziumBinlogOffsetKafkaStore; import io.eventuate.local.mysql.binlog.MySqlBinaryLogClient; -import io.eventuate.local.unified.cdc.pipeline.common.BinlogEntryReaderProvider; import io.eventuate.local.unified.cdc.pipeline.common.factory.CommonCdcPipelineReaderFactory; import io.eventuate.local.unified.cdc.pipeline.dblog.common.factory.OffsetStoreFactory; import io.eventuate.local.unified.cdc.pipeline.dblog.mysqlbinlog.properties.MySqlBinlogCdcPipelineReaderProperties; @@ -22,15 +20,11 @@ public class MySqlBinlogCdcPipelineReaderFactory extends CommonCdcPipelineReader private OffsetStoreFactory offsetStoreFactory; public MySqlBinlogCdcPipelineReaderFactory(MeterRegistry meterRegistry, - LeaderSelectorFactory leaderSelectorFactory, - BinlogEntryReaderProvider binlogEntryReaderProvider, OffsetStoreFactory offsetStoreFactory, DebeziumOffsetStoreFactory debeziumOffsetStoreFactory, ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties) { super(meterRegistry, - leaderSelectorFactory, - binlogEntryReaderProvider, connectionPoolConfigurationProperties); this.debeziumOffsetStoreFactory = debeziumOffsetStoreFactory; diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/postgreswal/configuration/PostgresWalCdcPipelineReaderConfiguration.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/postgreswal/configuration/PostgresWalCdcPipelineReaderConfiguration.java index 814e59ec..9c6c73bd 100644 --- a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/postgreswal/configuration/PostgresWalCdcPipelineReaderConfiguration.java +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/postgreswal/configuration/PostgresWalCdcPipelineReaderConfiguration.java @@ -1,8 +1,6 @@ package io.eventuate.local.unified.cdc.pipeline.dblog.postgreswal.configuration; -import io.eventuate.coordination.leadership.LeaderSelectorFactory; import io.eventuate.local.common.ConnectionPoolConfigurationProperties; -import io.eventuate.local.unified.cdc.pipeline.common.BinlogEntryReaderProvider; import io.eventuate.local.unified.cdc.pipeline.common.factory.CdcPipelineReaderFactory; import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineReaderProperties; import io.eventuate.local.unified.cdc.pipeline.dblog.common.configuration.CommonDbLogCdcDefaultPipelineReaderConfiguration; @@ -18,26 +16,18 @@ public class PostgresWalCdcPipelineReaderConfiguration extends CommonDbLogCdcDef @Bean("eventuateLocalPostgresWalCdcPipelineReaderFactory") public CdcPipelineReaderFactory postgresWalCdcPipelineReaderFactory(MeterRegistry meterRegistry, - LeaderSelectorFactory leaderSelectorFactory, - BinlogEntryReaderProvider binlogEntryReaderProvider, ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties) { return new PostgresWalCdcPipelineReaderFactory(meterRegistry, - leaderSelectorFactory, - binlogEntryReaderProvider, connectionPoolConfigurationProperties); } @Profile("PostgresWal") @Bean("defaultCdcPipelineReaderFactory") public CdcPipelineReaderFactory defaultPostgresWalCdcPipelineReaderFactory(MeterRegistry meterRegistry, - LeaderSelectorFactory leaderSelectorFactory, - BinlogEntryReaderProvider binlogEntryReaderProvider, ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties) { return new PostgresWalCdcPipelineReaderFactory(meterRegistry, - leaderSelectorFactory, - binlogEntryReaderProvider, connectionPoolConfigurationProperties); } diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/postgreswal/factory/PostgresWalCdcPipelineReaderFactory.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/postgreswal/factory/PostgresWalCdcPipelineReaderFactory.java index cee0b44e..06e02de0 100644 --- a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/postgreswal/factory/PostgresWalCdcPipelineReaderFactory.java +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/dblog/postgreswal/factory/PostgresWalCdcPipelineReaderFactory.java @@ -1,10 +1,8 @@ package io.eventuate.local.unified.cdc.pipeline.dblog.postgreswal.factory; import io.eventuate.common.jdbc.EventuateSchema; -import io.eventuate.coordination.leadership.LeaderSelectorFactory; import io.eventuate.local.common.ConnectionPoolConfigurationProperties; import io.eventuate.local.postgres.wal.PostgresWalClient; -import io.eventuate.local.unified.cdc.pipeline.common.BinlogEntryReaderProvider; import io.eventuate.local.unified.cdc.pipeline.common.factory.CommonCdcPipelineReaderFactory; import io.eventuate.local.unified.cdc.pipeline.dblog.postgreswal.properties.PostgresWalCdcPipelineReaderProperties; import io.micrometer.core.instrument.MeterRegistry; @@ -17,13 +15,9 @@ public class PostgresWalCdcPipelineReaderFactory public static final String TYPE = "postgres-wal"; public PostgresWalCdcPipelineReaderFactory(MeterRegistry meterRegistry, - LeaderSelectorFactory leaderSelectorFactory, - BinlogEntryReaderProvider binlogEntryReaderProvider, ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties) { super(meterRegistry, - leaderSelectorFactory, - binlogEntryReaderProvider, connectionPoolConfigurationProperties); } diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/configuration/PollingCdcPipelineReaderConfiguration.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/configuration/PollingCdcPipelineReaderConfiguration.java index cd6e064f..6d9f5b67 100644 --- a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/configuration/PollingCdcPipelineReaderConfiguration.java +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/configuration/PollingCdcPipelineReaderConfiguration.java @@ -1,9 +1,7 @@ package io.eventuate.local.unified.cdc.pipeline.polling.configuration; import io.eventuate.common.jdbc.sqldialect.SqlDialectSelector; -import io.eventuate.coordination.leadership.LeaderSelectorFactory; import io.eventuate.local.common.ConnectionPoolConfigurationProperties; -import io.eventuate.local.unified.cdc.pipeline.common.BinlogEntryReaderProvider; import io.eventuate.local.unified.cdc.pipeline.common.configuration.CommonCdcDefaultPipelineReaderConfiguration; import io.eventuate.local.unified.cdc.pipeline.common.factory.CdcPipelineReaderFactory; import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineReaderProperties; @@ -14,19 +12,18 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; +import java.util.Arrays; +import java.util.HashSet; + @Configuration public class PollingCdcPipelineReaderConfiguration extends CommonCdcDefaultPipelineReaderConfiguration { @Bean("eventuateLocalPollingCdcPipelineReaderFactory") public CdcPipelineReaderFactory pollingCdcPipelineReaderFactory(MeterRegistry meterRegistry, - LeaderSelectorFactory leaderSelectorFactory, - BinlogEntryReaderProvider binlogEntryReaderProvider, SqlDialectSelector sqlDialectSelector, ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties) { return new PollingCdcPipelineReaderFactory(meterRegistry, - leaderSelectorFactory, - binlogEntryReaderProvider, sqlDialectSelector, connectionPoolConfigurationProperties); } @@ -34,14 +31,10 @@ public CdcPipelineReaderFactory pollingCdcPipelineReaderFactory(MeterRegistry me @Profile("EventuatePolling") @Bean("defaultCdcPipelineReaderFactory") public CdcPipelineReaderFactory defaultPollingCdcPipelineReaderFactory(MeterRegistry meterRegistry, - LeaderSelectorFactory leaderSelectorFactory, - BinlogEntryReaderProvider binlogEntryReaderProvider, SqlDialectSelector sqlDialectSelector, ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties) { return new PollingCdcPipelineReaderFactory(meterRegistry, - leaderSelectorFactory, - binlogEntryReaderProvider, sqlDialectSelector, connectionPoolConfigurationProperties); } @@ -65,7 +58,7 @@ private PollingPipelineReaderProperties createPollingPipelineReaderProperties() pollingPipelineReaderProperties.setMaxEventsPerPolling(eventuateConfigurationProperties.getMaxEventsPerPolling()); pollingPipelineReaderProperties.setMaxAttemptsForPolling(eventuateConfigurationProperties.getMaxAttemptsForPolling()); pollingPipelineReaderProperties.setPollingRetryIntervalInMilliseconds(eventuateConfigurationProperties.getPollingRetryIntervalInMilliseconds()); - + pollingPipelineReaderProperties.setPollingParallelChannels(new HashSet<>(Arrays.asList(eventuateConfigurationProperties.getPollingParallelChannels()))); return pollingPipelineReaderProperties; } } diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/factory/PollingCdcPipelineReaderFactory.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/factory/PollingCdcPipelineReaderFactory.java index 3cbdf310..3f9ae47e 100644 --- a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/factory/PollingCdcPipelineReaderFactory.java +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/factory/PollingCdcPipelineReaderFactory.java @@ -1,10 +1,9 @@ package io.eventuate.local.unified.cdc.pipeline.polling.factory; import io.eventuate.common.jdbc.sqldialect.SqlDialectSelector; -import io.eventuate.coordination.leadership.LeaderSelectorFactory; import io.eventuate.local.common.ConnectionPoolConfigurationProperties; +import io.eventuate.local.polling.ParallelPollingChannels; import io.eventuate.local.polling.PollingDao; -import io.eventuate.local.unified.cdc.pipeline.common.BinlogEntryReaderProvider; import io.eventuate.local.unified.cdc.pipeline.common.factory.CommonCdcPipelineReaderFactory; import io.eventuate.local.unified.cdc.pipeline.polling.properties.PollingPipelineReaderProperties; import io.micrometer.core.instrument.MeterRegistry; @@ -16,12 +15,10 @@ public class PollingCdcPipelineReaderFactory extends CommonCdcPipelineReaderFact private SqlDialectSelector sqlDialectSelector; public PollingCdcPipelineReaderFactory(MeterRegistry meterRegistry, - LeaderSelectorFactory leaderSelectorFactory, - BinlogEntryReaderProvider binlogEntryReaderProvider, SqlDialectSelector sqlDialectSelector, ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties) { - super(meterRegistry, leaderSelectorFactory, binlogEntryReaderProvider, connectionPoolConfigurationProperties); + super(meterRegistry, connectionPoolConfigurationProperties); this.sqlDialectSelector = sqlDialectSelector; } @@ -43,7 +40,7 @@ public PollingDao create(PollingPipelineReaderProperties readerProperties) { readerProperties.getPollingIntervalInMilliseconds(), readerProperties.getReaderName(), sqlDialectSelector.getDialect(readerProperties.getDataSourceDriverClassName()), - readerProperties.getOutboxId()); + readerProperties.getOutboxId(), new ParallelPollingChannels(readerProperties.getPollingParallelChannels())); } @Override diff --git a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/properties/PollingPipelineReaderProperties.java b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/properties/PollingPipelineReaderProperties.java index 33a15521..18ad73a1 100644 --- a/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/properties/PollingPipelineReaderProperties.java +++ b/eventuate-local-java-cdc-connector-unified/src/main/java/io/eventuate/local/unified/cdc/pipeline/polling/properties/PollingPipelineReaderProperties.java @@ -3,11 +3,16 @@ import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineReaderProperties; +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + public class PollingPipelineReaderProperties extends CdcPipelineReaderProperties { private Integer pollingIntervalInMilliseconds = 500; private Integer maxEventsPerPolling = 1000; private Integer maxAttemptsForPolling = 100; private Integer pollingRetryIntervalInMilliseconds = 500; + private Set pollingParallelChannels; public Integer getPollingIntervalInMilliseconds() { return pollingIntervalInMilliseconds; @@ -40,4 +45,19 @@ public Integer getPollingRetryIntervalInMilliseconds() { public void setPollingRetryIntervalInMilliseconds(Integer pollingRetryIntervalInMilliseconds) { this.pollingRetryIntervalInMilliseconds = pollingRetryIntervalInMilliseconds; } + + public void setPollingParallelChannels(Set pollingParallelChannels) { + this.pollingParallelChannels = pollingParallelChannels; + } + + public void setPollingParallelChannelNames(String pollingParallelChannelNames) { + pollingParallelChannelNames = pollingParallelChannelNames.trim(); + if (pollingParallelChannelNames.isEmpty()) + return; + this.pollingParallelChannels = Arrays.stream(pollingParallelChannelNames.split(",")).map(String::trim).collect(Collectors.toSet()); + } + + public Set getPollingParallelChannels() { + return pollingParallelChannels; + } } diff --git a/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProviderTest.java b/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProviderTest.java new file mode 100644 index 00000000..fa9adb59 --- /dev/null +++ b/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/PipelineConfigPropertiesProviderTest.java @@ -0,0 +1,79 @@ +package io.eventuate.local.unified.cdc.pipeline; + +import io.eventuate.common.jdbc.sqldialect.SqlDialectSelector; +import io.eventuate.local.common.ConnectionPoolConfigurationProperties; +import io.eventuate.local.unified.cdc.pipeline.common.factory.CdcPipelineReaderFactory; +import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineProperties; +import io.eventuate.local.unified.cdc.pipeline.common.properties.CdcPipelineReaderProperties; +import io.eventuate.local.unified.cdc.pipeline.common.properties.RawUnifiedCdcProperties; +import io.eventuate.local.unified.cdc.pipeline.polling.factory.PollingCdcPipelineReaderFactory; +import io.eventuate.local.unified.cdc.pipeline.polling.properties.PollingPipelineReaderProperties; +import io.micrometer.core.instrument.MeterRegistry; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + + +@RunWith(SpringJUnit4ClassRunner.class) +@SpringBootTest(classes = PipelineConfigPropertiesProviderTest.Config.class, properties = "eventuatelocal.cdc.polling.parallel.channels=x,y") +@TestPropertySource(locations="/sample-pipeline-config.properties") +public class PipelineConfigPropertiesProviderTest { + + @Configuration + @EnableConfigurationProperties(RawUnifiedCdcProperties.class) + public static class Config { + + @Bean + public PipelineConfigPropertiesProvider pipelineConfigPropertiesProvider(RawUnifiedCdcProperties rawUnifiedCdcProperties, Collection cdcPipelineReaderFactories) { + return new PipelineConfigPropertiesProvider(rawUnifiedCdcProperties, cdcPipelineReaderFactories); + } + + @Bean + public PollingCdcPipelineReaderFactory pollingCdcPipelineReaderFactory(ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties, SqlDialectSelector sqlDialectSelector, MeterRegistry meterRegistry) { + return new PollingCdcPipelineReaderFactory(meterRegistry, sqlDialectSelector, connectionPoolConfigurationProperties); + } + } + + @MockBean + private MeterRegistry meterRegistry; + + @MockBean + private SqlDialectSelector sqlDialectSelector; + + @MockBean + private ConnectionPoolConfigurationProperties connectionPoolConfigurationProperties; + + @Autowired + private PipelineConfigPropertiesProvider pipelineConfigPropertiesProvider; + + @Test + public void shouldProvideReaderProperties() { + List readers = pipelineConfigPropertiesProvider.pipelineReaderProperties().get(); + assertEquals(1, readers.size()); + PollingPipelineReaderProperties reader = (PollingPipelineReaderProperties) readers.get(0); + Set expectedChannels = new HashSet<>(); + expectedChannels.add("parallel_channel_1"); + expectedChannels.add("parallel_channel_2"); + assertEquals(expectedChannels, reader.getPollingParallelChannels()); + } + + @Test + public void shouldProvidePipelineProperties() { + List pipelines = pipelineConfigPropertiesProvider.pipelineProperties().get(); + assertEquals(2, pipelines.size()); + } +} \ No newline at end of file diff --git a/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/common/factory/DataSourceCreationTest.java b/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/common/factory/DataSourceCreationTest.java index af9e8be4..1cd1d5ea 100644 --- a/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/common/factory/DataSourceCreationTest.java +++ b/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/common/factory/DataSourceCreationTest.java @@ -56,7 +56,7 @@ public void testUnknownProperty() { } private CommonCdcPipelineReaderFactory createCdcPipelineReaderFactory() { - return new CommonCdcPipelineReaderFactory(null, null, null, connectionPoolConfigurationProperties) { + return new CommonCdcPipelineReaderFactory(null, connectionPoolConfigurationProperties) { @Override public BinlogEntryReader create(CdcPipelineReaderProperties cdcPipelineReaderProperties) { return null; diff --git a/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/polling/PollingPipelineReaderPropertiesConversionTest.java b/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/polling/PollingPipelineReaderPropertiesConversionTest.java new file mode 100644 index 00000000..c9c9dbe1 --- /dev/null +++ b/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/polling/PollingPipelineReaderPropertiesConversionTest.java @@ -0,0 +1,26 @@ +package io.eventuate.local.unified.cdc.pipeline.polling; + +import io.eventuate.local.unified.cdc.pipeline.common.PropertyReader; +import io.eventuate.local.unified.cdc.pipeline.polling.properties.PollingPipelineReaderProperties; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class PollingPipelineReaderPropertiesConversionTest { + + @Test + public void shouldConvertProperties() { + PropertyReader propertyReader = new PropertyReader(); + Map properties = Collections.singletonMap("pollingParallelChannelNames", "x,y"); + PollingPipelineReaderProperties readerProps = propertyReader.convertMapToPropertyClass(properties, PollingPipelineReaderProperties.class); + Set expectedNames = new HashSet<>(); + expectedNames.add("x"); + expectedNames.add("y"); + assertEquals(expectedNames, readerProps.getPollingParallelChannels()); + } +} diff --git a/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/polling/PollingPipelineReaderPropertyValidationTest.java b/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/polling/PollingPipelineReaderPropertyValidationTest.java index 95f49a7b..aeb03422 100644 --- a/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/polling/PollingPipelineReaderPropertyValidationTest.java +++ b/eventuate-local-java-cdc-connector-unified/src/test/java/io/eventuate/local/unified/cdc/pipeline/polling/PollingPipelineReaderPropertyValidationTest.java @@ -1,7 +1,6 @@ package io.eventuate.local.unified.cdc.pipeline.polling; import io.eventuate.local.unified.cdc.pipeline.common.CommonPipelineReaderPropertyValidationTest; -import io.eventuate.local.unified.cdc.pipeline.dblog.postgreswal.properties.PostgresWalCdcPipelineReaderProperties; import io.eventuate.local.unified.cdc.pipeline.polling.factory.PollingCdcPipelineReaderFactory; import io.eventuate.local.unified.cdc.pipeline.polling.properties.PollingPipelineReaderProperties; import org.junit.Assert; @@ -13,12 +12,12 @@ public class PollingPipelineReaderPropertyValidationTest extends CommonPipelineR public void testPollingProperties() throws Exception { PropertyBuilder propertyBuilder = new PropertyBuilder(); - assertExceptionMessage(propertyBuilder.toString(), PostgresWalCdcPipelineReaderProperties.class, "type must not be null"); + assertExceptionMessage(propertyBuilder.toString(), PollingPipelineReaderProperties.class, "type must not be null"); propertyBuilder.addString("type", PollingCdcPipelineReaderFactory.TYPE); - testCommonRequiredProperties(PostgresWalCdcPipelineReaderProperties.class, propertyBuilder); + testCommonRequiredProperties(PollingPipelineReaderProperties.class, propertyBuilder); - assertNoException(propertyBuilder.toString(), PostgresWalCdcPipelineReaderProperties.class); + assertNoException(propertyBuilder.toString(), PollingPipelineReaderProperties.class); PollingPipelineReaderProperties pollingPipelineReaderProperties = objectMapper.readValue(propertyBuilder.toString(), PollingPipelineReaderProperties.class); diff --git a/eventuate-local-java-cdc-connector-unified/src/test/resources/sample-pipeline-config.properties b/eventuate-local-java-cdc-connector-unified/src/test/resources/sample-pipeline-config.properties new file mode 100644 index 00000000..27b09cec --- /dev/null +++ b/eventuate-local-java-cdc-connector-unified/src/test/resources/sample-pipeline-config.properties @@ -0,0 +1,14 @@ +eventuate.cdc.reader.mysqlreader.type: polling +eventuate.cdc.reader.mysqlreader.datasourceurl: jdbc:mysql://mysql:3306/eventuate +eventuate.cdc.reader.mysqlreader.datasourceusername: mysqluser +eventuate.cdc.reader.mysqlreader.datasourcepassword: mysqlpw +eventuate.cdc.reader.mysqlreader.datasourcedriverclassname: com.mysql.cj.jdbc.driver +eventuate.cdc.reader.mysqlreader.leadershiplockpath: /eventuatelocal/cdc/leader/pipeline/1 +eventuate.cdc.reader.mysqlreader.outboxid: 1 +eventuate.cdc.reader.mysqlreader.pollingparallelchannelnames: parallel_channel_1,parallel_channel_2 + +eventuate.cdc.pipeline.p1.type: eventuate-local +eventuate.cdc.pipeline.p1.reader: mysqlreader + +eventuate.cdc.pipeline.p4.type: eventuate-tram +eventuate.cdc.pipeline.p4.reader: mysqlreader diff --git a/eventuate-tram-cdc-connector/src/main/java/io/eventuate/tram/cdc/connector/BinlogEntryToMessageConverter.java b/eventuate-tram-cdc-connector/src/main/java/io/eventuate/tram/cdc/connector/BinlogEntryToMessageConverter.java index 5b07cbfe..654df9bc 100644 --- a/eventuate-tram-cdc-connector/src/main/java/io/eventuate/tram/cdc/connector/BinlogEntryToMessageConverter.java +++ b/eventuate-tram-cdc-connector/src/main/java/io/eventuate/tram/cdc/connector/BinlogEntryToMessageConverter.java @@ -44,4 +44,9 @@ public Optional convert(BinlogEntry binlogEntry) { return Optional.of(message); } + + @Override + public String getDestinationColumn() { + return "destination"; + } } diff --git a/scripts/mysql-cli8.sh b/scripts/mysql-cli8.sh new file mode 100755 index 00000000..ddbbb600 --- /dev/null +++ b/scripts/mysql-cli8.sh @@ -0,0 +1,10 @@ +#! /bin/bash -e + +if [ -z "$MYSQL_PORT" ]; then + export MYSQL_PORT=3307 +fi + +docker run $* \ + --name mysqlterm --network=${PWD##*/}_default --rm \ + mysql/mysql-server:8.0.27-1.2.6-server \ + sh -c 'exec mysql -hmysql -P3306 -uroot -prootpassword -o eventuate'