[Critical] All Channel implementations leak every passed object #326

Closed · amal opened this issue Apr 12, 2018 · 6 comments

amal commented Apr 12, 2018

For the last two days I have been trying to work around a huge memory leak in kotlinx.coroutines.experimental.channels.Channel.
All I found is that an OutOfMemoryError is unavoidable at the moment (correct me if I'm wrong, please).
Every Channel implementation leaks every object passed through it. This makes them unusable in practice.

Example to quickly reproduce the memory leak (run it with -Xmx256m, for example):

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.channels.consume
import java.lang.management.ManagementFactory
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

fun main(args: Array<String>) {
	@Suppress("JoinDeclarationAndAssignment")
	val channel: Channel<ArrayList<Double>>
	channel = Channel(Channel.UNLIMITED)
//	channel = Channel(0)
//	channel = Channel(16)
	// producer side: I1 async jobs, each building a 5_000-element list and sending it
	launch {
		val I1 = 50_000_000
		val I2 = 5_000
		val countDownLatch = CountDownLatch(I1)
		for (i in 0 until I1) {
			async(DefaultDispatcher) {
				val numbers = ArrayList<Double>()
				for (k in 1..I2)
					numbers.add(k * 1.0)
				channel.send(numbers)
				countDownLatch.countDown()
			}
		}
		countDownLatch.await()
		channel.close()
	}
	// consumer side: a single reader that prints current heap usage per received element
	runBlocking {
		val counter = AtomicInteger()
		channel.consume {
			for (element in this) {
				val mxBean = ManagementFactory.getMemoryMXBean()
				val usedMemory = mxBean.heapMemoryUsage.used / 1024f / 1024f
				println("${counter.getAndIncrement()}: $usedMemory MB used")
			}
		}
	}
}
fvasco (Contributor) commented Apr 12, 2018

> Every Channel implementation leaks every object passed through it

What type of objects are you seeing in memory?

> for (i in 0 until 50_000_000) {
>     async(DefaultDispatcher) {

[Critical] Is there a valid use case for creating 50_000_000 coroutines?

> countDownLatch.await()

[Critical] This is a blocking call on a non-blocking dispatcher.
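
A non-blocking way to wait for all the producers (a sketch, not from the thread: Deferred.await() suspends the coroutine instead of parking a pool thread the way CountDownLatch.await() does):

launch {
    val jobs = (0 until I1).map {
        async(DefaultDispatcher) {
            val numbers = ArrayList<Double>()
            for (k in 1..I2)
                numbers.add(k * 1.0)
            channel.send(numbers)
        }
    }
    jobs.forEach { it.await() } // await() suspends; the dispatcher thread stays free
    channel.close()
}

This only removes the blocking call; the memory pressure from creating that many coroutines remains.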

qwwdfsad (Collaborator) commented Apr 12, 2018

Channels don't leak all passed objects.

In your example, you create 50_000_000 asynchronous jobs (and this alone is huge memory pressure); each of them creates a list of 5_000 Double objects and sends it to the channel from N threads (using the parallelism of DefaultDispatcher).

Then you consume from this channel with a single consumer. Moreover, this consumer performs relatively slow operations (writing to stdout). As a consequence, your consumer just can't keep up with the producers, and all sent elements are buffered in the unlimited channel.

Solutions:

  1. Use a channel with limited capacity.
  2. Avoid creating 50_000_000 jobs at once without a backpressure mechanism.

In general, it's not a channel issue: if you replace the channel with, for example, a LinkedBlockingQueue, you will still get an OutOfMemoryError, but that doesn't mean the queue "leaks" all passed elements.
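
For reference, a minimal standalone sketch of that comparison (not from the thread): an unbounded LinkedBlockingQueue with many fast producers and one slow consumer buffers elements in exactly the same way.

import java.util.concurrent.LinkedBlockingQueue

fun main(args: Array<String>) {
    // Unbounded queue, analogous to Channel(Channel.UNLIMITED)
    val queue = LinkedBlockingQueue<ArrayList<Double>>()
    // Several fast producers
    repeat(8) {
        Thread {
            while (true)
                queue.put(ArrayList((1..5_000).map { it * 1.0 }))
        }.start()
    }
    // One slow consumer: printing throttles it, so the queue grows
    // without bound and the process eventually dies with OOM.
    while (true) {
        queue.take()
        println("queue size: ${queue.size}")
    }
}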

amal (Author) commented Apr 12, 2018

Channels do leak all passed objects in memory in some way.
Any implementation: RendezvousChannel (without any internal buffer), LinkedListChannel, ArrayChannel.
You will get OOM with any of them; just pass enough objects to fill all memory.
I can see ALL the passed objects in the dumped heap at any time, long after each of them has been consumed. And all of them are referenced from some internal kotlinx.coroutines objects.

You can add Thread.sleep(1000) to the producer and you'll get OOM anyway; it will just happen later.
You can also remove the writes to stdout and you still get it.

No, there is no problem with LinkedBlockingQueue or raw RxJava.

My real use case has a much slower data producer. I'm processing about 56_000_000 collections of doubles -- they are all slow calculation results. There is no problem with consuming speed.

I'm waiting on the CountDownLatch to close the channel from the writing side; it's not required to reproduce the error.

You can pass small objects instead of big collections and still get the memory leak; it just takes longer to hit OOM.

Try this (you'll get an OOM exception):

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) {
	val channel: Channel<ArrayList<Double>> = RendezvousChannel()
	launch {
		val I1 = 50_000_000
		val I2 = 5_000
		for (i in 0 until I1) {
			async(DefaultDispatcher) {
				Thread.sleep(50)
				val numbers = ArrayList<Double>()
				for (k in 1..I2)
					numbers.add(k * 1.0)
				channel.send(numbers)
			}
		}
	}
	runBlocking {
		//val counter = AtomicInteger()
		channel.consume {
			for (element in this) {
				if (element.isEmpty())
					throw IllegalStateException()
			}
		}
	}
}

fvasco (Contributor) commented Apr 12, 2018

> No, there is no problem with LinkedBlockingQueue or raw RxJava.

Can you share the alternative implementations?

qwwdfsad (Collaborator) commented Apr 12, 2018

Please provide the Rx sample; you're probably using operators with a backpressure mechanism.

I've checked: OOM with an unlimited LinkedBlockingQueue is reproducible as well. Thread.sleep on the producing side won't help, because you have one thread which creates 50_000_000 objects (coroutines) without any throttling, and they call sleep only after their creation. 50_000_000 coroutines are enough to exhaust a 256m JVM.

If you use Thread.sleep before creating a coroutine (right before async(DefaultDispatcher)), everything will work (though pretty slowly), as in the sketch below.
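
A sketch of that variant (reusing the names from the original repro; the latch/close bookkeeping is omitted for brevity):

launch {
    for (i in 0 until I1) {
        Thread.sleep(50) // throttle *creation*: sleep before async, not inside it
        async(DefaultDispatcher) {
            val numbers = ArrayList<Double>()
            for (k in 1..I2)
                numbers.add(k * 1.0)
            channel.send(numbers)
        }
    }
}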

What you need is a backpressure mechanism.
You can use a limited-size channel for this purpose as well, where the capacity is the number of pending (not currently executing) tasks.

E.g. if you rewrite your sample this way:

// Maximum 32 unprocessed results
val listChannel: Channel<ArrayList<Double>> = Channel(32)
// Maximum 32 created `async` tasks
val semaphoreChannel = Channel<Unit>(32)

launch {
    val I1 = 50_000_000
    val I2 = 5_000
    val countDownLatch = CountDownLatch(I1)
    for (i in 0 until I1) {
        // Acquire permit to launch new task
        semaphoreChannel.send(Unit)
        async(DefaultDispatcher) {
            val numbers = ArrayList<Double>()
            for (k in 1..I2)
                numbers.add(k * 1.0)
            listChannel.send(numbers)
            countDownLatch.countDown()
            semaphoreChannel.receive() // release permit
        }
    }
    countDownLatch.await()
    listChannel.close()
}
runBlocking {
    val counter = AtomicInteger()
    listChannel.consume {
        for (element in this) {
            val mxBean = ManagementFactory.getMemoryMXBean()
            val usedMemory = mxBean.heapMemoryUsage.used / 1024f / 1024f
            println("${counter.getAndIncrement()}: $usedMemory MB used")
        }
    }
}

On my machine, it never exceeds 150 MB while processing ~2 million tasks.

But in general this approach is not intuitive; what would be really helpful is the worker pool pattern (#172), with which your example could hypothetically be rewritten as something like:

produce<Int> {
    for (i in 0 until I1) {
        send(i)
    }
}.map(parallelism = 8) {
    val numbers = ArrayList<Double>()
    for (k in 1..I2)
        numbers.add(k * 1.0)
    send(numbers)
}.map(parallelism = 1) {
    val mxBean = ManagementFactory.getMemoryMXBean()
    val usedMemory = mxBean.heapMemoryUsage.used / 1024f / 1024f
    println("${counter.getAndIncrement()}: $usedMemory MB used")
}

amal (Author) commented Apr 12, 2018

@qwwdfsad yeah, it seems better backpressure control can reduce memory usage tremendously.

Also, I created far too many simultaneous tasks in this repro example, so the OOM was definitely from the number of created coroutines, not from the sent data. My bad.

Sorry for bothering you with a false alarm.
What I hadn't understood is that even with the bounded RendezvousChannel and ArrayChannel, the sender suspends when the channel is full rather than blocking.
It seems what I saw in the dumped heap were these suspended senders; of course, those suspended continuations hold references to the sent data :(
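
(A minimal sketch of that effect, not from the thread: with no receiver on a rendezvous channel, every suspended send() keeps its continuation, and therefore its payload, reachable, which looks exactly like a leak in a heap dump.)

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel

fun main(args: Array<String>) = runBlocking {
    val channel = Channel<ArrayList<Double>>() // rendezvous: zero capacity
    repeat(100_000) {
        launch {
            val numbers = arrayListOf(1.0, 2.0, 3.0)
            channel.send(numbers) // suspends forever: no receiver ever arrives
        }
    }
    // Keep the process alive; a heap dump taken here shows all
    // 100_000 suspended sends still referencing their lists.
    delay(60_000)
}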

With the worker pool pattern, it would be much more usable and intuitive.

@amal amal closed this as completed Apr 12, 2018