Destination S3 V2: copy over legacy DATs, enable tests #48490

Draft · wants to merge 17 commits into master
@@ -22,7 +22,7 @@ sealed interface AirbyteValue {
null -> NullValue
is String -> StringValue(value)
is Boolean -> BooleanValue(value)
is Int -> IntegerValue(value.toLong())
is Int -> IntValue(value)
is Long -> IntegerValue(value)
is BigInteger -> IntegerValue(value)
is Double -> NumberValue(BigDecimal.valueOf(value))
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import java.text.Normalizer
import java.util.regex.Pattern

class Transformations {
companion object {
private const val S3_SAFE_CHARACTERS = "\\p{Alnum}/!_.*')("
private const val S3_SPECIAL_CHARACTERS = "&$@=;:+,?-"
private val S3_CHARACTER_PATTERN =
"[^${S3_SAFE_CHARACTERS}${Pattern.quote(S3_SPECIAL_CHARACTERS)}]"

fun toS3SafeCharacters(input: String): String {
return Normalizer.normalize(input, Normalizer.Form.NFKD)
.replace(
"\\p{M}".toRegex(),
"",
) // \p{M} matches a combining mark (Unicode); stripping them removes accents left by NFKD decomposition
.replace(S3_CHARACTER_PATTERN.toRegex(), "_")
}
}
}
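For illustration, a minimal sketch (not part of the diff, assuming the `Transformations` class above is on the classpath) of what this transform does: NFKD decomposition splits accented characters into a base letter plus combining marks, the marks are stripped, and anything outside the S3-safe set becomes an underscore. The sample inputs are invented:

```kotlin
fun main() {
    // "é" decomposes to "e" + combining acute; the mark is stripped, and the
    // space, which is outside the safe set, becomes "_".
    println(Transformations.toS3SafeCharacters("café menü")) // cafe_menu
    // "|" is also outside the safe set.
    println(Transformations.toS3SafeCharacters("foo|bar baz")) // foo_bar_baz
}
```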
@@ -106,7 +106,9 @@ class DefaultStreamManager(
private val rangesState: ConcurrentHashMap<Batch.State, RangeSet<Long>> = ConcurrentHashMap()

init {
Batch.State.entries.forEach { rangesState[it] = TreeRangeSet.create() }
Batch.State.entries.forEach {
rangesState[it] = TreeRangeSet.create(listOf(Range.closedOpen(0L, 1L)))
}
}

override fun countRecordIn(): Long {
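The pre-seeded range appears to exist so that a stream with zero records is trivially complete: an empty range set can never enclose anything. A standalone sketch of the Guava behavior this relies on (illustrative, not part of the diff):

```kotlin
import com.google.common.collect.Range
import com.google.common.collect.TreeRangeSet

fun main() {
    // An unseeded set encloses nothing, so an empty stream could never
    // report its record range as persisted/complete.
    val unseeded = TreeRangeSet.create<Long>()
    println(unseeded.encloses(Range.closedOpen(0L, 1L))) // false

    // Seeding every state with [0, 1) makes the zero-record case pass.
    val seeded = TreeRangeSet.create(listOf(Range.closedOpen(0L, 1L)))
    println(seeded.encloses(Range.closedOpen(0L, 1L))) // true
}
```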
@@ -4,6 +4,7 @@

package io.airbyte.cdk.load.task

import com.google.common.collect.Range
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
@@ -15,6 +16,7 @@ import io.airbyte.cdk.load.message.DestinationMessage
import io.airbyte.cdk.load.message.DestinationRecordWrapped
import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.load.task.implementor.CloseStreamTaskFactory
@@ -198,9 +200,17 @@ class DefaultDestinationTaskLauncher(
stream: DestinationStream.Descriptor,
file: SpilledRawMessagesLocalFile
) {
log.info { "Starting process records task for $stream, file $file" }
val task = processRecordsTaskFactory.make(this, stream, file)
enqueue(task)
if (file.totalSizeBytes > 0L) {
log.info { "Starting process records task for ${stream}, file $file" }
val task = processRecordsTaskFactory.make(this, stream, file)
enqueue(task)
} else {
log.info { "No records to process in $file, skipping process records" }
handleNewBatch(
stream,
BatchEnvelope(SimpleBatch(Batch.State.COMPLETE), Range.openClosed(0L, 0L))
)
}
if (!file.endOfStream) {
log.info { "End-of-stream not reached, restarting spill-to-disk task for $stream" }
val spillTask = spillToDiskTaskFactory.make(this, stream)
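Note that `Range.openClosed(0L, 0L)` is an empty range, so the synthetic COMPLETE batch advances stream state without claiming any record indices. A quick standalone check of that Guava property (illustrative):

```kotlin
import com.google.common.collect.Range

fun main() {
    val empty = Range.openClosed(0L, 0L) // (0, 0] contains no values
    println(empty.isEmpty)      // true
    println(empty.contains(0L)) // false
}
```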
@@ -322,4 +322,11 @@ class StreamManagerTest {
// Can close now
Assertions.assertDoesNotThrow(manager::markSucceeded)
}

@Test
fun testEmptyCompletedStreamYieldsBatchProcessingComplete() {
val manager = DefaultStreamManager(stream1)
manager.markEndOfStream()
Assertions.assertTrue(manager.isBatchProcessingComplete())
}
}
@@ -405,6 +405,21 @@ class DestinationTaskLauncherTest<T> where T : LeveledTask, T : ScopedTask {
)
}

@Test
fun testHandleEmptySpilledFile() = runTest {
taskLauncher.handleNewSpilledFile(
MockDestinationCatalogFactory.stream1.descriptor,
SpilledRawMessagesLocalFile(Path("not/a/real/file"), 0L, Range.singleton(0))
)

mockSpillToDiskTaskFactory.streamHasRun[MockDestinationCatalogFactory.stream1.descriptor]
?.receive()
?: Assertions.fail("SpillToDiskTask not run")

delay(500)
Assertions.assertTrue(processRecordsTaskFactory.hasRun.tryReceive().isFailure)
}

@Test
fun testHandleSpilledFileCompleteNotEndOfStream() = runTest {
taskLauncher.handleNewSpilledFile(
@@ -488,6 +503,18 @@ class DestinationTaskLauncherTest<T> where T : LeveledTask, T : ScopedTask {
Assertions.assertTrue(true)
}

@Test
fun handleEmptyBatch() = runTest {
val range = TreeRangeSet.create(listOf(Range.closed(0L, 0L)))
val streamManager =
syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor)
streamManager.markEndOfStream()

val emptyBatch = BatchEnvelope(MockBatch(Batch.State.COMPLETE), range)
taskLauncher.handleNewBatch(MockDestinationCatalogFactory.stream1.descriptor, emptyBatch)
closeStreamTaskFactory.hasRun.receive()
}

@Test
fun testHandleStreamClosed() = runTest {
// This should run teardown unconditionally.
@@ -1679,6 +1679,7 @@ abstract class BasicFunctionalityIntegrationTest(
nestedFloat = BigDecimal("50000.0000000000000001")
topLevelFloat = BigDecimal("50000.0000000000000001")
bigInt = BigInteger("99999999999999999999999999999999")

bigIntChanges = emptyList<Change>()
badValuesData =
mapOf(
@@ -4,6 +4,16 @@

package io.airbyte.cdk.load.file.avro

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.UnionType
import java.io.Closeable
import java.io.InputStream
import kotlin.io.path.outputStream
@@ -34,15 +44,46 @@ class AvroReader(
}
}

fun InputStream.toAvroReader(avroSchema: Schema): AvroReader {
val reader = GenericDatumReader<GenericRecord>(avroSchema)
fun InputStream.toAvroReader(descriptor: DestinationStream.Descriptor): AvroReader {
val reader = GenericDatumReader<GenericRecord>()
val tmpFile =
kotlin.io.path.createTempFile(
prefix = "${avroSchema.namespace}.${avroSchema.name}",
prefix = "${descriptor.namespace}.${descriptor.name}",
suffix = ".avro"
)
tmpFile.outputStream().use { outputStream -> this.copyTo(outputStream) }
val file = tmpFile.toFile()
val dataFileReader = DataFileReader(file, reader)
return AvroReader(dataFileReader, file)
}

fun toAirbyteType(schema: Schema): AirbyteType {
return when (schema.type) {
Schema.Type.STRING -> StringType
Schema.Type.INT,
Schema.Type.LONG -> IntegerType
Schema.Type.FLOAT,
Schema.Type.DOUBLE -> NumberType
Schema.Type.BOOLEAN -> BooleanType
Schema.Type.RECORD ->
ObjectType(
schema.fields.associateTo(linkedMapOf()) {
it.name() to FieldType(toAirbyteType(it.schema()), nullable = true)
}
)
Schema.Type.ARRAY ->
ArrayType(FieldType(toAirbyteType(schema.elementType), nullable = true))
Schema.Type.UNION ->
UnionType(
schema.types
.filter { it.type != Schema.Type.NULL }
.map { toAirbyteType(it) }
.toSet()
)
Schema.Type.NULL ->
throw IllegalStateException(
"Null should only appear in union types, and should have been handled in an earlier recursion. This is a bug."
)
else -> throw IllegalArgumentException("Unsupported Avro schema $schema")
}
}
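A usage sketch for the new converter, assuming `toAirbyteType` is in scope and using the standard Avro parser; the schema JSON is invented for illustration:

```kotlin
import org.apache.avro.Schema

fun main() {
    val schemaJson =
        """
        {
          "type": "record",
          "name": "user",
          "fields": [
            {"name": "id", "type": "long"},
            {"name": "email", "type": ["null", "string"], "default": null}
          ]
        }
        """.trimIndent()
    val avroSchema = Schema.Parser().parse(schemaJson)
    // Expected: ObjectType(id -> IntegerType, email -> UnionType(StringType)),
    // with every field marked nullable, per the conversion above.
    println(toAirbyteType(avroSchema))
}
```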
@@ -106,10 +106,12 @@ sealed class ObjectStorageFormatSpecification(
}

interface FlatteningSpecificationProvider {
@get:JsonSchemaTitle("Flattening") @get:JsonProperty("flattening") val flattening: Flattening?
@get:JsonSchemaTitle("Flattening")
@get:JsonProperty("flattening", defaultValue = "No flattening")
val flattening: Flattening?

enum class Flattening(@get:JsonValue val flatteningName: String) {
NO_FLATTENING("No Flattening"),
NO_FLATTENING("No flattening"),
ROOT_LEVEL_FLATTENING("Root level flattening")
}
}
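The casing change matters because `@JsonValue` makes the enum serialize as its display string, which now has to match the declared `defaultValue` exactly. A minimal standalone Jackson sketch (mirrors the enum above, not the full spec class):

```kotlin
import com.fasterxml.jackson.annotation.JsonValue
import com.fasterxml.jackson.databind.ObjectMapper

enum class Flattening(@get:JsonValue val flatteningName: String) {
    NO_FLATTENING("No flattening"),
    ROOT_LEVEL_FLATTENING("Root level flattening")
}

fun main() {
    val mapper = ObjectMapper()
    // Serializes to the display name, so saved configs must say
    // "No flattening" (lowercase f) exactly.
    println(mapper.writeValueAsString(Flattening.NO_FLATTENING)) // "No flattening"
    // Jackson (2.9+) also uses @JsonValue to deserialize enums.
    println(mapper.readValue("\"No flattening\"", Flattening::class.java))
}
```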
@@ -72,9 +72,10 @@ class JsonFormattingWriter(
private val rootLevelFlattening: Boolean,
) : ObjectStorageFormattingWriter {
override fun accept(record: DestinationRecord) {
outputStream.write(
val data =
record.dataWithAirbyteMeta(stream, rootLevelFlattening).toJson().serializeToString()
)
println("data: $data")
outputStream.write(data)
outputStream.write("\n")
}

@@ -8,6 +8,7 @@ import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfigurationProvider
import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfigurationProvider
import io.airbyte.cdk.load.data.Transformations
import io.airbyte.cdk.load.file.DefaultTimeProvider
import io.airbyte.cdk.load.file.TimeProvider
import io.micronaut.context.annotation.Secondary
@@ -21,8 +22,8 @@ import java.time.format.DateTimeFormatter
import java.util.*

interface PathFactory {
fun getStagingDirectory(stream: DestinationStream, streamConstant: Boolean = false): Path
fun getFinalDirectory(stream: DestinationStream, streamConstant: Boolean = false): Path
fun getStagingDirectory(stream: DestinationStream, streamConstantPrefix: Boolean = false): Path
fun getFinalDirectory(stream: DestinationStream, streamConstantPrefix: Boolean = false): Path
fun getPathToFile(
stream: DestinationStream,
partNumber: Long?,
@@ -155,8 +156,12 @@ class ObjectStoragePathFactory(
const val DEFAULT_FILE_FORMAT = "{part_number}{format_extension}"
val PATH_VARIABLES =
listOf(
PathVariable("NAMESPACE") { it.stream.descriptor.namespace ?: "" },
PathVariable("STREAM_NAME") { it.stream.descriptor.name },
PathVariable("NAMESPACE") {
Transformations.toS3SafeCharacters(it.stream.descriptor.namespace ?: "")
},
PathVariable("STREAM_NAME") {
Transformations.toS3SafeCharacters(it.stream.descriptor.name)
},
PathVariable("YEAR", """\d{4}""") {
ZonedDateTime.ofInstant(it.time, ZoneId.of("UTC")).year.toString()
},
@@ -193,7 +198,8 @@ class ObjectStoragePathFactory(
PathVariable("EPOCH", """\d+""") { it.time.toEpochMilli().toString() },
PathVariable("UUID", """[a-fA-F0-9\\-]{36}""") { UUID.randomUUID().toString() }
)
val PATH_VARIABLES_STREAM_CONSTANT = PATH_VARIABLES.filter { it.variable != "UUID" }
val PATH_VARIABLES_STREAM_CONSTANT =
PATH_VARIABLES.filter { it.variable == "STREAM_NAME" || it.variable == "NAMESPACE" }
val FILENAME_VARIABLES =
listOf(
FileVariable("date", """\d{4}_\d{2}_\d{2}""") { DATE_FORMATTER.format(it.time) },
@@ -219,21 +225,26 @@
}
}

override fun getStagingDirectory(stream: DestinationStream, streamConstant: Boolean): Path {
override fun getStagingDirectory(
stream: DestinationStream,
streamConstantPrefix: Boolean
): Path {
val path =
getFormattedPath(
stream,
if (streamConstant) PATH_VARIABLES_STREAM_CONSTANT else PATH_VARIABLES
)
stream,
if (streamConstantPrefix) PATH_VARIABLES_STREAM_CONSTANT else PATH_VARIABLES
)
.takeWhile { it != '$' }
return Paths.get(stagingPrefix, path)
}

override fun getFinalDirectory(stream: DestinationStream, streamConstant: Boolean): Path {
override fun getFinalDirectory(stream: DestinationStream, streamConstantPrefix: Boolean): Path {
val path =
getFormattedPath(
stream,
if (streamConstant) PATH_VARIABLES_STREAM_CONSTANT else PATH_VARIABLES
)
stream,
if (streamConstantPrefix) PATH_VARIABLES_STREAM_CONSTANT else PATH_VARIABLES
)
.takeWhile { it != '$' }
return Paths.get(prefix, path)
}

@@ -292,11 +303,13 @@ class ObjectStoragePathFactory(
return Regex.escapeReplacement(input).replace(macroPattern.toRegex()) {
val variable = it.groupValues[1]
val pattern = variableToPattern[variable]
if (pattern != null) {
if (pattern == null) {
variable
} else if (pattern.isBlank()) {
""
} else {
variableToIndex[variable] = variableToIndex.size + 1
"($pattern)"
} else {
variable
}
}
}
@@ -319,7 +332,7 @@
pathVariableToPattern,
variableToIndex
)
val combined = Path.of(prefix).resolve(replacedForPath).resolve(replacedForFile).toString()
val combined = Path.of("$prefix/$replacedForPath").resolve(replacedForFile).toString()

return PathMatcher(Regex(combined), variableToIndex)
}
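For context, the matcher build substitutes each `${VARIABLE}` macro in the path template with a capturing group and records its group index. A simplified standalone sketch of that substitution (patterns invented, not the PR's exact code):

```kotlin
fun main() {
    // Hypothetical variable patterns; the PR derives these from PATH_VARIABLES.
    val patterns = mapOf("NAMESPACE" to "[^/]+", "STREAM_NAME" to "[^/]+", "EPOCH" to "\\d+")
    val variableToIndex = mutableMapOf<String, Int>()
    val template = "prefix/\${NAMESPACE}/\${STREAM_NAME}/\${EPOCH}"
    val macro = Regex("""\$\{(\w+)}""")
    val regex =
        template.replace(macro) {
            val variable = it.groupValues[1]
            val pattern = patterns[variable]
            if (pattern == null) {
                variable // unknown macro: keep the bare name as a literal
            } else {
                variableToIndex[variable] = variableToIndex.size + 1
                "($pattern)" // capturing group, indexed for later extraction
            }
        }
    println(regex)           // prefix/([^/]+)/([^/]+)/(\d+)
    println(variableToIndex) // {NAMESPACE=1, STREAM_NAME=2, EPOCH=3}
}
```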
@@ -138,20 +138,20 @@ class ObjectStorageFallbackPersister(
private val client: ObjectStorageClient<*>,
private val pathFactory: PathFactory
) : DestinationStatePersister<ObjectStorageDestinationState> {
private val log = KotlinLogging.logger {}
override suspend fun load(stream: DestinationStream): ObjectStorageDestinationState {
val matcher = pathFactory.getPathMatcher(stream)
val pathConstant = pathFactory.getFinalDirectory(stream, streamConstant = true).toString()
val firstVariableIndex = pathConstant.indexOfFirst { it == '$' }
val longestUnambiguous =
if (firstVariableIndex > 0) {
pathConstant.substring(0, firstVariableIndex)
} else {
pathConstant
}
client
.list(longestUnambiguous)
.mapNotNull { matcher.match(it.key) }
.toList()
pathFactory.getFinalDirectory(stream, streamConstantPrefix = true).toString()
log.info { "Inferring destination state from data under $longestUnambiguous" }
// TODO: Re-flow this before release
val keys = client.list(longestUnambiguous).toList()
log.info { "Found ${keys.size} keys under $longestUnambiguous" }
val matches = keys.mapNotNull { matcher.match(it.key) }.toList()
log.info {
"Matched ${matches.map { it.path to it.partNumber } } matching keys under against ${matcher.regex}"
}
matches
.groupBy {
client
.getMetadata(it.path)[ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY]
@@ -163,6 +163,10 @@
}
.toMutableMap()
.let {
val generationSizes = it.map { gen -> gen.key to gen.value.size }
log.info {
"Inferred state for generations with size: $generationSizes (minimum=${stream.minimumGenerationId}; current=${stream.generationId}"
}
return ObjectStorageDestinationState(
mutableMapOf(ObjectStorageDestinationState.State.FINALIZED to it)
)
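The grouping step buckets matched keys by the generation id stored in each object's metadata. A hypothetical, simplified model of that shape (names invented; the real key constant is `ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY`, and the real code is suspend/Flow based):

```kotlin
data class MatchedKey(val path: String, val partNumber: Long?)

// Bucket matched paths by the generation id recorded in object metadata,
// defaulting to generation 0 when the key is absent or unparseable.
fun groupByGeneration(
    matches: List<MatchedKey>,
    metadataOf: (String) -> Map<String, String>
): Map<Long, List<MatchedKey>> =
    matches.groupBy { metadataOf(it.path)["generation-id"]?.toLongOrNull() ?: 0L }
```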