diff --git a/.github/build_operators_commits.txt b/.github/build_operators_commits.txt
index 504327dc..a0ec872a 100644
--- a/.github/build_operators_commits.txt
+++ b/.github/build_operators_commits.txt
@@ -3,3 +3,5 @@ fefa826
 d66bcef
 0a965ba
 ba6bebd
+378ad3b
+926ba78
diff --git a/.github/workflows/build_operators.yaml b/.github/workflows/build_operators.yaml
index fe1c8b47..7d36ea19 100644
--- a/.github/workflows/build_operators.yaml
+++ b/.github/workflows/build_operators.yaml
@@ -3,8 +3,6 @@ name: Build and push CLAIMED operators
 on:
   push:
     branches: [ "main" ]
-  pull_request:
-    branches: [ "main" ]
 
 env:
   repository: docker.io/claimed
diff --git a/component-library/util/util-cos.ipynb b/component-library/util/old_util-cos.ipynb
similarity index 100%
rename from component-library/util/util-cos.ipynb
rename to component-library/util/old_util-cos.ipynb
diff --git a/component-library/util/util-cos.py b/component-library/util/util-cos.py
new file mode 100644
index 00000000..e2d691d5
--- /dev/null
+++ b/component-library/util/util-cos.py
@@ -0,0 +1,135 @@
+"""
+COS utility functions
+"""
+
+# pip install aiobotocore botocore s3fs claimed-c3 tqdm
+
+import os
+import s3fs
+import logging
+import tqdm
+from c3.operator_utils import explode_connection_string
+
+# cos_connection in format: [cos|s3]://access_key_id:secret_access_key@endpoint/bucket/path
+cos_connection = os.environ.get('cos_connection', None)
+
+# access key id (if cos_connection is not provided)
+access_key_id = os.environ.get('access_key_id', None)
+
+# secret access key (if cos_connection is not provided)
+secret_access_key = os.environ.get('secret_access_key', None)
+
+# cos/s3 endpoint (if cos_connection is not provided)
+endpoint = os.environ.get('endpoint', None)
+
+# cos bucket name (if cos_connection is not provided)
+bucket_name = os.environ.get('bucket_name', None)
+
+# cos path (if cos_connection is not provided)
+cos_path = os.environ.get('cos_path', None)
+
+# local path
+local_path = os.environ.get('local_path')
+
+# recursive (default: True)
+recursive = os.environ.get('recursive', 'True').lower() == 'true'
+
+# operation (mkdir|ls|find|download|upload|rm|sync_to_cos|sync_to_local|glob)
+operation = os.environ.get('operation')
+
+# Extract values from connection string
+if cos_connection is not None:
+    (access_key_id, secret_access_key, endpoint, cos_path) = explode_connection_string(cos_connection)
+else:
+    cos_path = os.path.join(bucket_name, cos_path)
+
+assert access_key_id is not None and secret_access_key is not None and endpoint is not None and cos_path is not None, \
+    "Provide a cos_connection (s3://access_key_id:secret_access_key@endpoint/bucket/path) or each value separately."
+
+
+def main():
+    def print_list(l):
+        for file in l:
+            print(file)
+
+    s3 = s3fs.S3FileSystem(
+        anon=False,
+        key=access_key_id,
+        secret=secret_access_key,
+        client_kwargs={'endpoint_url': endpoint}
+    )
+
+    if operation == 'mkdir':
+        logging.info('Make directory ' + cos_path)
+        s3.mkdir(cos_path)
+    elif operation == 'ls':
+        logging.info('List path ' + cos_path)
+        print_list(s3.ls(cos_path))
+    elif operation == 'find':
+        logging.info('Find path ' + cos_path)
+        print_list(s3.find(cos_path))
+    elif operation == 'upload' and not recursive:
+        logging.info('Put path ' + cos_path)
+        print(s3.put(local_path, cos_path))
+    elif operation == 'download' and not recursive:
+        logging.info('Get path ' + cos_path)
+        s3.get(cos_path, local_path)
+    elif operation == 'rm':
+        logging.info('Remove path ' + cos_path)
+        s3.rm(cos_path, recursive=recursive)
+    elif operation == 'glob':
+        logging.info('Glob path ' + cos_path)
+        print_list(s3.glob(cos_path))
+    elif operation == 'sync_to_cos' or operation == 'upload':
+        logging.info(f'{operation} {local_path} to {cos_path}')
+        for root, dirs, files in os.walk(local_path, topdown=False):
+            # Sync files in current folder
+            for name in tqdm.tqdm(files, desc=root):
+                file = os.path.join(root, name)
+                logging.debug(f'processing {file}')
+                cos_file = os.path.join(cos_path,
+                                        os.path.relpath(root, local_path), name).replace('/./', '/')
+                if operation == 'sync_to_cos' and s3.exists(cos_file):
+                    logging.debug(f'exists {cos_file}')
+                    logging.debug(f's3.info {s3.info(cos_file)}')
+                    if s3.info(cos_file)['size'] != os.path.getsize(file):
+                        logging.debug(f'uploading {file} to {cos_file}')
+                        s3.put(file, cos_file)
+                    else:
+                        logging.debug(f'skipping {file}')
+                else:
+                    logging.debug(f'uploading {file} to {cos_file}')
+                    s3.put(file, cos_file)
+    elif operation == 'sync_to_local' or operation == 'download':
+        logging.info(f'{operation} {cos_path} to {local_path}')
+        for root, dirs, files in s3.walk(cos_path):
+            # Sync directories in current folder
+            for name in dirs:
+                local_dir = os.path.join(local_path, os.path.relpath(root, cos_path),
+                                         name).replace('/./', '/')
+                if not os.path.isdir(local_dir):
+                    logging.debug(f'create dir {local_dir}')
+                    os.makedirs(local_dir, exist_ok=True)
+            # Sync files in current folder
+            for name in tqdm.tqdm(files, desc=root):
+                cos_file = os.path.join(root, name)
+                local_file = os.path.join(local_path, os.path.relpath(root, cos_path),
+                                          name).replace('/./', '/')
+                logging.debug(f'processing {cos_file}')
+                if operation == 'sync_to_local' and os.path.isfile(local_file):
+                    logging.debug(f'exists {local_file}')
+                    logging.debug(f's3.info {s3.info(cos_file)}')
+                    if s3.info(cos_file)['size'] != os.path.getsize(local_file):
+                        logging.debug(f'downloading {cos_file} to {local_file}')
+                        s3.get(cos_file, local_file)
+                    else:
+                        logging.info(f'Skipping {cos_file}')
+                else:
+                    logging.debug(f'downloading {cos_file} to {local_file}')
+                    s3.get(cos_file, local_file)
+    else:
+        logging.error(f'Unknown operation: {operation}')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/operators/dummy/dummy.cfg b/operators/dummy/dummy.cfg
deleted file mode 100644
index 2bb55e7d..00000000
--- a/operators/dummy/dummy.cfg
+++ /dev/null
@@ -1,5 +0,0 @@
-gridwrapper=false
-cos=false
-repository=false
-version=0.2
-additional_files=false
diff --git a/operators/dummy/dummy.job.yaml b/operators/dummy/dummy.job.yaml
deleted file mode 100644
index 742c17b8..00000000
--- a/operators/dummy/dummy.job.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-apiVersion: batch/v1
-kind: Job
-metadata:
-  name: dummy
-spec:
-  template:
-    spec:
-      containers:
-      - name: dummy
-        image: docker.io/blumenstiel/claimed-dummy:0.2
-        command: ["/opt/app-root/bin/python","/opt/app-root/src/dummy.py"]
-        env:
-        - name: log_level
-          value: value_of_log_level
-        - name: seconds
-          value: value_of_seconds
-        - name: shcode
-          value: value_of_shcode
-        - name: pycode
-          value: value_of_pycode
-      restartPolicy: OnFailure
-      imagePullSecrets:
-      - name: image_pull_secret
\ No newline at end of file
diff --git a/operators/dummy/dummy.py b/operators/dummy/dummy.py
deleted file mode 100644
index 4117807f..00000000
--- a/operators/dummy/dummy.py
+++ /dev/null
@@ -1,30 +0,0 @@
-"""
-This operator sleeps for a specified time.
-The idle pod can be used for testing code and accessing the terminal.
-"""
-
-import os
-import logging
-import time
-
-# number of seconds to sleep (default: 600)
-seconds= int(os.getenv('seconds', 600))
-
-# Optional shell script to be executed before sleep.
-shcode = os.getenv('shcode', None)
-
-# Optional python code to be executed before sleep.
-pycode = os.getenv('pycode', None)
-
-
-if __name__ == '__main__':
-    if shcode is not None:
-        logging.info('Execute shell script:\n' + shcode)
-        os.system(shcode)
-
-    if pycode is not None:
-        logging.info('Execute python code:\n' + pycode)
-        exec(pycode)
-
-    logging.info(f'Sleeps for {seconds / 60:.1f} minutes.')
-    time.sleep(seconds)
diff --git a/operators/dummy/dummy.yaml b/operators/dummy/dummy.yaml
deleted file mode 100644
index 4cce5127..00000000
--- a/operators/dummy/dummy.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-name: dummy
-description: "This operator sleeps for a specified time. The idle pod can be used for testing code and accessing the terminal. – CLAIMED V0.1"
-
-inputs:
-- {name: log_level, type: String, description: "update log level", default: "INFO"}
-- {name: seconds, type: Integer, description: "number of seconds to sleep (default: 600)", default: "600"}
-- {name: shcode, type: String, description: "Optional shell script to be executed before sleep.", default: "None"}
-- {name: pycode, type: String, description: "Optional python code to be executed before sleep.", default: "None"}
-
-
-outputs:
-
-
-implementation:
-  container:
-    image: docker.io/blumenstiel/claimed-dummy:0.2
-    command:
-    - sh
-    - -ec
-    - |
-      python ./dummy.py log_level="${0}" seconds="${1}" shcode="${2}" pycode="${3}"
-    - {inputValue: log_level}
-    - {inputValue: seconds}
-    - {inputValue: shcode}
-    - {inputValue: pycode}
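
Usage sketch (not part of the patch above): the new util-cos.py operator is configured entirely through environment variables, so it can be exercised locally along the following lines. The connection string, bucket, and path are placeholder values, and the only assumption is that the script is invoked from the repository root.

import os
import subprocess

# Placeholder credentials/endpoint, following the format documented in util-cos.py:
# [cos|s3]://access_key_id:secret_access_key@endpoint/bucket/path
env = dict(
    os.environ,
    cos_connection='s3://my-access-key:my-secret-key@s3.example.com/my-bucket/data',
    operation='ls',
)

# Run the operator the same way the container image would: configuration via env vars only.
subprocess.run(['python', 'component-library/util/util-cos.py'], env=env, check=True)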