Temporal per stream resets (#13990)
* remove reset flags from workflow state + refactor

* bring back cancelledForReset, since we need to distinguish between that case and a normal cancel

* delete reset job streams on cancel or success

* extract isResetJob to method

* merge with master

* set sync modes on streams in reset job correctly (see the sketch after the changed-files summary below)

* format

* Add test for getAllStreamsForConnection

* fix tests

* update more tests

* add StreamResetActivityTests

* fix tests for default job creator

* remove outdated comment

* remove debug lines

* remove unused enum value

* fix tests

* fix constant equals ordering

* make job mock not static

* DRY and add comments

* add comment about deleted streams

* Remove io.airbyte.config.StreamDescriptor

* register stream reset activity impl

* refetch connection workflow when checking job id, since it may have been restarted

* only cancel if workflow is running, to allow reset signal to always succeed even if batched with a workflow start

* fix reset signal to use new doneWaiting workflow state prop

* try to fix tests

* fix reset cancel case

* add acceptance test for resetting while sync is running

* format

* fix new acceptance test

* lower sleep on test

* raise sleep

* increase sleep and timeout, and remove repeated test

* use CatalogHelpers to extract stream descriptors

* raise sleep and timeout to prevent transient failures

* format

Co-authored-by: alovew <anne@airbyte.io>
lmossman and alovew authored Jun 28, 2022
1 parent 34ed33b commit 924bab4
Showing 38 changed files with 650 additions and 495 deletions.
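
Before the per-file diffs, here is a condensed sketch of the behavioral core of this commit, paraphrased from the DefaultJobCreator.createResetConnectionJob hunk further down: a reset job no longer forces OVERWRITE on the whole catalog, but assigns destination sync modes per stream. The class and method names below are illustrative, not part of the commit.

import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.util.List;

class ResetCatalogSketch {

  // Illustrative helper mirroring the logic added to createResetConnectionJob.
  static void applyResetSyncModes(final ConfiguredAirbyteCatalog catalog,
                                  final List<StreamDescriptor> streamsToReset) {
    catalog.getStreams().forEach(configuredStream -> {
      final StreamDescriptor descriptor = CatalogHelpers.extractDescriptor(configuredStream);
      // Every stream in a reset job reads from the record-less Reset Source.
      configuredStream.setSyncMode(SyncMode.FULL_REFRESH);
      if (streamsToReset.contains(descriptor)) {
        // OVERWRITE with zero incoming records empties the stream in the destination.
        configuredStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
      } else {
        // Streams not selected for reset are left untouched in the destination.
        configuredStream.setDestinationSyncMode(DestinationSyncMode.APPEND);
      }
    });
  }

}

Streams listed in streamsToReset but missing from the configured catalog (i.e. deleted streams) still have their state cleared by the Reset Source, but are not modified in the destination because they are absent from the catalog sent to it.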
@@ -11,4 +11,5 @@ properties:
streamsToReset:
type: array
items:
"$ref": StreamDescriptor.yaml
type: object
existingJavaType: io.airbyte.protocol.models.StreamDescriptor

This file was deleted.

@@ -45,6 +45,8 @@
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
import io.airbyte.metrics.lib.MetricQueries;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.time.OffsetDateTime;
@@ -981,4 +983,10 @@ public void writeWorkspaceServiceAccountNoSecrets(final WorkspaceServiceAccount
workspaceServiceAccount);
}

public List<StreamDescriptor> getAllStreamsForConnection(final UUID connectionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSync standardSync = getStandardSync(connectionId);
return CatalogHelpers.extractStreamDescriptors(standardSync.getCatalog());
}

}
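
The new getAllStreamsForConnection helper above pairs with the per-stream reset entry points added to EventRunner and TemporalClient later in this diff. A hypothetical caller for the "reset every stream" case might look like the following; the wrapper class, field names, and exception handling are illustrative and not part of this commit.

import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.util.List;
import java.util.UUID;

class ResetConnectionSketch {

  private final ConfigRepository configRepository;
  private final EventRunner eventRunner;

  ResetConnectionSketch(final ConfigRepository configRepository, final EventRunner eventRunner) {
    this.configRepository = configRepository;
    this.eventRunner = eventRunner;
  }

  ManualOperationResult resetAllStreams(final UUID connectionId) throws Exception {
    // Resetting the whole connection means resetting every stream in its catalog.
    final List<StreamDescriptor> streamsToReset = configRepository.getAllStreamsForConnection(connectionId);
    return eventRunner.resetConnection(connectionId, streamsToReset);
  }

}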
@@ -6,10 +6,10 @@

import static org.jooq.impl.DSL.noCondition;

import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.StreamResetRecord;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.protocol.models.StreamDescriptor;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.List;
@@ -38,7 +38,7 @@ public StreamResetPersistence(final Database database) {
}

/*
* Get a list of streamDescriptors for streams that have pending or running resets
* Get a list of StreamDescriptors for streams that have pending or running resets
*/
public List<StreamDescriptor> getStreamResets(final UUID connectionId) throws IOException {
return database.query(ctx -> ctx.select(DSL.asterisk())
@@ -28,6 +28,10 @@
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.State;
import io.airbyte.db.Database;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.sql.SQLException;
@@ -431,4 +435,31 @@ void testHealthCheckFailure() throws SQLException {
assertFalse(check);
}

@Test
void testGetAllStreamsForConnection() throws Exception {
final UUID connectionId = UUID.randomUUID();
final AirbyteStream airbyteStream = new AirbyteStream().withName("stream1").withNamespace("namespace1");
final ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream().withStream(airbyteStream);
final AirbyteStream airbyteStream2 = new AirbyteStream().withName("stream2");
final ConfiguredAirbyteStream configuredStream2 = new ConfiguredAirbyteStream().withStream(airbyteStream2);
final ConfiguredAirbyteCatalog configuredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStream, configuredStream2));

final StandardSync sync = new StandardSync()
.withCatalog(configuredCatalog);
doReturn(sync)
.when(configRepository)
.getStandardSync(connectionId);

final List<StreamDescriptor> result = configRepository.getAllStreamsForConnection(connectionId);
assertEquals(2, result.size());

assertTrue(
result.stream().anyMatch(
streamDescriptor -> streamDescriptor.getName().equals("stream1") && streamDescriptor.getNamespace().equals("namespace1")));
assertTrue(
result.stream().anyMatch(
streamDescriptor -> streamDescriptor.getName().equals("stream2") && streamDescriptor.getNamespace() == null));

}

}
@@ -8,14 +8,14 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.spy;

import io.airbyte.config.StreamDescriptor;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import io.airbyte.db.instance.development.MigrationDevHelper;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.test.utils.DatabaseConnectionHelper;
import java.util.ArrayList;
import java.util.List;
@@ -4,7 +4,9 @@

package io.airbyte.scheduler.client;

import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.util.List;
import java.util.Set;
import java.util.UUID;

@@ -16,9 +18,9 @@ public interface EventRunner {

ManualOperationResult startNewCancellation(final UUID connectionId);

ManualOperationResult resetConnection(final UUID connectionId);
ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset);

ManualOperationResult synchronousResetConnection(final UUID connectionId);
ManualOperationResult synchronousResetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset);

void deleteConnection(final UUID connectionId);

@@ -4,8 +4,10 @@

package io.airbyte.scheduler.client;

import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import lombok.AllArgsConstructor;
@@ -27,12 +29,12 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) {
return temporalClient.startNewCancellation(connectionId);
}

public ManualOperationResult resetConnection(final UUID connectionId) {
return temporalClient.resetConnection(connectionId);
public ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset) {
return temporalClient.resetConnection(connectionId, streamsToReset);
}

public ManualOperationResult synchronousResetConnection(final UUID connectionId) {
return temporalClient.synchronousResetConnection(connectionId);
public ManualOperationResult synchronousResetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset) {
return temporalClient.synchronousResetConnection(connectionId, streamsToReset);
}

public void deleteConnection(final UUID connectionId) {
@@ -17,11 +17,12 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.State;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.List;
@@ -87,13 +88,6 @@ public Optional<Long> createSyncJob(final SourceConnection source,
return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig);
}

// Strategy:
// 1. Set all streams to full refresh - overwrite.
// 2. Create a job where the source emits no records.
// 3. Run a sync from the empty source to the destination. This will overwrite all data for each
// stream in the destination.
// 4. The Empty source emits no state message, so state will start at null (i.e. start from the
// beginning on the next sync).
@Override
public Optional<Long> createResetConnectionJob(final DestinationConnection destination,
final StandardSync standardSync,
@@ -103,8 +97,19 @@ public Optional<Long> createResetConnectionJob(final DestinationConnection destination,
throws IOException {
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = standardSync.getCatalog();
configuredAirbyteCatalog.getStreams().forEach(configuredAirbyteStream -> {
final StreamDescriptor streamDescriptor = CatalogHelpers.extractDescriptor(configuredAirbyteStream);
configuredAirbyteStream.setSyncMode(SyncMode.FULL_REFRESH);
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
if (streamsToReset.contains(streamDescriptor)) {
// The Reset Source will emit no record messages for any streams, so setting the destination sync
// mode to OVERWRITE will empty out this stream in the destination.
// Note: streams in streamsToReset that are NOT in this configured catalog (i.e. deleted streams)
// will still have their state reset by the Reset Source, but will not be modified in the
// destination since they are not present in the catalog that is sent to the destination.
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
} else {
// Set streams that are not being reset to APPEND so that they are not modified in the destination
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND);
}
});
final JobResetConnectionConfig resetConnectionConfig = new JobResetConnectionConfig()
.withNamespaceDefinition(standardSync.getNamespaceDefinition())
@@ -9,7 +9,7 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.protocol.models.StreamDescriptor;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
@@ -32,7 +32,6 @@
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.State;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.protocol.models.CatalogHelpers;
@@ -41,8 +40,9 @@
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -52,8 +52,13 @@

public class DefaultJobCreatorTest {

private static final String STREAM_NAME = "users";
private static final String STREAM1_NAME = "stream1";
private static final String STREAM2_NAME = "stream2";
private static final String STREAM3_NAME = "stream3";
private static final String NAMESPACE = "namespace";
private static final String FIELD_NAME = "id";
private static final StreamDescriptor STREAM1_DESCRIPTOR = new StreamDescriptor().withName(STREAM1_NAME);
private static final StreamDescriptor STREAM2_DESCRIPTOR = new StreamDescriptor().withName(STREAM2_NAME).withNamespace(NAMESPACE);

private static final String SOURCE_IMAGE_NAME = "daxtarity/sourceimagename";
private static final String DESTINATION_IMAGE_NAME = "daxtarity/destinationimagename";
@@ -62,8 +67,6 @@ public class DefaultJobCreatorTest {
private static final StandardSync STANDARD_SYNC;
private static final StandardSyncOperation STANDARD_SYNC_OPERATION;
private static final long JOB_ID = 12L;
private static final StreamDescriptor STREAM_DESCRIPTOR1 = new StreamDescriptor().withName("stream 1").withNamespace("namespace 1");
private static final StreamDescriptor STREAM_DESCRIPTOR2 = new StreamDescriptor().withName("stream 2").withNamespace("namespace 2");

private JobPersistence jobPersistence;
private StatePersistence statePersistence;
@@ -97,13 +100,17 @@ public class DefaultJobCreatorTest {
.withConfiguration(implementationJson)
.withTombstone(false);

final ConfiguredAirbyteStream stream = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)));
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(stream));

final UUID connectionId = UUID.randomUUID();
final UUID operationId = UUID.randomUUID();

final ConfiguredAirbyteStream stream1 = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)));
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)));
final ConfiguredAirbyteStream stream3 = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM3_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)));
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream1, stream2, stream3));

STANDARD_SYNC = new StandardSync()
.withConnectionId(connectionId)
.withName("presto to hudi")
@@ -329,12 +336,21 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException {

@Test
void testCreateResetConnectionJob() throws IOException {
final ConfiguredAirbyteCatalog expectedCatalog = STANDARD_SYNC.getCatalog();
expectedCatalog.getStreams()
.forEach(configuredAirbyteStream -> {
configuredAirbyteStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH);
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
});
final List<StreamDescriptor> streamsToReset = List.of(STREAM1_DESCRIPTOR, STREAM2_DESCRIPTOR);
final ConfiguredAirbyteCatalog expectedCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE),
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE),
// this stream is not being reset, so it should have APPEND destination sync mode
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM3_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.APPEND)));

final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val")));
when(statePersistence.getCurrentState(STANDARD_SYNC.getConnectionId()))
@@ -349,7 +365,7 @@ void testCreateResetConnectionJob() throws IOException {
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)))
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset))
.withState(connectionState);

final JobConfig jobConfig = new JobConfig()
@@ -364,7 +380,7 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2));
streamsToReset);

verify(jobPersistence).enqueueJob(expectedScope, jobConfig);
assertTrue(jobId.isPresent());
@@ -373,12 +389,21 @@

@Test
void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
final ConfiguredAirbyteCatalog expectedCatalog = STANDARD_SYNC.getCatalog();
expectedCatalog.getStreams()
.forEach(configuredAirbyteStream -> {
configuredAirbyteStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH);
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
});
final List<StreamDescriptor> streamsToReset = List.of(STREAM1_DESCRIPTOR, STREAM2_DESCRIPTOR);
final ConfiguredAirbyteCatalog expectedCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE),
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE),
// this stream is not being reset, so it should have APPEND destination sync mode
new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(STREAM3_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)))
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.APPEND)));

final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val")));
when(statePersistence.getCurrentState(STANDARD_SYNC.getConnectionId()))
@@ -393,7 +418,7 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)))
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset))
.withState(connectionState);

final JobConfig jobConfig = new JobConfig()
@@ -408,7 +433,7 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2));
streamsToReset);

verify(jobPersistence).enqueueJob(expectedScope, jobConfig);
assertTrue(jobId.isEmpty());
