Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blocksize arg for ftp hook #24860

Merged
merged 1 commit into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions airflow/providers/ftp/hooks/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import datetime
import ftplib
import os.path
from typing import Any, List, Optional, Tuple
from typing import Any, Callable, List, Optional, Tuple

from airflow.hooks.base import BaseHook

Expand Down Expand Up @@ -115,7 +115,13 @@ def delete_directory(self, path: str) -> None:
conn = self.get_conn()
conn.rmd(path)

def retrieve_file(self, remote_full_path, local_full_path_or_buffer, callback=None):
def retrieve_file(
self,
remote_full_path: str,
local_full_path_or_buffer: Any,
callback: Optional[Callable] = None,
block_size: int = 8192,
) -> None:
"""
Transfers the remote file to a local location.

Expand All @@ -132,6 +138,8 @@ def retrieve_file(self, remote_full_path, local_full_path_or_buffer, callback=No
that writing to a file or buffer will need to be handled inside the
callback.
[default: output_handle.write()]
:param block_size: maximum size, in bytes, of each chunk read from the
server and passed to the callback (default: 8192)

.. code-block:: python

Expand Down Expand Up @@ -164,31 +172,30 @@ def write_to_file_with_progress(data):

"""
conn = self.get_conn()

is_path = isinstance(local_full_path_or_buffer, str)

# without a callback, default to writing to a user-provided file or
# file-like buffer
if not callback:
if is_path:

output_handle = open(local_full_path_or_buffer, 'wb')
else:
output_handle = local_full_path_or_buffer

callback = output_handle.write
else:
output_handle = None

remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
self.log.info('Retrieving file from FTP: %s', remote_full_path)
conn.retrbinary(f'RETR {remote_file_name}', callback)
conn.retrbinary(f'RETR {remote_file_name}', callback, block_size)
self.log.info('Finished retrieving file from FTP: %s', remote_full_path)

if is_path and output_handle:
output_handle.close()

def store_file(self, remote_full_path: str, local_full_path_or_buffer: Any) -> None:
def store_file(
self, remote_full_path: str, local_full_path_or_buffer: Any, block_size: int = 8192
) -> None:
"""
Transfers a local file to the remote location.

Expand All @@ -199,19 +206,19 @@ def store_file(self, remote_full_path: str, local_full_path_or_buffer: Any) -> N
:param remote_full_path: full path to the remote file
:param local_full_path_or_buffer: full path to the local file or a
file-like buffer
:param block_size: size, in bytes, of the blocks read from the local file
or buffer and sent to the server (default: 8192)
"""
conn = self.get_conn()

is_path = isinstance(local_full_path_or_buffer, str)

if is_path:

input_handle = open(local_full_path_or_buffer, 'rb')
else:
input_handle = local_full_path_or_buffer
remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
conn.storbinary(f'STOR {remote_file_name}', input_handle)
conn.storbinary(f'STOR {remote_file_name}', input_handle, block_size)

if is_path:
input_handle.close()
Expand Down
4 changes: 2 additions & 2 deletions tests/providers/ftp/hooks/test_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ def test_retrieve_file(self):
_buffer = io.StringIO('buffer')
with fh.FTPHook() as ftp_hook:
ftp_hook.retrieve_file(self.path, _buffer)
self.conn_mock.retrbinary.assert_called_once_with('RETR path', _buffer.write)
self.conn_mock.retrbinary.assert_called_once_with('RETR path', _buffer.write, 8192)

def test_retrieve_file_with_callback(self):
func = mock.Mock()
_buffer = io.StringIO('buffer')
with fh.FTPHook() as ftp_hook:
ftp_hook.retrieve_file(self.path, _buffer, callback=func)
self.conn_mock.retrbinary.assert_called_once_with('RETR path', func)
self.conn_mock.retrbinary.assert_called_once_with('RETR path', func, 8192)

def test_connection_success(self):
with fh.FTPHook() as ftp_hook:
Expand Down