Skip to content

Commit

Permalink
Bulk Load CDK: S3V2: CSV Printer config matches old cdk; await state … (
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Nov 18, 2024
1 parent d705fe9 commit 8901591
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.file

import java.io.BufferedOutputStream
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.util.zip.GZIPOutputStream
Expand All @@ -20,8 +21,10 @@ data object NoopProcessor : StreamProcessor<ByteArrayOutputStream> {
override val extension: String? = null
}

data object GZIPProcessor : StreamProcessor<GZIPOutputStream> {
override val wrapper: (ByteArrayOutputStream) -> GZIPOutputStream = { GZIPOutputStream(it) }
override val partFinisher: GZIPOutputStream.() -> Unit = { finish() }
/**
 * [StreamProcessor] whose output is gzip-compressed.
 *
 * The stream exposed to callers is a [BufferedOutputStream] layered over a [GZIPOutputStream],
 * which in turn writes into the supplied [ByteArrayOutputStream].
 */
data object GZIPProcessor : StreamProcessor<BufferedOutputStream> {
// The buffer is the outermost layer, so writes pass through it before reaching the GZIP stream.
override val wrapper: (ByteArrayOutputStream) -> BufferedOutputStream = {
BufferedOutputStream(GZIPOutputStream(it))
}
// Closing the outer stream flushes the buffer and closes the wrapped GZIP stream (standard
// java.io filter-stream behaviour), which completes the compressed output.
// NOTE(review): a closed stream cannot be written to again, so this presumably relies on
// `wrapper` being invoked afresh for every part -- confirm against the caller.
override val partFinisher: BufferedOutputStream.() -> Unit = { close() }
// Extension associated with the compressed output; presumably appended to object names -- confirm.
override val extension: String = "gz"
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ class DefaultTeardownTask(

override suspend fun execute() {
syncManager.awaitInputProcessingComplete()
checkpointManager.awaitAllCheckpointsFlushed()

log.info { "Teardown task awaiting stream completion" }
if (!syncManager.awaitAllStreamsCompletedSuccessfully()) {
log.info { "Streams failed to complete successfully, doing nothing." }
return
}

checkpointManager.awaitAllCheckpointsFlushed()
log.info { "Starting teardown task" }
destination.teardown()
log.info { "Teardown task complete, marking sync succeeded." }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ package io.airbyte.cdk.load.file.csv
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.csv.toCsvHeader
import java.io.OutputStream
import java.io.PrintWriter
import java.nio.charset.StandardCharsets
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter
import org.apache.commons.csv.QuoteMode

/**
 * Creates a [CSVPrinter] that writes to [outputStream] as UTF-8 text.
 *
 * The format is assembled with [CSVFormat.Builder]: the header columns come from [toCsvHeader]
 * on the receiver, and auto-flush is enabled. All other format settings are whatever
 * `CSVFormat.Builder.create()` starts from.
 *
 * @param outputStream destination for the CSV bytes.
 * @return a printer bound to a UTF-8 writer over [outputStream].
 */
fun ObjectType.toCsvPrinterWithHeader(outputStream: OutputStream): CSVPrinter =
CSVFormat.Builder.create()
.setHeader(*toCsvHeader())
.setAutoFlush(true)
.build()
.print(outputStream.writer(charset = Charsets.UTF_8))
/**
 * Creates a [CSVPrinter] that writes to [outputStream] as UTF-8 text.
 *
 * The format starts from [CSVFormat.DEFAULT], switches the quote mode to
 * [QuoteMode.NON_NUMERIC], and takes its header columns from [toCsvHeader] on the receiver.
 *
 * @param outputStream destination for the CSV bytes.
 * @return a printer over an auto-flushing UTF-8 [PrintWriter] wrapping [outputStream].
 */
// NOTE(review): the suppression presumably covers the `withQuoteMode` / `withHeader` calls,
// which newer commons-csv releases deprecate in favour of the builder API -- confirm.
@Suppress("DEPRECATION")
fun ObjectType.toCsvPrinterWithHeader(outputStream: OutputStream): CSVPrinter {
val csvSettings =
CSVFormat.DEFAULT.withQuoteMode(QuoteMode.NON_NUMERIC).withHeader(*toCsvHeader())
// The second PrintWriter argument (`true`) turns on the writer's auto-flush mode.
return CSVPrinter(PrintWriter(outputStream, true, StandardCharsets.UTF_8), csvSettings)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Singleton
import java.io.BufferedOutputStream
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.zip.GZIPOutputStream
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -84,9 +84,9 @@ class ObjectStoragePathFactoryTest {
@Primary
@Requires(env = ["ObjectStoragePathFactoryTest"])
class MockCompressionConfigProvider :
ObjectStorageCompressionConfigurationProvider<GZIPOutputStream> {
ObjectStorageCompressionConfigurationProvider<BufferedOutputStream> {
override val objectStorageCompressionConfiguration:
ObjectStorageCompressionConfiguration<GZIPOutputStream> =
ObjectStorageCompressionConfiguration<BufferedOutputStream> =
ObjectStorageCompressionConfiguration(compressor = GZIPProcessor)
}

Expand Down

0 comments on commit 8901591

Please sign in to comment.