
Running the same batch job multiple times in Flink session mode causes Linux out-of-memory #4305

Closed
renshangtao opened this issue Mar 10, 2022 · 6 comments

@renshangtao
Contributor

I have a batch task that inserts 100 million rows from table A into table B after hashing on the primary key. I run this task in Flink session mode. After executing it several times, I found that the Flink TaskManager process occupies more and more memory; eventually the TaskManager process is killed when the system runs out of memory.

Manually triggering GC after the task completes has no effect: the memory shown by top does not decrease unless the Flink TaskManager is shut down.

Logically, when a task finishes it should release the resources it occupies so that subsequent tasks can run, but it does not.

The memory growth occurs in the IcebergStreamWriter stage, but looking at the code I cannot find anywhere that uses off-heap memory, so this is quite confusing. Can anyone help me?
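A minimal sketch of the check described above, to confirm the retained memory is native rather than Java heap; the TaskManager main class and the single-TaskManager-per-node assumption are illustrative, not from the report:

# Force a full GC on the TaskManager JVM and compare the process RSS.
TM_PID=$(pgrep -f org.apache.flink.runtime.taskexecutor.TaskManagerRunner)

echo "RSS before GC: $(ps -o rss= -p "$TM_PID") KB"
jcmd "$TM_PID" GC.run        # ask the JVM to run a full GC
sleep 5
echo "RSS after GC:  $(ps -o rss= -p "$TM_PID") KB"

# If jcmd "$TM_PID" GC.heap_info shows the heap shrinking while RSS stays
# flat, the growth is in native memory (e.g. allocator fragmentation),
# not in the Java heap.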

[screenshot] Memory usage is 25% before running the job and reaches 72% after three runs.

[screenshot] The configuration.

[screenshot] The system kills the TaskManager process.

@renshangtao
Contributor Author

Installing jemalloc on every node of the Flink cluster solves this problem:

1. Install jemalloc.

2. Add the configuration below to /etc/profile:
export LD_PRELOAD=/usr/local/lib/libjemalloc.so
#export MALLOC_CONF="prof:true,prof_prefix:/home/bigdata/jeprof.out,lg_prof_interval:30,lg_prof_sample:20"

3. source /etc/profile.

4. Then start the Flink cluster.

5. Execute the batch jobs.

(A consolidated sketch of these steps is shown below.)
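A consolidated sketch of steps 1-5, assuming a build of jemalloc from source; the version, paths, and verification command are illustrative assumptions, not from the original report:

# Build and install jemalloc (5.2.1 is used here only as an example).
curl -LO https://github.com/jemalloc/jemalloc/releases/download/5.2.1/jemalloc-5.2.1.tar.bz2
tar xjf jemalloc-5.2.1.tar.bz2 && cd jemalloc-5.2.1
./configure --prefix=/usr/local && make && sudo make install

# Preload jemalloc for every process started from this shell.
echo 'export LD_PRELOAD=/usr/local/lib/libjemalloc.so' | sudo tee -a /etc/profile
source /etc/profile

# Restart the cluster so the TaskManager JVM picks up the preload,
# then verify that jemalloc is actually mapped into the process:
./bin/start-cluster.sh
grep jemalloc /proc/$(pgrep -f TaskManagerRunner)/maps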

I installed jemalloc on two of the five nodes; here are the test results. After executing the batch job seven times, memory on the nodes still using glibc malloc increased to 50%+.


Why change from glibc malloc to jemalloc?

I think the reason may be that glibc malloc causes a lot of memory fragmentation.
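For context: glibc malloc keeps per-thread arenas, and memory freed into an arena is often retained by the process rather than returned to the OS, which can look exactly like the growth described above. A commonly used glibc-side mitigation, sketched here as an alternative to switching allocators (the value 2 is illustrative, not from this report):

# Cap the number of glibc malloc arenas so freed memory is concentrated
# in fewer per-thread heaps; set before starting the TaskManager.
export MALLOC_ARENA_MAX=2
./bin/taskmanager.sh start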

@rdblue @openinx

Can you help me? When I use Flink to write to a Hive table I don't have this problem.

@openinx
Member

openinx commented Mar 17, 2022

I think the core reason is: we have set primary keys on the iceberg table in the Flink DDL, so the Flink write job assumes we are trying to write all those results into an iceberg format v2 table. See the following code piece:

https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java#L62-L72

Do you want to write all those records into an iceberg format v2 table so that rows with the same primary key are deduplicated? If not, then I suggest removing the primary key definition from the Flink sink table DDL. If so, then I think we may need to port this PR to your branch to fix the OutOfMemory issue.

@renshangtao
Contributor Author

renshangtao commented Mar 18, 2022

@openinx
First, thanks a lot for your reply; it was my fault for not describing the scenario clearly.

1. This is my source table:
create table hive_catalog.iceberg_db.sourceTable (
step_id string,
param_id string,
wafer_id string,
chip_id string,
product_id1 string,
product_id2 string,
product_id3 string,
product_id4 string,
product_id5 string,
num_item1 double,
num_item2 double,
num_item3 double,
num_item4 double,
num_item5 double,
num_item6 double,
start_date date,
PRIMARY KEY (chip_id, param_id, step_id, wafer_id) NOT ENFORCED)
PARTITIONED BY (step_id, start_date)
WITH ('format-version'='2');

2. This is the temp table:
create table hive_catalog.iceberg_db.temp_data (
step_id string,
param_id string,
wafer_id string,
chip_id string,
product_id1 string,
product_id2 string,
product_id3 string,
product_id4 string,
product_id5 string,
num_item1 double,
num_item2 double,
num_item3 double,
num_item4 double,
num_item5 double,
num_item6 double,
start_date date,
part_id int)
PARTITIONED BY (part_id)
WITH ('format-version'='2', 'write.distribution-mode'='hash');

This is the SQL that my batch job uses:

3. insert into temp_data select
step_id,
param_id,
wafer_id,
chip_id,
product_id1,
product_id2,
product_id3,
product_id4,
product_id5,
num_item1,
num_item2,
num_item3,
num_item4,
num_item5,
num_item6,
start_date,
(HASH_CODE(step_id || param_id || step_id || wafer_id)) % 10 as part_id
from source;

After I run this job seven or eight times, the Linux memory usage increases to 80%, of which the TaskManager uses 60%.

But if I use jemalloc and run the job several times, memory stays normal, the same as when the Flink cluster has just started.
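A sketch of how this before/after comparison could be scripted; the job jar, loop count, and blocking submission are illustrative assumptions:

# Submit the batch job several times and record the TaskManager RSS
# after each run to see whether it keeps growing.
TM_PID=$(pgrep -f TaskManagerRunner)
for i in $(seq 1 8); do
  ./bin/flink run ./batch-job.jar   # hypothetical jar; blocks until the job finishes
  echo "run $i: RSS=$(ps -o rss= -p "$TM_PID") KB"
done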

@renshangtao
Contributor Author

I found an article and solved this problem:

prestodb/presto#8993

@github-actions

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Sep 16, 2022
@github-actions

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.
