Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix generation temp filename in DataprocSubmitPySparkJobOperator #39498

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2023,7 +2023,7 @@ class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator):

@staticmethod
def _generate_temp_filename(filename):
return f"{time:%Y%m%d%H%M%S}_{uuid.uuid4()!s:.8}_{ntpath.basename(filename)}"
return f"{time.strftime('%Y%m%d%H%M%S')}_{uuid.uuid4()!s:.8}_{ntpath.basename(filename)}"

def _upload_file_temp(self, bucket, local_file):
"""Upload a local file to a Google Cloud Storage bucket."""
Expand Down
20 changes: 20 additions & 0 deletions tests/providers/google/cloud/operators/test_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import inspect
from unittest import mock
from unittest.mock import MagicMock, Mock, call
from uuid import UUID

import pytest
from google.api_core.exceptions import AlreadyExists, NotFound
Expand Down Expand Up @@ -2567,6 +2568,25 @@ def test_execute(self, mock_hook, mock_uuid):
job = op.generate_job()
assert self.job == job

@pytest.mark.parametrize(
"filename",
[
pytest.param("/foo/bar/baz.py", id="absolute"),
pytest.param("foo/bar/baz.py", id="relative"),
pytest.param("baz.py", id="base-filename"),
pytest.param(r"C:\foo\bar\baz.py", id="windows-path"),
],
)
def test_generate_temp_filename(self, filename, time_machine):
time_machine.move_to(datetime(2024, 2, 29, 1, 2, 3), tick=False)
with mock.patch(
DATAPROC_PATH.format("uuid.uuid4"), return_value=UUID("12345678-0000-4000-0000-000000000000")
):
assert (
DataprocSubmitPySparkJobOperator._generate_temp_filename(filename)
== "20240229010203_12345678_baz.py"
)


class TestDataprocCreateWorkflowTemplateOperator:
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
Expand Down