
Reading errors after upgrade from 0.2.0 to 0.4.0 #271

Open
ohadmata opened this issue Dec 2, 2019 · 20 comments

@ohadmata

ohadmata commented Dec 2, 2019

Hi,
We are running a Flask service on a Docker image.
One of our main routes reads a file from S3 and returns its content.
Recently we upgraded the package from version 0.2.0 to 0.4.0.

After the upgrade the route works fine, but after several hours it starts raising strange exceptions that contain only the file path (without any other information).

Our temporary workaround is to restart the container; after that it works fine, but it fails again after several hours.

The strangest part is that when I connect to the same container (the one with the issues), open a Python shell and run the same code, it works as expected, even while the Flask service on that container keeps raising exceptions for the same file.

Any ideas?
Thanks

My code:

import json
import s3fs

fs = s3fs.S3FileSystem()
with fs.open('PATH_TO_FILE.json', 'rb') as f:
    data = f.read()
    json_content = json.loads(data.decode('utf-8'))

print(json_content)
@jacobtomlinson
Contributor

Thanks for raising this.

It would be helpful if you could also share the errors you are seeing, especially as this takes several hours to reproduce.

@ohadmata
Author

ohadmata commented Dec 2, 2019

Hi.
As I said, the error is just the path to the file in the S3 bucket, nothing else :-(

  • I am trying a new solution for this use case:
    I download the file into the /tmp directory using fs.get (instead of fs.open) and read it from the local filesystem, along the lines of the sketch below. I am hoping this will work around the issue for now.
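
A minimal sketch of what I mean (the bucket and paths here are placeholders):

import json
import s3fs

fs = s3fs.S3FileSystem()

# download the object to the local filesystem first ...
local_path = '/tmp/PATH_TO_FILE.json'
fs.get('s3://MY_BUCKET/PATH_TO_FILE.json', local_path)

# ... then read it with the built-in open(), bypassing fs.open()
with open(local_path, 'r', encoding='utf-8') as f:
    json_content = json.load(f)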

@jacobtomlinson
Contributor

jacobtomlinson commented Dec 2, 2019

Ah sorry, I misunderstood. Strange that you are seeing only the path with no Python traceback.

You could try wrapping your whole with block in a very broad try-except statement and printing out any information you are able to gather there, something like the sketch below. It may be that your exception is being suppressed somehow.
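
Roughly like this (just a sketch based on your snippet above; adapt it to your actual call):

import traceback

try:
    with fs.open('PATH_TO_FILE.json', 'rb') as f:
        data = f.read()
except Exception as e:
    # dump whatever we can get hold of about the failure
    print(type(e), repr(e))
    traceback.print_exc()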

@ohadmata
Author

ohadmata commented Dec 2, 2019

That's exactly what I did.
There is a try/except block, but the exception itself is just the S3 path :-(
I am attaching the whole code in case it is relevant; it includes the storage interface.

# excerpt of the app; bp, config and DataFormats are defined elsewhere
import json
from enum import Enum

import s3fs
from flask import jsonify


@bp.route('/get_data', methods=('GET', 'POST'))
def get_data():
    storage = Storage(StorageTypes.S3, config['s3_bucket'])
    result = {}  # initialise so the except branch can attach the error
    try:
        json_content = storage.read(f'PATH_TO_FILE.json', DataFormats.DICTIONARY)
        result = json_content
    except Exception as e:
        print(e)
        result['exception'] = str(e)

        # print boto3 debug data
        print('boto3 debug data:')
        import boto3
        client = boto3.client('sts')
        print(client.get_caller_identity()['Arn'])
    finally:
        return jsonify(result)


class StorageTypes(Enum):
    S3 = 1
    NFS = 2


class Storage:
    def __init__(self, storage_type, root_path, params=None):

        self.root_path = root_path

        # set the storage type (Default to S3)
        if storage_type is None:
            self.storage_type = StorageTypes.S3
        else:
            self.storage_type = storage_type

        # execute the constructors
        if self.storage_type == StorageTypes.S3:
            self.storage_object = S3(self.root_path, params)

    def read(self, path, data_format):
        return self.storage_object.read(path, data_format)


class S3:
    def __init__(self, root_path, params=None):
        key = None
        secret = None

        # The root_path is the bucket name
        self.bucket_name = root_path

        # check for given params
        if params is not None:
            if 'key' in params and 'secret' in params:
                key = params['key']
                secret = params['secret']

        # create the s3fs object
        if key is not None and secret is not None:

            self.key = key
            self.secret = secret

            fs = s3fs.S3FileSystem(
                key=key,
                secret=secret
            )
        else:
            fs = s3fs.S3FileSystem()

        self.fs = fs

    def extract_path(self, path):
        if path.lower().startswith('s3://'):
            return path
        else:
            return f's3://{self.bucket_name}/{path}'

    def read(self, path, data_format=None):
        path = self.extract_path(path)

        if data_format == DataFormats.DICTIONARY:
            with self.fs.open(path, 'rb') as f:
                data = f.read()
            return json.loads(data.decode('utf-8'))

        if data_format == DataFormats.TEXT:
            with self.fs.open(path, 'rb') as f:
                data = f.read()
            return data.decode('utf-8')

        if data_format == DataFormats.PANDAS_TABLE:
            import pyarrow.parquet as pq
            return pq.ParquetDataset(path, filesystem=self.fs).read_pandas()

        # default - read binary file
        with self.fs.open(path, 'rb') as f:
            data = f.read()
        return data

@jacobtomlinson
Contributor

The exception message may just be that path, but does the traceback show where it is being raised?

import traceback

...

except Exception as e:
    traceback.print_tb(e.__traceback__)

@ohadmata
Author

ohadmata commented Dec 3, 2019

Hi,
I added the traceback and I am getting a FileNotFoundError. As I said, the file exists :-(
I'm not totally sure it has to do with the file itself, since we have other files that really do not exist in S3.

  File "/usr/local/lib/python3.7/site-packages/noos_utils/storage/__init__.py", line 32, in read
    return self.storage_object.read(path, data_format)
  File "/usr/local/lib/python3.7/site-packages/noos_utils/storage/S3.py", line 62, in read
    self.fs.get(path, tmp)
  File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 568, in get
    with self.open(rpath, "rb", **kwargs) as f1:
  File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 724, in open
    **kwargs
  File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 315, in _open
    autocommit=autocommit, requester_pays=requester_pays)
  File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 957, in __init__
    cache_type=cache_type)
  File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 956, in __init__
    self.details = fs.info(path)
  File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 486, in info
    return super().info(path)
  File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 511, in info
    raise FileNotFoundError(path)


@jacobtomlinson
Contributor

Are you modifying the file on S3? Or is it just a static file?

Could there be a chance that the file potentially doesn't exist for a brief amount of time?

@ohadmata
Author

ohadmata commented Dec 3, 2019

Nope, I create this file and read it a few minutes later.
After the creation, the file won't change.

@jacobtomlinson
Contributor

Hmm ok. It appears that S3 is reporting that the file doesn't exist, which is what raises the error.

It's not exactly a solution, but you could catch that error and retry, something like the sketch below?
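
For example (just a rough sketch; tune the number of attempts and the delay):

import time

def read_with_retry(fs, path, attempts=3, delay=2):
    # naive retry around fs.open for the intermittent FileNotFoundError
    for attempt in range(attempts):
        try:
            with fs.open(path, 'rb') as f:
                return f.read()
        except FileNotFoundError:
            if attempt == attempts - 1:
                raise
            time.sleep(delay)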

@martindurant
Member

Duplicate of #253? @TomAugspurger, time to make dircache optional and off by default, or to introduce timed expiry?

@ohadmata
Author

ohadmata commented Dec 3, 2019

I am trying to execute this line of code before any S3 read; hopefully it will help.
My next step will be to go back to boto3 commands ... :-(

self.fs.invalidate_cache()
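
Placed in the S3.read method from the code above, that looks roughly like this (just a sketch of the binary branch):

def read(self, path, data_format=None):
    path = self.extract_path(path)

    # drop s3fs's cached directory listings so the file is looked up again
    self.fs.invalidate_cache()

    with self.fs.open(path, 'rb') as f:
        data = f.read()
    return data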

@TomAugspurger
Contributor

time to make dircache optional and off by default

Yes to making it optional. I'm not sure about changing the default. It's hard to say what the right one is since it's workload dependent. dircache=False would be the safer / more correct option though.

@ohadmata
Author

ohadmata commented Dec 5, 2019

Hi everyone.
After two days it looks like this change (invalidate_cache) fixed the problem.
Thanks!

@jacobtomlinson
Contributor

Thanks for coming back to let us know!

@rpanai

rpanai commented Feb 4, 2020

I have a similar problem. In my case I'm running a query against Athena. With s3fs 0.2.0 I can run as many queries as I want, but with the newer version I only get a result for the first query and then I get a FileNotFoundError for subsequent ones.
It's not quite clear to me how I should use s3fs.S3FileSystem.invalidate_cache() here (my best guess is sketched after the code below).

import boto3
import pandas
from time import sleep

def wait_for_query(athena_client, execution_id):
    # TODO: add timeout
    while True:
        response = athena_client.get_query_execution(
            QueryExecutionId=execution_id)
        status = response['QueryExecution']['Status']['State']
        if status in ['FAILED', 'CANCELLED']:
            error = \
                response['QueryExecution']['Status']['StateChangeReason']
            raise Exception('Error doing query\n{}'.format(error))
        elif status == 'SUCCEEDED':
            break
        sleep(2)
    return response

def execute_athena_query(query, wait=True):
    athena_client = boto3.client('athena', 'us-east-1')
    athena_config = {
        'OutputLocation': 's3://my_bucket/athena_results',
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
    }
    athena_context = {'Database': 'my_database'}
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext=athena_context,
        ResultConfiguration=athena_config)
    execution_id = response['QueryExecutionId']
    if wait:
        response = wait_for_query(athena_client, execution_id)
    return response

def get_df_athena(query, dtypes):
    response = execute_athena_query(query)
    location = response['QueryExecution']['ResultConfiguration']\
        ['OutputLocation']
    df = pandas.read_csv(location, dtype=dtypes)
    return df
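
For example, is the intended usage something like this (just my guess, creating the filesystem object explicitly instead of letting pandas build one)?

import pandas
import s3fs

def get_df_athena(query, dtypes):
    response = execute_athena_query(query)
    location = response['QueryExecution']['ResultConfiguration']\
        ['OutputLocation']
    # guess: clear s3fs's cached listings before touching the new result file
    fs = s3fs.S3FileSystem()
    fs.invalidate_cache()
    with fs.open(location, 'rb') as f:
        df = pandas.read_csv(f, dtype=dtypes)
    return df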

@martindurant
Member

@TomAugspurger, revive the no-caching option? Or try a direct HEAD in the case that a file appears unfound?

@TomAugspurger
Contributor

I'm not sure, though it seems to be causing plenty of issues.

To start with, I'd like to see a keyword cache_instances added to S3FileSystem.__init__ to control this.

@rpanai

rpanai commented Feb 5, 2020

I'm not sure, though it seems to be causing plenty of issues.

To start with, I'd like to see a keyword cache_instances added to S3FileSystem.__init__ to control this.

Do you want me to help with this?

@TomAugspurger
Contributor

If you're able to, that'd be great!

@martindurant
Member

Note that you can already set S3FileSystem.cachable = False as a workaround.
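For example (sketch):

import s3fs

# opt the class out of fsspec's instance caching, so each S3FileSystem()
# call builds a fresh instance instead of reusing a cached one
s3fs.S3FileSystem.cachable = False

fs = s3fs.S3FileSystem()
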
@TomAugspurger , I actually meant the file cache, not the instance cache - you had a PR for this in fsspec?
