Java UDF OOM easily #11508

Closed
Tracked by #7405
xxchan opened this issue Aug 7, 2023 · 6 comments · Fixed by #13789

Labels: type/bug (Something isn't working)

@xxchan
Member

xxchan commented Aug 7, 2023

Describe the bug

Tested with the Nexmark datagen source. The Java UDF server runs out of memory (OOM) very soon.

Note: the Python UDF server doesn't have this problem, because it is easily bounded by the GIL 😄

Error message/log

[flight-server-default-executor-20] ERROR com.risingwave.functions.UdfServer - Error occurred during UDF execution
org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
        at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
        at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
        at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
        at org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
        at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:354)
        at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:349)
        at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:337)
        at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:315)
        at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
        at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:279)
        at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
        at org.apache.arrow.vector.BaseValueVector.allocFixedDataAndValidityBufs(BaseValueVector.java:192)
        at org.apache.arrow.vector.BaseFixedWidthVector.allocateBytes(BaseFixedWidthVector.java:338)
        at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:308)
        at com.risingwave.functions.TypeUtils.fillVector(TypeUtils.java:220)
        at com.risingwave.functions.TypeUtils.createVector(TypeUtils.java:188)
        at com.risingwave.functions.ScalarFunctionBatch.evalBatch(ScalarFunctionBatch.java:57)
        at com.risingwave.functions.UdfProducer.doExchange(UdfServer.java:138)
        at org.apache.arrow.flight.FlightService.lambda$doExchangeCustom$2(FlightService.java:367)
        at io.grpc.Context$1.run(Context.java:566)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
        at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
        at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
        at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:701)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:676)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
        at io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:180)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:137)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
        at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:181)
        at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:214)
        at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
        ... 24 more

To Reproduce

Create sources with the Nexmark datagen connector:

    CREATE SOURCE nexmark (
      event_type BIGINT,
      person STRUCT<"id" BIGINT,
                    "name" VARCHAR,
                    "email_address" VARCHAR,
                    "credit_card" VARCHAR,
                    "city" VARCHAR,
                    "state" VARCHAR,
                    "date_time" TIMESTAMP,
                    "extra" VARCHAR>,
      auction STRUCT<"id" BIGINT,
                    "item_name" VARCHAR,
                    "description" VARCHAR,
                    "initial_bid" BIGINT,
                    "reserve" BIGINT,
                    "date_time" TIMESTAMP,
                    "expires" TIMESTAMP,
                    "seller" BIGINT,
                    "category" BIGINT,
                    "extra" VARCHAR>,
      bid STRUCT<"auction" BIGINT,
                "bidder" BIGINT,
                "price" BIGINT,
                "channel" VARCHAR,
                "url" VARCHAR,
                "date_time" TIMESTAMP,
                "extra" VARCHAR>,
      p_time TIMESTAMPTZ as proctime()
    ) WITH (
        connector = 'nexmark',
        nexmark.split.num = '8',
        nexmark.min.event.gap.in.ns = '0'
    ); 
    CREATE VIEW bid 
    AS
    SELECT (bid).auction, (bid).bidder, (bid).price, (bid).channel, (bid).url, (bid).date_time, (bid).extra, p_time FROM nexmark WHERE event_type = 2;

    CREATE VIEW auction 
    AS
    SELECT (auction).id, (auction).item_name, (auction).description, (auction).initial_bid, (auction).reserve, (auction).date_time, (auction).expires, (auction).seller, (auction).category, (auction).extra, p_time FROM nexmark WHERE event_type = 1;
    
    CREATE VIEW person 
    AS
    SELECT (person).id, (person).name, (person).email_address, (person).credit_card, (person).city, (person).state, (person).date_time, (person).extra, p_time FROM nexmark WHERE event_type = 0;
 

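Then create the Java UDF (linked to the UDF server at http://localhost:8815) and a blackhole sink that calls it:

CREATE FUNCTION java_count_char (s varchar, c varchar) RETURNS BIGINT AS count_char USING LINK 'http://localhost:8815';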

CREATE SINK java_count_char AS
    SELECT java_count_char(extra, 'c') AS c_counts
    FROM bid
    WITH ( connector = 'blackhole', type = 'append-only');
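The issue does not include the Java source of `count_char`. Assuming it simply counts how often the character `c` occurs in `s` (the name suggests this, but the real implementation may differ), the per-row work is tiny. The class and method below are hypothetical and leave out the registration with the UDF server:

```java
// Hypothetical stand-in for the `count_char` UDF body; the real class
// registered with the Java UDF server is not shown in this issue.
public class CountChar {
    // Counts occurrences of the first character of `c` in `s`.
    // Assumed semantics: NULL in, NULL out; an empty `c` counts nothing.
    public static Long countChar(String s, String c) {
        if (s == null || c == null) {
            return null;
        }
        if (c.isEmpty()) {
            return 0L;
        }
        char target = c.charAt(0);
        long count = 0;
        for (int i = 0; i < s.length(); i++) {
            if (s.charAt(i) == target) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countChar("concrete", "c")); // prints 2
    }
}
```

If the function body really is this cheap, the memory growth is more likely in the batch handling around it: the stack traces in this thread point at Arrow allocations (`TypeUtils.fillVector`, `ArrowMessage.frame`) rather than at the function itself.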

Expected behavior

No response

How did you deploy RisingWave?

No response

The version of RisingWave

No response

Additional context

No response

@xxchan xxchan added the type/bug Something isn't working label Aug 7, 2023
@github-actions github-actions bot added this to the release-1.2 milestone Aug 7, 2023
@xxchan
Member Author

xxchan commented Aug 7, 2023

Also tested nexmark_q14, and it also OOMs, although much more slowly than the direct function benchmark.

Does this indicate a potential memory leak, or is the memory management just not good enough?


CREATE SINK nexmark_q14_java AS
    SELECT auction,
           bidder,
           0.908 * price as price,
           CASE
               WHEN
                           extract(hour from date_time) >= 8 AND
                           extract(hour from date_time) <= 18
                   THEN 'dayTime'
               WHEN
                           extract(hour from date_time) <= 6 OR
                           extract(hour from date_time) >= 20
                   THEN 'nightTime'
               ELSE 'otherTime'
               END       AS bidTimeType,
           date_time,
           extra,
           java_count_char(extra, 'c') AS c_counts
    FROM bid
    WHERE 0.908 * price > 1000000
      AND 0.908 * price < 50000000
    WITH ( connector = 'blackhole', type = 'append-only');


@xxchan
Member Author

xxchan commented Aug 7, 2023

Profiled with IntelliJ Profiler. The result is a little weird.


The error is a little different (maybe because Java 18 is used):

Exception in thread "flight-server-default-executor-273" java.lang.OutOfMemoryError: Cannot reserve 4194304 bytes of direct buffer memory (allocated: 8587886871, limit: 8589934592)
	at java.base/java.nio.Bits.reserveMemory(Bits.java:178)
	at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
	at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:332)
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:701)
	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:676)
	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
	at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:197)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:139)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:181)
	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:214)
	at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
	at org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:354)
	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:349)
	at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:337)
	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:315)
	at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:279)
	at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
	at org.apache.arrow.flight.ArrowMessage.frame(ArrowMessage.java:318)
	at org.apache.arrow.flight.ArrowMessage.access$100(ArrowMessage.java:68)
	at org.apache.arrow.flight.ArrowMessage$ArrowMessageHolderMarshaller.parse(ArrowMessage.java:551)
	at org.apache.arrow.flight.ArrowMessage$ArrowMessageHolderMarshaller.parse(ArrowMessage.java:536)
	at io.grpc.MethodDescriptor.parseRequest(MethodDescriptor.java:307)
	at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:332)
	at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:315)
	at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
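The numbers in this error are worth spelling out. The limit of 8589934592 bytes is exactly 8 GiB. Either `-XX:MaxDirectMemorySize=8g` was set or, as far as I know, HotSpot defaulted the direct-memory limit to roughly the maximum heap size because the flag was absent; the issue does not say which. Only about 1.95 MiB was left when Netty tried to reserve a fresh 4 MiB arena chunk (`PoolArena$DirectArena.newChunk` in the trace). The class name below is hypothetical; the constants are copied from the log line:

```java
// Hypothetical helper: checks the arithmetic in
// "Cannot reserve 4194304 bytes of direct buffer memory (allocated: 8587886871, limit: 8589934592)".
public class DirectMemoryHeadroom {
    static final long GIB = 1L << 30;
    static final long MIB = 1L << 20;

    // Bytes still available under the direct-memory limit.
    static long headroom(long allocated, long limit) {
        return limit - allocated;
    }

    public static void main(String[] args) {
        long requested = 4_194_304L;     // "Cannot reserve 4194304 bytes"
        long allocated = 8_587_886_871L; // "allocated: 8587886871"
        long limit = 8_589_934_592L;     // "limit: 8589934592"

        long free = headroom(allocated, limit);
        System.out.println("limit in GiB:   " + (double) limit / GIB);     // 8.0
        System.out.println("request in MiB: " + (double) requested / MIB); // 4.0
        System.out.println("free bytes:     " + free);                     // 2047721
        System.out.println("request fits?   " + (requested <= free));      // false
    }
}
```

In other words, nearly the whole 8 GiB budget was already held when the failing request arrived, which fits the "constantly increasing" observation later in the thread better than a single oversized allocation.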

@xxchan
Member Author

xxchan commented Aug 7, 2023

It seems

@xxchan
Member Author

xxchan commented Aug 7, 2023

We might need backpressure? 🤔

apache/arrow#13980 (comment)

the issue with gRPC Java is that it has a fixed, small buffer it uses for backpressure, so effectively every write will trigger backpressure and artificially throttle the producer, regardless of actual network conditions. This has been known for years but the upstream is not interested in fixing it: grpc/grpc-java#5433

Unfortunately without it, Java never applies any backpressure and instead you tend to just OOM. I believe C++ does the 'right' thing and automatically applies backpressure (by blocking the send call) above some threshold which I do not recall, but which I think is actually based on network conditions.
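The kind of backpressure described in that quote (blocking the send once too much data is in flight) can be sketched independently of gRPC or Arrow Flight. The following is a hypothetical illustration, not how the RisingWave UDF server or Arrow Flight is implemented: a `java.util.concurrent.Semaphore` acts as a byte budget, the producer acquires permits equal to a batch's size before enqueueing it, and the consumer releases them after processing, so at most `maxInFlightBytes` are ever buffered. `ByteBudgetChannel` and its methods are made-up names:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: send() blocks once maxInFlightBytes are buffered, so a fast
// producer is throttled instead of growing memory without bound.
public class ByteBudgetChannel {
    private final int maxInFlightBytes;
    private final Semaphore budget; // one permit per byte that may be in flight
    private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final AtomicLong peakInFlight = new AtomicLong();

    public ByteBudgetChannel(int maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
        this.budget = new Semaphore(maxInFlightBytes);
    }

    // Producer side: waits until the batch fits in the remaining budget.
    public void send(byte[] batch) throws InterruptedException {
        if (batch.length > maxInFlightBytes) {
            throw new IllegalArgumentException("batch is larger than the whole budget");
        }
        budget.acquire(batch.length);
        peakInFlight.accumulateAndGet(inFlight(), Math::max);
        queue.put(batch);
    }

    // Consumer side: takes the next batch, blocking while the queue is empty.
    public byte[] receive() throws InterruptedException {
        return queue.take();
    }

    // Consumer side: call after a batch is fully processed to return its bytes.
    public void done(byte[] batch) {
        budget.release(batch.length);
    }

    public long inFlight() {
        return maxInFlightBytes - budget.availablePermits();
    }

    public long peakInFlight() {
        return peakInFlight.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ByteBudgetChannel channel = new ByteBudgetChannel(4 * 1024);
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    byte[] batch = channel.receive();
                    // ... evaluate the UDF on `batch` here ...
                    channel.done(batch);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 0; i < 100; i++) {
            channel.send(new byte[1024]); // blocks once 4 KiB are already in flight
        }
        consumer.join();
        System.out.println("peak bytes in flight: " + channel.peakInFlight());
    }
}
```

A byte budget is used here rather than a batch count because Arrow record batches can vary a lot in size; the peak printed by `main` can never exceed the 4096-byte budget, however fast the producer loop runs.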

@fuyufjh
Member

fuyufjh commented Sep 11, 2023

cc @wangrunji0408

@wangrunji0408
Contributor

I have reproduced it and can observe that memory usage keeps increasing. I tried the Memory Analyzer Tool (MAT), but it only found 10MB of heap memory and could not profile direct memory. I believe the leak comes from Arrow buffers. Let me reread the docs to understand more about Arrow memory management...
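On MAT only seeing about 10MB: a heap dump contains the small on-heap `DirectByteBuffer` objects but not the native memory behind them, so direct-memory growth is easy to miss there. Because both traces show allocations going through `ByteBuffer.allocateDirect` and `java.nio.Bits.reserveMemory`, they should be counted by the JDK's `direct` buffer pool, which is exposed through the standard `BufferPoolMXBean`. A minimal sketch; `DirectMemoryProbe` is a hypothetical helper, and the "leak" is simulated by holding on to buffers:

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reads the JDK's "direct" buffer pool, which tracks memory
// reserved via ByteBuffer.allocateDirect (the path shown in the stack traces).
public class DirectMemoryProbe {
    // Bytes currently reserved by the "direct" pool, or -1 if the pool is not found.
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if (pool.getName().equals("direct")) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        // Simulate a leak: keep references so the direct buffers are never freed.
        List<ByteBuffer> retained = new ArrayList<>();
        for (int i = 0; i < 16; i++) {
            retained.add(ByteBuffer.allocateDirect(1 << 20)); // 1 MiB each
        }
        long after = directMemoryUsed();
        System.out.printf("direct memory grew by ~%d MiB across %d buffers%n",
                (after - before) >> 20, retained.size());
    }
}
```

Logging `directMemoryUsed()` periodically inside the UDF server would show whether direct memory keeps climbing under load. Comparing it with Arrow's own `BufferAllocator.getAllocatedMemory()` on the root allocator could help separate unreleased Arrow buffers from memory that Netty's pool merely keeps cached. Outside the process, starting the JVM with `-XX:NativeMemoryTracking=summary` and running `jcmd <pid> VM.native_memory summary` gives a similar view.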
