Skip to content

Commit

Permalink
#128 Enhance PollingDao to use a thread-per-channel
Browse files Browse the repository at this point in the history
Also, removed unused bean dependencies in order to simplify testing
  • Loading branch information
cer committed Aug 3, 2022
1 parent 9f24147 commit edf8e47
Show file tree
Hide file tree
Showing 40 changed files with 828 additions and 269 deletions.
16 changes: 15 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ dockerCompose {
removeContainers = project.ext.removeContainers
}

mysql8cdcpolling {
projectName = null
useComposeFiles = ["docker-compose-mysql8.yml", "docker-compose-cdc-mysql-polling.yml"]
removeContainers = project.ext.removeContainers
startedServices = ["eventuate-cdc-service"]
}

mysql8cdcunifiedpolling {
projectName = null
useComposeFiles = ["docker-compose-mysql8.yml", "docker-compose-cdc-unified-polling-mysql.yml"]
removeContainers = project.ext.removeContainers
startedServices = ["eventuate-cdc-service"]
}

mysqlonly {
projectName = null
useComposeFiles = ["docker-compose-mysql.yml"]
Expand All @@ -116,7 +130,7 @@ dockerCompose {
projectName = null
useComposeFiles = ["docker-compose-mysql8.yml"]
removeContainers = project.ext.removeContainers
startedServices = ["mysql8"]
startedServices = ["mysql"]
removeOrphans = false
}

Expand Down
30 changes: 30 additions & 0 deletions docker-compose-cdc-mysql-polling.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
version: '3'
services:
eventuate-cdc-service:
env_file:
- ${DOCKER_ENV_FILE:-docker-compose-env-files/empty.env}
build:
context: ./eventuate-cdc-service/
image: eventuateio/eventuate-cdc-service:${DOCKER_IMAGE_TAG:-latest}
ports:
- "8099:8080"
links:
- mysql
- kafka
- zookeeper
environment:
SPRING_DATASOURCE_URL: jdbc:mysql://mysql/eventuate
SPRING_DATASOURCE_USERNAME: mysqluser
SPRING_DATASOURCE_PASSWORD: mysqlpw
SPRING_DATASOURCE_DRIVER_CLASS_NAME: com.mysql.cj.jdbc.Driver
EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181
EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:29092
EVENTUATELOCAL_CDC_DB_USER_NAME: root
EVENTUATELOCAL_CDC_DB_PASSWORD: rootpassword
EVENTUATELOCAL_CDC_READER_NAME: MySqlReader
EVENTUATE_OUTBOX_ID: 1
EVENTUATELOCAL_CDC_MYSQL_BINLOG_CLIENT_UNIQUE_ID: 1234567890
EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC: "false"
EVENTUATE_CDC_TYPE: ${EVENTUATE_CDC_TYPE}
SPRING_PROFILES_ACTIVE: EventuatePolling
EVENTUATELOCAL_CDC_POLLING_PARALLEL_CHANNELS: parallel_channel_1
30 changes: 30 additions & 0 deletions docker-compose-cdc-unified-polling-mysql.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
version: '3'
services:
eventuate-cdc-service:
build: ./eventuate-cdc-service
ports:
- "8099:8080"
links:
- mysql
- kafka
- zookeeper
environment:
EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:29092
EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181
SPRING_PROFILES_ACTIVE: ${SPRING_PROFILES_ACTIVE}

EVENTUATE_CDC_READER_MYSQLREADER_TYPE: polling
EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEURL: jdbc:mysql://mysql:3306/eventuate
EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEUSERNAME: mysqluser
EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEPASSWORD: mysqlpw
EVENTUATE_CDC_READER_MYSQLREADER_DATASOURCEDRIVERCLASSNAME: com.mysql.cj.jdbc.Driver
EVENTUATE_CDC_READER_MYSQLREADER_LEADERSHIPLOCKPATH: /eventuatelocal/cdc/leader/pipeline/1
EVENTUATE_CDC_READER_MYSQLREADER_OUTBOXID: 1
EVENTUATE_CDC_READER_MYSQLREADER_POLLINGPARALLELCHANNELNAMES: parallel_channel_1

EVENTUATE_CDC_PIPELINE_P1_TYPE: eventuate-local
EVENTUATE_CDC_PIPELINE_P1_READER: MYSQLREADER

EVENTUATE_CDC_PIPELINE_P4_TYPE: eventuate-tram
EVENTUATE_CDC_PIPELINE_P4_READER: MYSQLREADER

Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ public CompletableFuture<?> publish(BinlogEntry binlogEntry) {
.map(eventPublisher::apply)
.orElse(CompletableFuture.completedFuture(null));
}

public String getDestinationColumn() {
return binlogEntryToEventConverter.getDestinationColumn();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public abstract class BinlogEntryReader {
protected List<BinlogEntryHandler> binlogEntryHandlers = new CopyOnWriteArrayList<>();
protected AtomicBoolean running = new AtomicBoolean(false);
protected CountDownLatch stopCountDownLatch;
protected String dataSourceUrl;
protected DataSource dataSource;
protected String readerName;
protected Long outboxId;
Expand All @@ -34,13 +33,11 @@ public abstract class BinlogEntryReader {
protected Optional<Runnable> restartCallback = Optional.empty();

public BinlogEntryReader(MeterRegistry meterRegistry,
String dataSourceUrl,
DataSource dataSource,
String readerName,
Long outboxId) {

this.meterRegistry = meterRegistry;
this.dataSourceUrl = dataSourceUrl;
this.dataSource = dataSource;
this.readerName = readerName;
this.outboxId = outboxId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@

public interface BinlogEntryToEventConverter<EVENT> {
Optional<EVENT> convert(BinlogEntry binlogEntry);

String getDestinationColumn();
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public Optional<PublishedEvent> convert(BinlogEntry binlogEntry) {

return Optional.of(publishedEvent);
}

@Override
public String getDestinationColumn() {
return "entity_type";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import org.springframework.beans.factory.annotation.Value;

import java.util.concurrent.TimeUnit;

public class EventuateConfigurationProperties {

@Value("${eventuatelocal.cdc.db.user.name:#{null}}")
Expand Down Expand Up @@ -36,6 +34,10 @@ public class EventuateConfigurationProperties {
@Value("${eventuatelocal.cdc.polling.retry.interval.in.milleseconds:#{500}}")
private int pollingRetryIntervalInMilliseconds;


@Value("${eventuatelocal.cdc.polling.parallel.channels:}")
private String[] pollingParallelChannels;

@Value("${eventuatelocal.cdc.leadership.lock.path:#{\"/eventuatelocal/cdc/leader\"}}")
private String leadershipLockPath;

Expand Down Expand Up @@ -207,4 +209,8 @@ public boolean isEnableBatchProcessing() {
public int getMaxBatchSize() {
return maxBatchSize;
}

public String[] getPollingParallelChannels() {
return pollingParallelChannels;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import io.eventuate.common.jdbc.EventuateSchema;
import io.eventuate.common.jdbc.JdbcUrl;
import io.eventuate.common.jdbc.JdbcUrlParser;
import io.eventuate.local.common.*;
import io.eventuate.local.common.BinlogEntryReader;
import io.eventuate.local.common.CdcMonitoringDao;
import io.micrometer.core.instrument.MeterRegistry;

import javax.sql.DataSource;
Expand All @@ -16,9 +17,9 @@ public abstract class DbLogClient extends BinlogEntryReader {
protected String dbPassword;
protected String host;
protected int port;
protected String defaultDatabase;
protected DbLogMetrics dbLogMetrics;
private boolean checkEntriesForDuplicates;
protected String dataSourceUrl;
private boolean checkEntriesForDuplicates;
protected volatile boolean connected;
protected CdcMonitoringDao cdcMonitoringDao;

Expand All @@ -35,7 +36,6 @@ public DbLogClient(MeterRegistry meterRegistry,
Long outboxId) {

super(meterRegistry,
dataSourceUrl,
dataSource,
readerName,
outboxId);
Expand All @@ -57,7 +57,6 @@ public DbLogClient(MeterRegistry meterRegistry,
JdbcUrl jdbcUrl = JdbcUrlParser.parse(dataSourceUrl);
host = jdbcUrl.getHost();
port = jdbcUrl.getPort();
defaultDatabase = jdbcUrl.getDatabase();
}

public boolean isConnected() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.eventuate.local.polling;

import io.eventuate.local.polling.spec.PollingSpec;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.singletonList;

public class ParallelPollingChannels {

private Set<String> channels;

@Override
public String toString() {
return "ParallelPollingChannels{" +
"channels=" + channels +
'}';
}

public ParallelPollingChannels(Set<String> channels) {
this.channels = channels;
}

public static ParallelPollingChannels make(String[] channels) {
return new ParallelPollingChannels(new HashSet<>(Arrays.asList(channels)));
}

public int size() {
return channels.size();
}

public List<PollingSpec> makePollingSpecs() {
return channels.isEmpty() ? singletonList(PollingSpec.ALL) : Stream.concat(Stream.of(PollingSpec.excludingChannels(channels)), channels.stream().map(PollingSpec::forChannel)).collect(Collectors.toList());
}

}
Loading

0 comments on commit edf8e47

Please sign in to comment.