
ReactiveStreams API Adapter for S3 ByteStream #612

Closed

hantsy opened this issue May 23, 2022 · 13 comments · Fixed by smithy-lang/smithy-kotlin#947
Labels
feature-request A feature should be added or improved.

Comments

hantsy commented May 23, 2022

Describe the feature

Currently, the S3Client upload and download operations use ByteStream. When using the client in a reactive application, it is not easy to convert a Reactive Streams type to a ByteStream.

Is your Feature Request related to a problem?

No

Proposed Solution

I would like to use the Reactive Streams-compatible Publisher<ByteBuffer>; especially in Spring WebFlux/Reactor, this means using the framework's Flux and DataBuffer types (i.e. Flux<DataBuffer>). RxJava3 and SmallRye Mutiny should also be considered. A small bridging sketch follows the alternatives list below.

Describe alternative solutions or features you've considered

  • Make the existing ByteStream implement Publisher
  • With Kotlin Coroutines, support Flow as the data type
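
For context, bridging a Publisher to a Flow is already possible with kotlinx-coroutines-reactive; here is a minimal untested sketch (asByteArrayFlow is a hypothetical helper name, and kotlinx-coroutines-reactive is assumed to be on the classpath):

import java.nio.ByteBuffer
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactive.asFlow
import org.reactivestreams.Publisher

// asFlow() (from kotlinx-coroutines-reactive) handles subscription and backpressure;
// each ByteBuffer is copied out into a standalone ByteArray chunk.
fun Publisher<ByteBuffer>.asByteArrayFlow(): Flow<ByteArray> =
    this.asFlow().map { buf ->
        ByteArray(buf.remaining()).also { buf.get(it) }
    }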

Acknowledge

  • I may be able to implement this feature request

AWS Kotlin SDK version used

0.15.2-beta

Platform (JVM/JS/Native)

JVM

Operating System and version

Windows 10

hantsy added the feature-request and needs-triage labels on May 23, 2022

aajtodd commented May 24, 2022

@hantsy Thanks for the feature request.

NOTE: There is a rather large limitation from S3 though that requires a known Content-Length. This would require that you know the total size that will be produced by a Flow/Publisher at the time of making the request. This may or may not be an issue for you depending on your use case.

See aws/aws-sdk-net#1095 for relevant discussion around this limitation.


Indeed, today you would need to create your own adapter from Flow/Publisher to ByteStream. Below is a working sample using Flow<ByteArray> that does this; hopefully you can extrapolate from it (assuming the limitation called out above is workable for your use case).

The key to marrying the two together is to collect the producer in a coroutine and write the individual buffers to an SdkByteChannel and use that channel as a ByteStream.OneShotStream.

import aws.sdk.kotlin.services.s3.*
import aws.smithy.kotlin.runtime.client.SdkLogMode
import aws.smithy.kotlin.runtime.content.ByteStream
import aws.smithy.kotlin.runtime.io.*
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

/**
 * Dummy producer
 */
fun producer(content: ByteArray, cnt: Int): Flow<ByteArray> = flow {
    repeat(cnt) {
        println("emitting buffer $it")
        emit(content)
        delay(1.seconds)
    }
}


/**
 * Collect the [producer] flow and emit it as a [ByteStream.OneShotStream]
 *
 * @param producer the publisher
 * @param contentLength the total content length that producer will emit
 * @return a [ByteStream] that can be used for S3 operations
 */
fun CoroutineScope.byteStreamFlow(
    producer: Flow<ByteArray>,
    contentLength: Long
): ByteStream {
    val chan = SdkByteChannel()
    launch {
        producer
            .onCompletion { cause -> chan.close(cause) }
            .collect {
                println("writing buffer")
                chan.writeFully(it)
            }
    }

    return object : ByteStream.OneShotStream() {
        override val contentLength: Long? = contentLength
        override fun readFrom(): SdkByteReadChannel = chan
    }
}

fun main(): Unit = runBlocking {
    val bucketName = "<YOUR-BUCKET-NAME>"
    val keyName = "<YOUR-KEY-NAME>"

    val dummyContent = "Reactive ByteStream example".encodeToByteArray()
    val cnt = 10
    val contentLength = (dummyContent.size * cnt).toLong()

    S3Client.fromEnvironment {
        sdkLogMode = SdkLogMode.LogRequest + SdkLogMode.LogResponse
    }.use { s3 ->

        // S3 requires Content-Length header for PutObject or UploadPart
        val bodyStream = byteStreamFlow(producer(dummyContent, cnt), contentLength)

        val resp = s3.putObject {
            bucket = bucketName
            key = keyName
            body = bodyStream
        }

        println(resp)
    }
}

aajtodd removed the needs-triage label on May 24, 2022
aajtodd changed the title from "ReactiveStreams API Adapter for data type in S3Client" to "ReactiveStreams API Adapter for S3 ByteStream" on May 24, 2022

hantsy commented May 26, 2022

Currently, I am using the Spring WebFlux multipart feature to upload a file to S3. The uploaded file's FilePart exposes a content() method (returning a Flux<DataBuffer>) to retrieve the file content.

I convert the Flux<DataBuffer> into a ByteArray and read it with the S3Client's ByteStream.readBytes. In my testing environment (a LocalStack Docker container and a text file), it works well.

import kotlinx.coroutines.reactive.awaitSingle
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DataBufferUtils
import reactor.core.publisher.Flux

suspend fun mergeDataBuffers(dataBufferFlux: Flux<DataBuffer>): ByteArray {
    // Merge all buffers (`Flux`) into a single DataBuffer (`Mono`).
    return DataBufferUtils.join(dataBufferFlux)
        .map { dataBuffer ->
            val bytes = ByteArray(dataBuffer.readableByteCount())
            dataBuffer.read(bytes)
            DataBufferUtils.release(dataBuffer)
            bytes
        }
        .awaitSingle()
}

The downside is that this reads all bytes into memory. Ideally, a Flow would emit data (ByteArray, ByteBuffer) to S3 incrementally.
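
A streaming variant might look like the following untested sketch, producing a Flow<ByteArray> that could feed the byteStreamFlow helper from the earlier comment (asFlow() comes from kotlinx-coroutines-reactive; dataBufferFlow is a hypothetical name):

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactive.asFlow
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DataBufferUtils
import reactor.core.publisher.Flux

// Emit each DataBuffer as its own ByteArray chunk instead of joining everything in memory.
fun dataBufferFlow(dataBufferFlux: Flux<DataBuffer>): Flow<ByteArray> =
    dataBufferFlux.asFlow().map { dataBuffer ->
        val bytes = ByteArray(dataBuffer.readableByteCount())
        dataBuffer.read(bytes)
        DataBufferUtils.release(dataBuffer)
        bytes
    }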

I will try your solution and use Flow throughout the whole process, thanks.

hantsy commented Jun 15, 2022

With my current solution, text files work, but downloading an image (a binary file) fails.

hantsy commented Jun 15, 2022

Unlike Spring's traditional MultipartFile, from which we can read the file size directly, the new Spring reactive Part/FilePart (for file uploads) does not include a method or field to get the content length.
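
One untested workaround sketch: spool the part to a temporary file to learn its size, then upload from the file, where the content length is known (uploadViaTempFile is a hypothetical helper):

import aws.sdk.kotlin.services.s3.*
import aws.smithy.kotlin.runtime.content.*
import java.nio.file.Files
import kotlinx.coroutines.reactor.awaitSingleOrNull
import org.springframework.http.codec.multipart.FilePart

suspend fun uploadViaTempFile(s3: S3Client, filePart: FilePart, bucketName: String, keyName: String) {
    // Spool the multipart body to disk so the total size is known up front.
    val tmp = Files.createTempFile("upload-", ".part")
    filePart.transferTo(tmp).awaitSingleOrNull()
    try {
        s3.putObject {
            bucket = bucketName
            key = keyName
            body = ByteStream.fromFile(tmp.toFile()) // content length comes from the file
        }
    } finally {
        Files.deleteIfExists(tmp)
    }
}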

@ianbotsf

@hantsy I'm sorry to hear that downloading failed. Can you please provide your complete download code and the exception & stack trace you encountered?

hantsy commented Jun 16, 2022

Currently I am using the following code to get files from the S3 service and write them to the HTTP response in a Spring WebFlux/Kotlin Coroutines application.

val request = GetObjectRequest {
    bucket = bucketName
    key = objectKey
}

// read the object into a byte array
s3client.getObject(request) { it.body?.toByteArray() }

...

// in spring controller.
val buffer = DataBufferUtils.read(
    ByteArrayResource(bytes), // the byte array retrieved from S3
    DefaultDataBufferFactory(),
    1024
).log()

// write to HTTP response
return exchange.response.writeWith(buffer).awaitSingleOrNull()

@ianbotsf

I assume you're storing the result of getting the byte array into a value called bytes?

val bytes = s3client.getObject(request) { it.body?.toByteArray() }

If so, that looks like the correct way to read the entire object into memory. What exception or problem are you seeing?

hantsy commented Jun 17, 2022

This works with a LocalStack Docker container; I tested it with a text file. But on AWS the application is deployed by CI and DevOps, and I don't have enough permission to view all the AWS logs.

From my experience, maybe I need to set the content type on my HTTP response; I will verify that.
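
For reference, setting the content type before writing the body might look like this untested sketch (assuming the object is a PNG image; writeImage is a hypothetical helper):

import kotlinx.coroutines.reactor.awaitSingleOrNull
import org.reactivestreams.Publisher
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.http.MediaType
import org.springframework.web.server.ServerWebExchange

// Declare the payload type before writing, so the client does not try to
// render binary content with a default or missing Content-Type.
suspend fun writeImage(exchange: ServerWebExchange, buffer: Publisher<DataBuffer>) {
    exchange.response.headers.contentType = MediaType.IMAGE_PNG
    exchange.response.writeWith(buffer).awaitSingleOrNull()
}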

hantsy commented Jun 17, 2022

I know reading the object into a byte array is a bad idea. But there is no simple way to convert between Flux<DataBuffer> and S3 ByteStream.

hantsy commented Jun 17, 2022

The AWS SDK for Java includes some async APIs (based on Java 8 CompletableFuture). I hope the AWS SDK for Kotlin will include built-in Kotlin Coroutines/Flow (or Reactive Streams) support for async operations.

@ianbotsf

It should be possible to adapt a ByteStream into a Flow. For example:

import aws.sdk.kotlin.services.s3.S3Client
import aws.sdk.kotlin.services.s3.model.GetObjectRequest
import aws.smithy.kotlin.runtime.content.ByteStream
import aws.smithy.kotlin.runtime.io.*
import java.io.RandomAccessFile
import kotlinx.coroutines.flow.*

const val bufferSize = 4096

suspend fun main() {
    val s3 = S3Client.fromEnvironment { }

    val req = GetObjectRequest {
        bucket = "some-bucket"
        key = "some-key"
    }

    s3.getObject(req) { resp ->
        val stream = resp.body as ByteStream.OneShotStream
        val flow = produceFlow(stream.readFrom())
        consumeFlow(flow)
    }

    println("Complete!")
}

fun produceFlow(reader: SdkByteReadChannel): Flow<ByteArray> {
    val buffer = ByteArray(bufferSize)
    return flow {
        var bytes = reader.readAvailable(buffer)
        while (bytes != -1) {
            if (bytes > 0) {
                emit(buffer.copyOfRange(0, bytes))
            }
            bytes = reader.readAvailable(buffer)
        }
    }
}

suspend fun consumeFlow(chunks: Flow<ByteArray>) {
    var total = 0
    RandomAccessFile("/tmp/chunk-download.zip", "rw").use { file ->
        chunks.collect { chunk ->
            println("Received a ${chunk.size} byte chunk, writing to file (written $total bytes so far)")
            file.write(chunk)
            total += chunk.size
        }
        println("Finished writing file, wrote $total bytes")
    }
}

This code sample emits individually-allocated buffer chunks into a Flow in produceFlow. It then writes those chunks to a RandomAccessFile in consumeFlow. Note that the consuming code needs to execute in the context of the getObject response lambda (otherwise, the stream will be freed and the bytes will no longer be available).
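
To illustrate that constraint, here is an untested sketch of the anti-pattern to avoid (reusing s3 and req from the sample above):

// Anti-pattern: the stream escapes the response lambda. The connection backing it
// is released when getObject returns, so reading from `leaked` afterwards fails.
val leaked = s3.getObject(req) { resp -> resp.body as ByteStream.OneShotStream }
// leaked.readFrom() is now unsafe -- the underlying channel has already been freed.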

github-actions bot commented Sep 8, 2023

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see.
If you need more assistance, please either tag a team member or open a new issue that references this one.
If you wish to keep having a conversation with other community members under this issue feel free to do so.

hantsy commented Nov 21, 2023

I tried to apply Flow<ByteArray>.toByteStream() and ByteStream.toFlow() but failed; see this branch of the example project: hantsy/aws-sdk-kotlin-spring-example#6.

See related discussions: #1127
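
For reference, the intended shape of those adapters might look like this untested sketch (assuming the signatures introduced by smithy-lang/smithy-kotlin#947 in aws.smithy.kotlin.runtime.content; asRequestBody and asChunkFlow are hypothetical wrapper names):

import aws.smithy.kotlin.runtime.content.ByteStream
import aws.smithy.kotlin.runtime.content.toByteStream
import aws.smithy.kotlin.runtime.content.toFlow
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow

// Upload direction: the flow is collected in the given scope; pass a content
// length because S3 PutObject requires a known Content-Length.
fun Flow<ByteArray>.asRequestBody(scope: CoroutineScope, contentLength: Long): ByteStream =
    toByteStream(scope, contentLength)

// Download direction: adapt a response ByteStream back into a Flow of chunks.
fun ByteStream.asChunkFlow(): Flow<ByteArray> = toFlow()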
