
internal.DeltaError: Generic DeltaTable error: Internal error: Invalid HashJoinExec partition count mismatch 1!=2 #2188

Closed
t1g0rz opened this issue Feb 16, 2024 · 13 comments
Labels: bug (Something isn't working), wontfix (This will not be worked on)

Comments

t1g0rz (Contributor) commented Feb 16, 2024

Environment

Delta-rs version:
rust-v0.17.0

Binding:
python
deltalake-0.15.3

Environment:

  • Cloud provider: AWS
  • OS: 22.04.1-Ubuntu
  • Other:
    • RAM: 949Mi
    • 1 CPU
    • t2.micro
    • ami-03f4878755434977f

Bug

What happened:

When I run a simple script a few times (twice, in my case) to simulate updating and partitioning, I encounter the following error.

Traceback (most recent call last):
  File "/home/ubuntu/test.py", line 52, in <module>
    er = dt.merge(source=tmp_df, predicate="target.idx = source.idx", source_alias='source', target_alias='target').when_matched_update_all().when_not_matched_insert_all().execute()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/deltalake/table.py", line 1713, in execute
    metrics = self.table._table.merge_execute(
_internal.DeltaError: Generic DeltaTable error: Internal error: Invalid HashJoinExec, partition count mismatch 1!=2,consider using RepartitionExec.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker

What you expected to happen:
The table is updated without any issues.

How to reproduce it:
Here is the script. You can run it on a t2.micro AWS instance.

from deltalake import DeltaTable, write_deltalake
from sklearn.datasets import load_iris
import pyarrow as pa
import pandas as pd
from time import time, sleep
import boto3


session = boto3.session.Session()
credentials = session.get_credentials()

S3_BUCKET_RAW_DATA = "<S3_BUCKET>"  # placeholder: replace with your bucket name
DELTA_TABLE_NAME = 'iris'

storage_options = {
    'AWS_S3_LOCKING_PROVIDER': 'dynamodb',
    'DELTA_DYNAMO_TABLE_NAME': 'delta_log',
    'AWS_REGION': 'us-east-1',
    'DELTA_DYNAMO_REGION': 'us-east-1',
    'AWS_ACCESS_KEY_ID': credentials.access_key,
    'AWS_SECRET_ACCESS_KEY': credentials.secret_key,
}

if credentials.token:
    storage_options['AWS_SESSION_TOKEN'] = credentials.token

df = load_iris(as_frame=True)['data']
df.columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
df.reset_index(inplace=True, names=['idx'])

schema = pa.schema([
    pa.field("idx", pa.int64()),
    pa.field("sepal_length", pa.float64()),
    pa.field("sepal_width", pa.float64()),
    pa.field("petal_length", pa.float64()),
    pa.field("petal_width", pa.float64()),
    pa.field("updated_at", pa.float64())
])

DELTA_PATH = f"s3a://{S3_BUCKET_RAW_DATA}/bronze/{DELTA_TABLE_NAME}"

dt = DeltaTable.create(DELTA_PATH, schema=schema, mode="ignore",
                       partition_by=["updated_at"], storage_options=storage_options)

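# Merge progressively larger slices (50, 100, 150 rows) to simulate repeated upserts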
for i in range(50, 151, 50):
    tmp_df = df.iloc[:i].copy()
    tmp_df['updated_at'] = time()
    print(i)
    dt = DeltaTable(DELTA_PATH, storage_options=storage_options)
    er = dt.merge(source=tmp_df, predicate="target.idx = source.idx", source_alias='source', target_alias='target').when_matched_update_all().when_not_matched_insert_all().execute()
    print(er)
    sleep(2)

More details:

How it looked:

ubuntu@ubuntu1:~$ python3 test.py 
50
{'num_source_rows': 50, 'num_target_rows_inserted': 50, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 50, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 168, 'scan_time_ms': 0, 'rewrite_time_ms': 106}
100
{'num_source_rows': 100, 'num_target_rows_inserted': 50, 'num_target_rows_updated': 50, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 100, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': 219, 'scan_time_ms': 0, 'rewrite_time_ms': 105}
150
{'num_source_rows': 150, 'num_target_rows_inserted': 50, 'num_target_rows_updated': 100, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 150, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': 173, 'scan_time_ms': 0, 'rewrite_time_ms': 65}

ubuntu@ubuntu1:~$ python3 test3.py 
50
{'num_source_rows': 50, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 50, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 100, 'num_output_rows': 150, 'num_target_files_added': 2, 'num_target_files_removed': 1, 'execution_time_ms': 196, 'scan_time_ms': 0, 'rewrite_time_ms': 104}
100
Traceback (most recent call last):
  File "/home/ubuntu/test3.py", line 52, in <module>
    er = dt.merge(source=tmp_df, predicate="target.idx = source.idx", source_alias='source', target_alias='target').when_matched_update_all().when_not_matched_insert_all().execute()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/deltalake/table.py", line 1713, in execute
    metrics = self.table._table.merge_execute(
_internal.DeltaError: Generic DeltaTable error: Internal error: Invalid HashJoinExec, partition count mismatch 1!=2,consider using RepartitionExec.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker

However, if I update the same Delta table from my laptop, there are no errors at all.

Memory consumption from ubuntu VM:

Top AVERAGE memory consumption, by line:
(1)    43:    10 MB                                                                                                                                          
Top PEAK memory consumption, by line:
(1)     2:    30 MB                                                                                                                                          
(2)     1:    20 MB                                                                                                                                          
(3)    43:    10 MB                                                                                                                                          
(4)     6:    10 MB                            
t1g0rz added the bug label on Feb 16, 2024
rtyler added the binding/python label on Feb 16, 2024
echai58 commented Feb 21, 2024

You may need more CPUs; I saw this recently, and increasing the number of CPUs resolved it.
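
For reference (a minimal sketch, not part of the original comment): DataFusion's default number of target partitions is typically derived from the CPU cores it detects, so it can be worth checking how many CPUs the Python process actually sees.

import os

# Logical CPUs visible on the machine / VM
print("os.cpu_count():", os.cpu_count())

# On Linux, the CPUs this process is actually allowed to run on
# (can be lower than cpu_count() under CPU pinning or container limits)
if hasattr(os, "sched_getaffinity"):
    print("usable CPUs:", len(os.sched_getaffinity(0)))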

t1g0rz (Contributor, Author) commented Feb 22, 2024

@echai58 Thank you! Do you happen to know how to estimate how many CPUs might be needed?

@germanmunuera

@t1g0rz Were you able to solve the problem by increasing the CPU count? I have the same issue inside a pod in Kubernetes.

@ion-elgreco (Collaborator)

@germanmunuera how many vCPUs are you assigning to the pod?

@germanmunuera

    resources:
        requests:
          cpu: "250m"
          memory: "1Gi"
        limits:
          cpu: "750m"
          memory: "4Gi"
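
As a side note (a hedged sketch, not from the original thread): os.cpu_count() inside a pod reports the node's CPUs rather than the pod's limit, and requests of 250m with a limit of 750m leave less than one full CPU for the process. The effective quota can be read from the cgroup files, for example:

from pathlib import Path

def effective_cpu_limit():
    """Best-effort read of the container's CPU quota (cgroup v2 first, then v1)."""
    v2 = Path("/sys/fs/cgroup/cpu.max")
    if v2.exists():
        quota, period = v2.read_text().split()
        if quota != "max":
            return int(quota) / int(period)
    q = Path("/sys/fs/cgroup/cpu/cpu.cfs_quota_us")
    p = Path("/sys/fs/cgroup/cpu/cpu.cfs_period_us")
    if q.exists() and p.exists() and int(q.read_text()) > 0:
        return int(q.read_text()) / int(p.read_text())
    return None  # no quota configured

print("cgroup CPU limit:", effective_cpu_limit())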

t1g0rz (Contributor, Author) commented Mar 20, 2024

@germanmunuera Yes, everything is fine now. I increased to 4 CPUs and that resolved the issue.

@rob-harrison

python 3.11
deltalake 0.16.0

Had the same issue here with:
requests cpu 100m/limits cpu 1.

Generic DeltaTable error: Internal error: Invalid HashJoinExec, partition count mismatch 1!=3,consider using RepartitionExec. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker

Resolved by increasing to:
requests cpu 2/limits cpu 2

The question is why does this resolve it?

@germanmunuera

Solved with 2 CPUs!
Thanks @rob-harrison and @t1g0rz

ion-elgreco added the wontfix label and removed the binding/python label on Mar 29, 2024
echai58 commented Apr 3, 2024

Wanted to follow up on this: @ion-elgreco @Blajda, are either of you aware of why DataFusion requires more than one CPU for merging?

@ion-elgreco (Collaborator)

Nope, but you can try asking the question in the DataFusion repo :)

echai58 commented Apr 16, 2024

apache/datafusion#10095: a DataFusion PR was opened recently that should resolve this issue!

@arohland

Any idea when the DataFusion fix will make it into a release?

@ion-elgreco (Collaborator)

Any idea when the DataFusion fix will make it into a release?

It's included in Python v0.18.1
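
For anyone landing here later, a quick way to confirm the installed binding already includes the fix (a minimal check, assuming a standard pip install):

from importlib.metadata import version

# Per this thread, the DataFusion fix ships in deltalake >= 0.18.1
print("installed deltalake:", version("deltalake"))
# If older: pip install -U "deltalake>=0.18.1"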
