Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[master] S3 Pillar Pagination Fixes #55694

Merged
merged 5 commits into from
Jan 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions salt/pillar/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,13 @@ def _refresh_buckets_cache_file(creds, cache_file, multiple_env, environment, pr
'''

# helper s3 query function
def __get_s3_meta():
def __get_s3_meta(continuation_token=None):
# We want to use ListObjectsV2 so we get the NextContinuationToken
params = {'prefix': prefix, 'list-type': 2}

if continuation_token:
params['continuation-token'] = continuation_token

return __utils__['s3.query'](
key=creds.key,
keyid=creds.keyid,
Expand All @@ -283,7 +289,7 @@ def __get_s3_meta():
verify_ssl=creds.verify_ssl,
location=creds.location,
return_bin=False,
params={'prefix': prefix},
params=params,
path_style=creds.path_style,
https_enable=creds.https_enable)

Expand All @@ -296,6 +302,12 @@ def __get_pillar_environments(files):
environments = [(os.path.dirname(k['Key']).split('/', 1))[0] for k in files]
return set(environments)

def __get_continuation_token(s3_meta):
    '''
    Return the NextContinuationToken from an s3.query result, or None.

    The s3.query response is a flat list of single-key dicts; at most one
    of them carries a NextContinuationToken. A missing token means the
    listing is complete and pagination should stop.
    '''
    for entry in s3_meta:
        token = entry.get('NextContinuationToken')
        if token:
            return token
    return None

log.debug('Refreshing S3 buckets pillar cache file')

metadata = {}
Expand All @@ -312,6 +324,14 @@ def __get_pillar_environments(files):
if s3_meta:
bucket_files[bucket] = __get_pillar_files_from_s3_meta(s3_meta)

# Check if we have a NextContinuationToken and loop until we don't
while True:
continuation_token = __get_continuation_token(s3_meta)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be rewritten as an iterator?

for continuation_token in __get_continuation_tokens(s3_meta):
    ...

I think the function would just be:

for item in s3_meta:
    if item.get('NextContinuationToken'):
        yield item.get('NextContinuationToken')

Or something to that effect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if that approach will work. There is only ever one NextContinuationToken in s3_meta, but it could be different when s3_meta is updated after subsequent calls to __get_s3_meta. Once it's no longer in s3_meta, it should exit the loop, since there are no more results to pull down.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oooooh. I see!

What about something like this?


    # helper s3 query function
    def __get_s3_meta():
        params = {'prefix': prefix, 'list-type': 2}
        while True:
            result = __utils__['s3.query'](
                key=creds.key,
                keyid=creds.keyid,
                kms_keyid=creds.kms_keyid,
                bucket=creds.bucket,
                service_url=creds.service_url,
                verify_ssl=creds.verify_ssl,
                location=creds.location,
                return_bin=False,
                params={'prefix': prefix},
                path_style=creds.path_style,
                https_enable=creds.https_enable)
            yield result
            continuation_token = next((item.get('NextContinuationToken')
                                       for item in result
                                       if item.get('NextContinuationToken')),
                                      None)
            if continuation_token is None:
                break
            else:
                params['continuation-token'] = continuation_token

Then it could simply be

for s3_meta in __get_s3_meta():
    ...

Would that work?

if not continuation_token:
break
s3_meta = __get_s3_meta(continuation_token)
bucket_files[bucket] += __get_pillar_files_from_s3_meta(s3_meta)

metadata[environment] = bucket_files

else:
Expand All @@ -322,6 +342,15 @@ def __get_pillar_environments(files):
# s3 query returned data
if s3_meta:
files = __get_pillar_files_from_s3_meta(s3_meta)

# Check if we have a NextContinuationToken and loop until we don't
while True:
continuation_token = __get_continuation_token(s3_meta)
if not continuation_token:
break
s3_meta = __get_s3_meta(continuation_token)
files += __get_pillar_files_from_s3_meta(s3_meta)

environments = __get_pillar_environments(files)

# pull out the files for the environment
Expand All @@ -343,7 +372,7 @@ def __get_pillar_environments(files):

log.debug('Writing S3 buckets pillar cache file')

with salt.utils.files.fopen(cache_file, 'w') as fp_:
with salt.utils.files.fopen(cache_file, 'wb') as fp_:
pickle.dump(metadata, fp_)

return metadata
Expand Down
93 changes: 93 additions & 0 deletions tests/unit/pillar/test_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-

# Import python libs
from __future__ import absolute_import, print_function, unicode_literals

import logging

# Import Salt Testing libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import TestCase
from tests.support.mock import patch, mock_open, MagicMock

# Import Salt Libs
import salt.pillar.s3 as s3_pillar
from salt.ext.six.moves import range

log = logging.getLogger(__name__)


class S3PillarTestCase(TestCase, LoaderModuleMockMixin):
    '''
    TestCase for salt.pillar.s3
    '''
    def setup_loader_modules(self):
        # The pillar module only needs an (initially empty) __utils__
        # dunder; each test patches in the utils it requires.
        return {s3_pillar: {'__utils__': {}}}

    @staticmethod
    def _object_entry(index):
        # One S3 object record, shaped like an s3.query listing entry.
        return {'Key': '{0}/init.sls'.format(index),
                'LastModified': '2019-12-18T15:54:39.000Z',
                'ETag': '"fba0a053704e8b357c94be90b44bb640"',
                'Size': '5 ',
                'StorageClass': 'STANDARD'}

    def test_refresh_buckets_cache_file(self):
        '''
        Test pagination with refresh_buckets_cache_file
        '''
        key = 'XXXXXXXXXXXXXXXXXXXXX'
        keyid = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
        bucket = 'dummy_bucket'
        service_url = 's3.amazonaws.com'
        cache_file = 'dummy_file'

        creds = s3_pillar.S3Credentials(key, keyid, bucket, service_url)

        page_boundary = 999
        total_keys = 1200

        # First page carries a NextContinuationToken, so the pillar code
        # must issue a follow-up query for the remaining keys.
        first_page = [{'Name': 'pillar-bucket'},
                      {'Prefix': 'test'},
                      {'KeyCount': '10'},
                      {'MaxKeys': '10'},
                      {'NextContinuationToken': 'XXXXX'},
                      {'IsTruncated': 'true'}]
        first_page.extend(self._object_entry(i)
                          for i in range(page_boundary))

        # Second page has no token, which must terminate the loop.
        second_page = [{'Name': 'pillar-bucket'},
                       {'Prefix': 'test'},
                       {'KeyCount': '10'},
                       {'MaxKeys': '10'},
                       {'IsTruncated': 'true'}]
        second_page.extend(self._object_entry(i)
                           for i in range(page_boundary, total_keys))

        # Expected cache: every key from both pages, in order.
        expected = {'base': {'dummy_bucket': [self._object_entry(i)
                                              for i in range(total_keys)]}}

        mock_s3_query = MagicMock(side_effect=[first_page, second_page])
        with patch.dict(s3_pillar.__utils__, {'s3.query': mock_s3_query}):
            with patch('salt.utils.files.fopen', mock_open(read_data=b'')):
                ret = s3_pillar._refresh_buckets_cache_file(creds,
                                                            cache_file,
                                                            False,
                                                            'base',
                                                            '')
        self.assertEqual(ret, expected)