diff --git a/.flake8 b/.flake8
index 0baa625..7462d1d 100644
--- a/.flake8
+++ b/.flake8
@@ -7,4 +7,5 @@ exclude = .git,.pycache,build,.eggs
 per-file-ignores =
     ./src/actinia_parallel_plugin/wsgi.py: F401
-    ./tests/test_resource_base.py: F401
+    ./tests/test_resource_base.py: F401
+    ./src/actinia_parallel_plugin/core/persistent_processing.py: E501
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index c186454..9a893dc 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -9,6 +9,29 @@ on:
 
 jobs:
+  integration-tests:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Start containers
+        run: docker-compose -f "docker/docker-compose-test.yml" up -d --build
+
+      - name: List running docker
+        run: docker ps
+
+      - name: Docker logs actinia
+        run: docker logs docker_actinia-test_1
+
+      - name: Docker logs postgis
+        run: docker logs docker_postgis-test_1
+
+      - name: Run integration test
+        run: docker exec -t docker_actinia-test_1 sh /usr/bin/run_integration_tests.sh
+
+      - name: Stop containers
+        run: docker-compose -f "docker/docker-compose-test.yml" down
+
   unittests:
     runs-on: ubuntu-latest
     steps:
@@ -31,26 +54,3 @@ jobs:
           file: docker/actinia-parallel-plugin-test/Dockerfile
           no-cache: true
           # pull: true
-
-  integration-tests:
-    runs-on: ubuntu-latest
-    steps:
-      - name: Checkout
-        uses: actions/checkout@v2
-        # with:
-        #   path: "."
-      - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v1
-      - name: Replace run integration test command
-        run: |
-          sed -i "s+# RUN make test+RUN make integrationtest+g" docker/actinia-parallel-plugin-test/Dockerfile
-      - name: Integration tests of actinia-parallel-plugin
-        id: docker_build
-        uses: docker/build-push-action@v2
-        with:
-          push: false
-          tags: actinia-parallel-plugin-test:alpine
-          context: .
-          file: docker/actinia-parallel-plugin-test/Dockerfile
-          no-cache: true
-          # pull: true
diff --git a/.gitignore b/.gitignore
index c45eb44..67ac5c7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,7 @@ __pycache__/*
 .cache/*
 .*.swp
 */.ipynb_checkpoints/*
+.coverage.*
 
 # Project files
 .ropeproject
diff --git a/Makefile b/Makefile
index 4facc69..0da753a 100644
--- a/Makefile
+++ b/Makefile
@@ -19,13 +19,13 @@ dist:
 	python3 setup.py dist
 
 test:
-	./tests_with_redis.sh
+	python3 setup.py test
 
 unittest:
 	python3 setup.py test --addopts "-m unittest"
 
 devtest:
-	./tests_with_redis.sh dev
+	python3 setup.py test --addopts "-m 'dev'"
 
 integrationtest:
-	./tests_with_redis.sh integrationtest
+	python3 setup.py test --addopts "-m 'integrationtest'"
diff --git a/README.md b/README.md
index af1e134..447a9d6 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # actinia-parallel-plugin
 
-This is an example plugin for [actinia-core](https://github.com/mundialis/actinia_core) which adds a "Hello World" endpoint to actinia-core.
+This is the actinia-parallel-plugin for [actinia-core](https://github.com/mundialis/actinia_core) which adds parallel processing endpoints to actinia.
 
 You can run actinia-parallel-plugin as an actinia-core plugin.
 
@@ -20,14 +20,6 @@ docker network prune
 docker-compose -f docker/docker-compose.yml up -d
 ```
 
-### Requesting helloworld endpoint
-You can test the plugin and request the `/helloworld` endpoint, e.g. with:
-```
-curl -u actinia-gdi:actinia-gdi -X GET http://localhost:8088/api/v3/processing_parallel | jq
-
-curl -u actinia-gdi:actinia-gdi -H 'accept: application/json' -H 'Content-Type: application/json' -X POST http://localhost:8088/api/v3/processing_parallel -d '{"name": "test"}' | jq
-```
-
 ## DEV setup
 For a DEV setup you can use the docker/docker-compose.yml:
 ```
@@ -38,6 +30,15 @@ docker-compose -f docker/docker-compose.yml run --rm --service-ports --entrypoin
 (cd /src/actinia-parallel-plugin && python3 setup.py install)
 # start actinia-core with your plugin
 gunicorn -b 0.0.0.0:8088 -w 1 --access-logfile=- -k gthread actinia_core.main:flask_app
+
+# or for debugging in one line with reset
+reset && (cd /src/actinia-parallel-plugin && python3 setup.py install) && gunicorn -b 0.0.0.0:8088 -w 3 --access-logfile=- -k gthread actinia_core.main:flask_app
+```
+
+### PostGIS
+Connect to the PostGIS DB from the actinia-core docker container:
+```
+psql -U actinia -h postgis -d gis
 ```
 
 ### Hints
@@ -60,10 +61,11 @@ rm -rf /usr/lib/python3.8/site-packages/actinia_parallel_plugin.wsgi-*.egg
 
 You can run the tests in the actinia test docker:
 ```
-docker build -f docker/actinia-parallel-plugin-test/Dockerfile -t actinia-parallel-plugin-test .
-docker run -it actinia-parallel-plugin-test -i
+docker-compose -f docker/docker-compose-test.yml build
+docker-compose -f docker/docker-compose-test.yml up -d
 
-cd /src/actinia-parallel-plugin/
+# exec docker and run tests manually
+docker exec -it docker_actinia-test_1 sh
 
 # run all tests
 make test
@@ -73,8 +75,47 @@ make unittest
 # run only integrationtests
 make integrationtest
 
-# run only tests which are marked for development with the decorator '@pytest.mark.dev'
-make devtest
+# or run the tests from outside the docker container
+docker exec -it docker_actinia-test_1 sh /usr/bin/run_integration_tests.sh
+docker exec -it docker_actinia-test_1 sh /usr/bin/run_unittests.sh
+
+docker-compose -f docker/docker-compose-test.yml down
+```
+
+You can also run the tests in the GHA workflows locally via [act](https://github.com/nektos/act).
+To run docker-compose inside a workflow, [act_base](https://github.com/lucasctrl/act_base) can be used.
+With these you can run the tests as follows:
+```
+# list all workflows
+act -l
+
+# run workflow
+act -j integration-tests -P ubuntu-latest=lucasalt/act_base:latest
+act -j unittests -P ubuntu-latest=lucasalt/act_base:latest
+```
+
+
+## Examples
+
+### Requesting batch job and job endpoints
+```
+# request batch job
+curl -u actinia-gdi:actinia-gdi -X GET http://localhost:8088/api/v3/resources/actinia-gdi/batches/1 | jq
+# request job
+curl -u actinia-gdi:actinia-gdi -X GET http://localhost:8088/api/v3/resources/actinia-gdi/batches/1/jobs/1 | jq
+```
+
+### Start parallel batch job
+#### Ephemeral processing
+You can start a parallel **ephemeral** batch job via:
+```
+# parallel ephemeral processing
+curl -u actinia-gdi:actinia-gdi -X POST -H 'Content-Type: application/json' -d @test_postbodies/parallel_ephemeral_processing.json http://localhost:8088/api/v3/locations/nc_spm_08_grass7_root/processing_parallel | jq
+```
+Attention:
+* The individual process chains must be "independent" of each other, since
+  createBatch is designed as an ephemeral process.
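+
+For orientation: a batch processing chain is a JSON object with a `jobs`
+list, where each job is a regular actinia process chain (`version` + `list`)
+plus a `parallel` flag ("true"/"false"); consecutive parallel jobs form one
+processing block. A minimal sketch (module names and values are only
+illustrative; the real postbody used above is
+`test_postbodies/parallel_ephemeral_processing.json`):
+```
+{
+  "jobs": [
+    {
+      "version": "1",
+      "parallel": "true",
+      "list": [
+        {
+          "id": "g_region_1",
+          "module": "g.region",
+          "inputs": [{"param": "raster", "value": "elev_ned_30m"}],
+          "flags": "p"
+        }
+      ]
+    },
+    {
+      "version": "1",
+      "parallel": "true",
+      "list": [
+        {
+          "id": "r_univar_1",
+          "module": "r.univar",
+          "inputs": [{"param": "map", "value": "elev_ned_30m"}],
+          "flags": "g"
+        }
+      ]
+    },
+    {
+      "version": "1",
+      "parallel": "false",
+      "list": [
+        {
+          "id": "g_region_2",
+          "module": "g.region",
+          "inputs": [{"param": "raster", "value": "elev_ned_30m"}],
+          "flags": "p"
+        }
+      ]
+    }
+  ]
+}
+```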
-##
+TODOs:
+* exporter in PC
+* using stdout/export in PC of next block
diff --git a/docker/actinia-parallel-plugin-test/Dockerfile b/docker/actinia-parallel-plugin-test/Dockerfile
index a821e7d..1c86da7 100644
--- a/docker/actinia-parallel-plugin-test/Dockerfile
+++ b/docker/actinia-parallel-plugin-test/Dockerfile
@@ -12,17 +12,14 @@ ENV DEFAULT_CONFIG_PATH /etc/default/actinia-parallel-plugin-test
 RUN apk add redis
 RUN pip3 install iniconfig colorlog pwgen
 
-# COPY docker/actinia-parallel-plugin-test/start.sh /src/start.sh
-
 ENTRYPOINT ["/bin/sh"]
 CMD ["/src/start.sh"]
 
-# # add data for tests
-# RUN wget --quiet https://grass.osgeo.org/sampledata/north_carolina/nc_spm_08_micro.zip && \
-#     unzip nc_spm_08_micro.zip && \
-#     rm -f nc_spm_08_micro.zip && \
-#     mv nc_spm_08_micro /actinia_core/grassdb/nc_spm_08
-# RUN grass -e -c 'EPSG:4326' /actinia_core/grassdb/latlong_wgs84
+# add data for tests
+RUN wget --quiet https://grass.osgeo.org/sampledata/north_carolina/nc_spm_08_micro.zip && \
+    unzip nc_spm_08_micro.zip && \
+    rm -f nc_spm_08_micro.zip && \
+    mv nc_spm_08_micro /actinia_core/grassdb/nc_spm_08
 
 # copy needed files and configs for test
 COPY docker/actinia-parallel-plugin-test/actinia-parallel-plugin-test.cfg /etc/default/actinia
@@ -31,7 +28,13 @@ COPY . /src/actinia-parallel-plugin/
 
 WORKDIR /src/actinia-parallel-plugin/
 
-RUN chmod a+x tests_with_redis.sh
 RUN make install
 
+ENV SETUPTOOLS_SCM_PRETEND_VERSION=0.0
+
+COPY docker/actinia-parallel-plugin-test/run_integration_tests.sh /usr/bin/run_integration_tests.sh
+RUN chmod a+x /usr/bin/run_integration_tests.sh
+COPY docker/actinia-parallel-plugin-test/run_unittests.sh /usr/bin/run_unittests.sh
+RUN chmod a+x /usr/bin/run_unittests.sh
+
 # RUN make test
diff --git a/docker/actinia-parallel-plugin-test/actinia-parallel-plugin-test.cfg b/docker/actinia-parallel-plugin-test/actinia-parallel-plugin-test.cfg
index e41b90f..98cb525 100644
--- a/docker/actinia-parallel-plugin-test/actinia-parallel-plugin-test.cfg
+++ b/docker/actinia-parallel-plugin-test/actinia-parallel-plugin-test.cfg
@@ -3,7 +3,7 @@ grass_database = /actinia_core/grassdb
 grass_user_database = /actinia_core/userdata
 grass_tmp_database = /actinia_core/workspace/temp_db
 grass_resource_dir = /actinia_core/resources
-grass_gis_base = /usr/local/grass80
+grass_gis_base = /usr/local/grass
 grass_gis_start_script = /usr/local/bin/grass
 grass_addon_path = /root/.grass8/addons/
 
@@ -12,7 +12,8 @@ plugins = ["actinia_parallel_plugin"]
 force_https_urls = True
 
 [REDIS]
-redis_server_url = localhost
+redis_server_url = redis-test
+redis_server_pw = pass
 redis_server_port = 6379
 worker_queue_name = actinia_job
 worker_logfile = /actinia_core/workspace/tmp/actinia_worker_test.log
@@ -22,3 +23,12 @@ tmp_workdir = /actinia_core/workspace/tmp
 download_cache = /actinia_core/workspace/download_cache
 secret_key = token_signing_key_changeme
 save_interim_results = False
+
+[JOBTABLE]
+host = postgis-test
+port = 5432
+database = gis
+user = actinia
+schema = actinia
+table = tab_jobs
+id_field = id
diff --git a/docker/actinia-parallel-plugin-test/run_integration_tests.sh b/docker/actinia-parallel-plugin-test/run_integration_tests.sh
new file mode 100644
index 0000000..2073cff
--- /dev/null
+++ b/docker/actinia-parallel-plugin-test/run_integration_tests.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+python3 setup.py test --addopts "-m 'integrationtest'"
diff --git a/docker/actinia-parallel-plugin-test/run_unittests.sh b/docker/actinia-parallel-plugin-test/run_unittests.sh
new file mode 100644
index 0000000..1523ade
--- /dev/null
+++ b/docker/actinia-parallel-plugin-test/run_unittests.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+python3 setup.py test --addopts "-m 'unittest'"
diff --git a/docker/actinia.cfg b/docker/actinia.cfg
index 10079cc..54d7055 100644
--- a/docker/actinia.cfg
+++ b/docker/actinia.cfg
@@ -3,7 +3,7 @@ grass_database = /actinia_core/grassdb
 grass_user_database = /actinia_core/userdata
 grass_tmp_database = /actinia_core/workspace/temp_db
 grass_resource_dir = /actinia_core/resources
-grass_gis_base = /usr/local/grass80
+grass_gis_base = /usr/local/grass
 grass_gis_start_script = /usr/local/bin/grass
 grass_addon_path = /root/.grass8/addons/
 
@@ -26,3 +26,12 @@ tmp_workdir = /actinia_core/workspace/tmp
 download_cache = /actinia_core/workspace/download_cache
 secret_key = token_signing_key_changeme
 save_interim_results = True
+
+[JOBTABLE]
+host = postgis
+port = 5432
+database = gis
+user = actinia
+schema = actinia
+table = tab_jobs
+id_field = id
diff --git a/docker/docker-compose-test.yml b/docker/docker-compose-test.yml
new file mode 100644
index 0000000..2e754a4
--- /dev/null
+++ b/docker/docker-compose-test.yml
@@ -0,0 +1,58 @@
+version: "3"
+services:
+
+  actinia-test:
+    build:
+      context: ..
+      dockerfile: docker/actinia-parallel-plugin-test/Dockerfile
+    volumes:
+      - ..:/src/actinia-parallel-plugin/.
+    ports:
+      - "8088:8088"
+    environment:
+      - JOBTABLE_PW=actinia
+    depends_on:
+      - redis-test
+      - postgis-test
+    cap_add:
+      - SYS_PTRACE
+    networks:
+      - actinia-test
+
+  redis-test:
+    image: redis:5.0.4-alpine
+    volumes:
+      - ./redis_data:/data
+    environment:
+      - REDIS_PASS_FILE=/data/config/.redis
+    command: [
+      "sh", "-c",
+      '
+      docker-entrypoint.sh
+      "/data/config/redis.conf"
+      --requirepass "$$(cat $$REDIS_PASS_FILE)"
+      '
+    ]
+    ports:
+      - "6379:6379"
+    networks:
+      - actinia-test
+
+  postgis-test:
+    image: postgis/postgis:14-3.2-alpine
+    # network_mode: host
+    environment:
+      POSTGRES_USER: actinia
+      POSTGRES_PASSWORD: actinia
+    volumes:
+      - ./postgresql_init_data:/docker-entrypoint-initdb.d
+      # - ./postgresql_data:/var/lib/postgresql/data:Z
+    networks:
+      - actinia-test
+
+
+networks:
+  actinia-test:
+    ipam:
+      config:
+        - subnet: 172.18.0.0/26
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 9fff8da..f689df4 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -7,10 +7,14 @@ services:
       dockerfile: docker/Dockerfile
     volumes:
       - ..:/src/actinia-parallel-plugin/.
+      # - /home/.../grassdata/:/actinia_core/grassdb/
     ports:
       - "8088:8088"
+    environment:
+      - JOBTABLE_PW=actinia
     depends_on:
       - redis
+      - postgis
     cap_add:
       - SYS_PTRACE
     networks:
@@ -35,6 +39,19 @@ services:
     networks:
       - actinia
 
+  postgis:
+    image: postgis/postgis:14-3.2-alpine
+    # network_mode: host
+    environment:
+      POSTGRES_USER: actinia
+      POSTGRES_PASSWORD: actinia
+    volumes:
+      - ./postgresql_init_data:/docker-entrypoint-initdb.d
+      - ./postgresql_data:/var/lib/postgresql/data:Z
+    networks:
+      - actinia
+
+
 networks:
   actinia:
     ipam:
diff --git a/docker/postgresql_init_data/gis.sql b/docker/postgresql_init_data/gis.sql
new file mode 100644
index 0000000..2838647
--- /dev/null
+++ b/docker/postgresql_init_data/gis.sql
@@ -0,0 +1,66 @@
+--
+-- PostgreSQL database dump
+--
+
+SET statement_timeout = 0;
+SET lock_timeout = 0;
+SET idle_in_transaction_session_timeout = 0;
+SET client_encoding = 'UTF8';
+SET standard_conforming_strings = on;
+SELECT pg_catalog.set_config('search_path', '', false);
+SET check_function_bodies = false;
+SET client_min_messages = warning;
+SET row_security = off;
+
+--
+-- TOC entry 3593 (class 1262 OID 16386)
+-- Name: gis; Type: DATABASE; Schema: -; Owner: -
+--
+
+CREATE DATABASE gis WITH TEMPLATE = template0 ENCODING = 'UTF8' LC_COLLATE = 'en_US.utf8' LC_CTYPE = 'en_US.utf8';
+
+
+\connect gis
+
+SET statement_timeout = 0;
+SET lock_timeout = 0;
+SET idle_in_transaction_session_timeout = 0;
+SET client_encoding = 'UTF8';
+SET standard_conforming_strings = on;
+SELECT pg_catalog.set_config('search_path', '', false);
+SET check_function_bodies = false;
+SET client_min_messages = warning;
+SET row_security = off;
+
+--
+-- TOC entry 7 (class 2615 OID 53605)
+-- Name: actinia; Type: SCHEMA; Schema: -; Owner: -
+--
+
+CREATE SCHEMA actinia;
+
+--
+-- TOC entry 1 (class 3079 OID 12390)
+-- Name: plpgsql; Type: EXTENSION; Schema: -; Owner: -
+--
+
+CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
+
+--
+-- TOC entry 3595 (class 0 OID 0)
+-- Dependencies: 1
+-- Name: EXTENSION plpgsql; Type: COMMENT; Schema: -; Owner: -
+--
+
+COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
+
+
+--
+-- TOC entry 2 (class 3079 OID 34955)
+-- Name: postgis; Type: EXTENSION; Schema: -; Owner: -
+--
+
+
+SET default_tablespace = '';
+
+SET default_with_oids = false;
diff --git a/requirements.txt b/requirements.txt
index 3ce41f4..3483f5a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,3 +15,10 @@
 #   numpy==1.13.3
 #   scipy==1.0
 #
+
+
+python-json-logger==0.1.11
+peewee
+yoyo-migrations
+psycopg2
+jsonmodels
diff --git a/setup.cfg b/setup.cfg
index e1b837e..1fe522b 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -7,7 +7,7 @@ name = actinia_parallel_plugin.wsgi
 description = actinia example plugin
 author = Anika Weinmann
 author-email = aweinmann@mundialis.de
-license = mit
+license = GPL3
 long-description = file: README.md
 long-description-content-type = text/x-rst; charset=UTF-8
 url = https://github.com/pyscaffold/pyscaffold/
@@ -36,6 +36,9 @@ setup_requires = pyscaffold>=3.2a0,<3.3a0
 # Require a specific Python version, e.g. Python 2.7 or >= 3.4
 # python_requires = >=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*
 
+[options.package_data]
+* = **/*.json
+
 [options.packages.find]
 where = src
 exclude =
@@ -73,7 +76,7 @@ extras = True
 # in order to write a coverage file that can be read by Jenkins.
 addopts =
     --cov actinia_parallel_plugin --cov-report term-missing
-    --verbose --tb=line -x
+    --verbose --tb=line -x -s
 norecursedirs =
     dist
     build
diff --git a/setup.py b/setup.py
index b16e95f..d32e672 100644
--- a/setup.py
+++ b/setup.py
@@ -8,6 +8,7 @@ Learn more under: https://pyscaffold.org/
 """
 import sys
+from pathlib import Path
 
 from pkg_resources import VersionConflict, require
 from setuptools import setup
@@ -20,4 +21,11 @@ if __name__ == "__main__":
-    setup(use_pyscaffold=True)
+
+    parent_dir = Path(__file__).resolve().parent
+
+    setup(
+        use_pyscaffold=True,
+        install_requires=parent_dir.joinpath(
+            "requirements.txt").read_text().splitlines(),
+    )
diff --git a/src/actinia_parallel_plugin/api/batch.py b/src/actinia_parallel_plugin/api/batch.py
new file mode 100644
index 0000000..602f62e
--- /dev/null
+++ b/src/actinia_parallel_plugin/api/batch.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Copyright (c) 2021-2022 mundialis GmbH & Co. KG
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Endpoint definitions for batch job
+"""
+
+__license__ = "GPLv3"
+__author__ = "Julia Haas, Guido Riembauer, Anika Weinmann"
+__copyright__ = "Copyright 2021-2022 mundialis GmbH & Co. KG"
+__maintainer__ = "mundialis GmbH & Co. KG"
+
+from flask_restful_swagger_2 import swagger
+from flask import make_response, jsonify
+
+from actinia_core.models.response_models import \
+    SimpleResponseModel
+from actinia_core.rest.resource_management import ResourceManagerBase
+
+from actinia_parallel_plugin.resources.logging import log
+from actinia_parallel_plugin.core.batches import (
+    createBatchResponseDict,
+    getJobsByBatchId,
+)
+from actinia_parallel_plugin.apidocs import batch
+
+
+class BatchJobsId(ResourceManagerBase):
+    """ Definition for endpoint
+    @app.route('/resources/<user_id>/batches/<batchid>')
+
+    Contains HTTP GET endpoint
+    Contains HTTP POST endpoint
+    Contains swagger documentation
+    """
+    @swagger.doc(batch.batchjobId_get_docs)
+    def get(self, user_id, batchid):
+        """Get the status of a batch."""
+
+        ret = self.check_permissions(user_id=user_id)
+        if ret:
+            return ret
+
+        if batchid is None:
+            return make_response("No batchid was given", 400)
+
+        log.info(("\n Received HTTP GET request for batch"
+                  f" with id {str(batchid)}"))
+
+        jobs = getJobsByBatchId(batchid)
+        if len(jobs) == 0:
+            res = (jsonify(SimpleResponseModel(
+                status=404,
+                message='Either batchid does not exist or there was a '
+                        'connection problem to the database. Please '
+                        'try again later.'
+            )))
+            return make_response(res, 404)
+        else:
+            resp_dict = createBatchResponseDict(jobs)
+            return make_response(jsonify(resp_dict), 200)
+
+    # no docs because 405
+    def post(self, user_id, batchid):
+        res = jsonify(SimpleResponseModel(
+            status=405,
+            message="Method Not Allowed"
+        ))
+        return make_response(res, 405)
diff --git a/src/actinia_parallel_plugin/api/job.py b/src/actinia_parallel_plugin/api/job.py
new file mode 100644
index 0000000..c6a0013
--- /dev/null
+++ b/src/actinia_parallel_plugin/api/job.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Copyright (c) 2018-2022 mundialis GmbH & Co. KG
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Endpoint definitions for job
+"""
+
+__license__ = "GPLv3"
+__author__ = "Carmen Tawalika, Anika Weinmann"
+__copyright__ = "Copyright 2018-2022 mundialis GmbH & Co. KG"
+__maintainer__ = "mundialis GmbH & Co. KG"
+
+from flask_restful_swagger_2 import swagger
+from flask import make_response, jsonify
+
+from actinia_core.rest.resource_management import ResourceManagerBase
+from actinia_core.models.response_models import \
+    SimpleResponseModel
+
+from actinia_parallel_plugin.resources.logging import log
+from actinia_parallel_plugin.core.jobs import (
+    getJob,
+)
+from actinia_parallel_plugin.apidocs import jobs
+
+
+class JobId(ResourceManagerBase):
+    """ Definition for endpoint
+    @app.route('/resources/<user_id>/batches/<batchid>/jobs/<jobid>')
+
+    Contains HTTP GET endpoint reading a job
+    Contains swagger documentation
+    """
+
+    @swagger.doc(jobs.jobId_get_docs)
+    def get(self, user_id, batchid, jobid):
+        """ Wrapper method to receive HTTP call and pass it to function
+
+        This method is called by HTTP GET
+        @app.route(
+            '/resources/<user_id>/batches/<batchid>/jobs/<jobid>')
+        This method calls the core method getJob
+        """
+
+        ret = self.check_permissions(user_id=user_id)
+        if ret:
+            return ret
+
+        if batchid is None:
+            return make_response("No batchid was given", 400)
+
+        if jobid is None:
+            return make_response("No jobid was given", 400)
+
+        log.info("\n Received HTTP GET request for job with id " + str(jobid))
+
+        job, err = getJob(jobid)
+
+        if job is not None:
+            return make_response(jsonify(job), 200)
+        else:
+            res = (jsonify(SimpleResponseModel(
+                status=err["status"],
+                message=err["msg"]
+            )))
+            return make_response(res, err["status"])
+
+    def post(self, user_id, batchid, jobid):
+        res = jsonify(SimpleResponseModel(
+            status=405,
+            message="Method Not Allowed"
+        ))
+        return make_response(res, 405)
diff --git a/src/actinia_parallel_plugin/api/parallel_ephemeral_processing.py b/src/actinia_parallel_plugin/api/parallel_ephemeral_processing.py
new file mode 100644
index 0000000..6067936
--- /dev/null
+++ b/src/actinia_parallel_plugin/api/parallel_ephemeral_processing.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Copyright (c) 2022 mundialis GmbH & Co. KG
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Parallel processing
+"""
+
+__license__ = "GPLv3"
+__author__ = "Anika Weinmann"
+__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG"
+__maintainer__ = "mundialis GmbH & Co. KG"
+
+from flask import request, make_response, jsonify, g
+from flask_restful_swagger_2 import swagger, Resource
+
+from actinia_api import URL_PREFIX
+
+from actinia_core.core.common.app import auth
+from actinia_core.core.common.config import global_config
+# from actinia_core.core.common.api_logger import log_api_call
+from actinia_core.models.response_models import \
+    SimpleResponseModel
+# from actinia_core.rest.base.user_auth import check_user_permissions
+from actinia_core.rest.base.user_auth import create_dummy_user
+
+from actinia_parallel_plugin.apidocs import batch
+from actinia_parallel_plugin.core.batches import (
+    createBatch,
+    createBatchId,
+    createBatchResponseDict,
+    getJobsByBatchId,
+    startProcessingBlock,
+)
+from actinia_parallel_plugin.resources.logging import log
+
+
+class AsyncParallelEphermeralResource(Resource):
+    """Resource for parallel processing"""
+
+    decorators = []
+
+    # if global_config.LOG_API_CALL is True:
+    #     decorators.append(log_api_call)
+    #
+    # if global_config.CHECK_CREDENTIALS is True:
+    #     decorators.append(check_user_permissions)
+
+    if global_config.LOGIN_REQUIRED is True:
+        decorators.append(auth.login_required)
+    else:
+        decorators.append(create_dummy_user)
+
+    def __init__(self):
+        super(AsyncParallelEphermeralResource, self).__init__()
+        self.location_name = None
+        self.batch_id = None
+
+    @swagger.doc(batch.batchjobs_post_docs)
+    def post(self, location_name):
+        """Ephemeral parallel processing."""
+
+        self.location_name = location_name
+        self.post_url = request.base_url
+
+        json_dict = request.get_json(force=True)
+        log.info("Received HTTP POST with batchjob: %s" %
+                 str(json_dict))
+
+        # assign new batchid
+        self.batch_id = createBatchId()
+
+        # Generate the base of the status URL
+        host_url = request.host_url
+        if host_url.endswith("/") and URL_PREFIX.startswith("/"):
+            self.base_status_url = f"{host_url[:-1]}{URL_PREFIX}/" \
+                f"resources/{g.user.user_id}/"
+        elif not host_url.endswith("/") and not URL_PREFIX.startswith("/"):
+            self.base_status_url = f"{host_url}/{URL_PREFIX}/resources/" \
+                f"{g.user.user_id}/"
+        else:
+            self.base_status_url = f"{host_url}{URL_PREFIX}/resources/" \
+                f"{g.user.user_id}/"
+
+        # create processing blocks and insert jobs into jobtable
+        status_url = f"{self.base_status_url}batches/{self.batch_id}"
+        jobs_in_db = createBatch(json_dict, self.batch_id, status_url)
+        if jobs_in_db is None:
+            res = (jsonify(SimpleResponseModel(
+                status=500,
+                message=('Error: Batch Processing Chain JSON has no '
+                         'jobs.')
+            )))
+            return make_response(res, 500)
+
+        # start first processing block
+        first_jobs = startProcessingBlock(
+            jobs_in_db,
+            1,
+            self.batch_id,
+            self.location_name,
+            None,  # mapset_name
+            g.user,
+            request.url,
+            self.post_url,
+            request.endpoint,
+            request.method,
+            request.path,
+            self.base_status_url,
+            "ephemeral"
+        )
+        first_status = [entry["status"] for entry in first_jobs]
+        all_jobs = getJobsByBatchId(self.batch_id)
+        if None in first_jobs:
+            res = (jsonify(SimpleResponseModel(
+                status=500,
+                message=('Error: There was a problem starting the '
+                         'first jobs of the batchjob.')
+            )))
+            return make_response(res, 500)
+        elif "ERROR" not in first_status:
+            return make_response(jsonify(createBatchResponseDict(all_jobs)),
+                                 201)
+        else:
+            return make_response(jsonify(createBatchResponseDict(all_jobs)),
+                                 412)
diff --git a/src/actinia_parallel_plugin/api/parallel_processing.py b/src/actinia_parallel_plugin/api/parallel_processing.py
deleted file mode 100644
index a2a6f77..0000000
--- a/src/actinia_parallel_plugin/api/parallel_processing.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-Copyright (c) 2018-present mundialis GmbH & Co. KG
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-Parallel processing
-"""
-
-__license__ = "GPLv3"
-__author__ = "Anika Weinmann"
-__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG"
-__maintainer__ = "mundialis GmbH & Co. KG"
-
-
-from flask import request, make_response
-from flask_restful_swagger_2 import swagger
-from flask_restful_swagger_2 import Resource
-
-from actinia_parallel_plugin.apidocs import helloworld
-from actinia_parallel_plugin.model.response_models import (
-    SimpleStatusCodeResponseModel,
-)
-from actinia_parallel_plugin.core.example import transform_input
-
-
-class ParallelProcessingResource(Resource):
-    """Resource for parallel processing"""
-
-    def __init__(self):
-        self.msg = "Hello world!"
-
-    # TODO get all batch jobs
-    @swagger.doc(helloworld.describeHelloWorld_get_docs)
-    def get(self):
-        """Get 'Hello world!' as answer string."""
-        return SimpleStatusCodeResponseModel(status=200, message=self.msg)
-
-    # TODO start a parallel processing job as batch job
-    @swagger.doc(helloworld.describeHelloWorld_post_docs)
-    def post(self):
-        """Hello World post method with name from postbody."""
-
-        req_data = request.get_json(force=True)
-        if isinstance(req_data, dict) is False or "name" not in req_data:
-            return make_response("Missing name in JSON content", 400)
-        name = req_data["name"]
-        msg = transform_input(name)
-
-        return SimpleStatusCodeResponseModel(status=200, message=msg)
diff --git a/src/actinia_parallel_plugin/apidocs/batch.py b/src/actinia_parallel_plugin/apidocs/batch.py
new file mode 100644
index 0000000..a83d026
--- /dev/null
+++ b/src/actinia_parallel_plugin/apidocs/batch.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Copyright (c) 2021-2022 mundialis GmbH & Co. KG
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Documentation objects for batch endpoints
+"""
+
+__license__ = "GPLv3"
+__author__ = "Guido Riembauer, Anika Weinmann"
+__copyright__ = "Copyright 2021-2022 mundialis GmbH & Co. KG"
+__maintainer__ = "mundialis GmbH & Co. KG"
+
+
+from actinia_core.models.response_models import \
+    SimpleResponseModel
+
+from actinia_parallel_plugin.model.batch import (
+    BatchJobResponseModel,
+    BatchProcessChainModel,
+)
+
+
+batchjobId_get_docs = {
+    "summary": "Returns batchjob by batchid.",
+    "description": ("This request will get the summary for the requested "
+                    "batchjob and all corresponding jobs from the jobtable."),
+    "tags": [
+        "processing"
+    ],
+    "parameters": [
+        {
+            "in": "path",
+            "name": "batchid",
+            "type": "string",
+            "description": "a batchid",
+            "required": True
+        }
+    ],
+    "responses": {
+        "200": {
+            "description": ("The batchjob summary of the requested batchjob "
+                            "and all corresponding jobs"),
+            "schema": BatchJobResponseModel
+        },
+        "400": {
+            "description": ("A short error message in case no batchid was "
+                            "provided")
+        },
+        "404": {
+            "description": ("An error message in case the batchid was "
+                            "not found"),
+            "schema": SimpleResponseModel
+        }
+    }
+}
+
+batchjobs_post_docs = {
+    "summary": "Creates a new Batchjob from a Batch Processing Chain.",
+    "description": ("This request will read the json object,"
+                    " break it up into parallel processing blocks,"
+                    " create individual jobs in the jobtable, "
+                    " and start the jobs in actinia-core depending on"
+                    " their processing block."),
+    "tags": [
+        "processing"
+    ],
+    "parameters": [
+        {
+            "in": "body",
+            "name": "body",
+            "description": "Batch Processing Chain as json object",
+            "required": True,
+            "schema": BatchProcessChainModel
+        }
+    ],
+    "responses": {
+        "201": {
+            "description": ("The batchjob summary of the created batchjob "
+                            "and all corresponding jobs"),
+            "schema": BatchJobResponseModel
+        },
+        "412": {
+            "description": ("The batchjob summary of the created batchjob "
+                            "and all corresponding jobs in case a job "
+                            "responded with an error"),
+            "schema": BatchJobResponseModel
+        },
+        "500": {
+            "description": ("The error message and a detailed log why "
+                            "creating a batchjob failed"),
+            "schema": SimpleResponseModel
+        }
+    }
+}
diff --git a/src/actinia_parallel_plugin/apidocs/examples/__init__.py b/src/actinia_parallel_plugin/apidocs/examples/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/actinia_parallel_plugin/apidocs/examples/batchjob_post_response_example.json b/src/actinia_parallel_plugin/apidocs/examples/batchjob_post_response_example.json
new file mode 100644
index 0000000..999f75b
--- /dev/null
+++ b/src/actinia_parallel_plugin/apidocs/examples/batchjob_post_response_example.json
@@ -0,0 +1,114 @@
+{
+  "resource_id": [
+    "resource_id-f5c2ec5d-30f0-4f51-be9a-5b423fa1eb18",
+    "None",
+    "None",
+    "None"
+  ],
+  "resource_response": {
+    "resource_id-f5c2ec5d-30f0-4f51-be9a-5b423fa1eb18": {
+      "accept_datetime": "2022-05-24 18:22:42.343953",
+      "accept_timestamp": 1653416562.3439503,
+      "api_info": {
+        "endpoint": "asyncparallelephermeralresource",
+        "method": "POST",
+        "path": "/api/v3/locations/nc_spm_08_grass7_root/processing_parallel",
+        "post_url": "http://localhost:8088/api/v3/locations/nc_spm_08_grass7_root/processing_parallel",
+        "request_url": "http://localhost:8088/api/v3/locations/nc_spm_08_grass7_root/processing_parallel"
+      },
+      "datetime": "2022-05-24 18:22:42.346947",
+      "http_code": 200,
+      "message": "Resource accepted",
+      "process_chain_list": [],
+      "process_results": {},
+      "resource_id": "resource_id-f5c2ec5d-30f0-4f51-be9a-5b423fa1eb18",
+      "status": "accepted",
+      "time_delta": 0.0030062198638916016,
+      "timestamp": 1653416562.346946,
+      "urls": {
+        "resources": [],
+        "status": "http://localhost:8088/api/v3/resources/actinia-gdi/resource_id-f5c2ec5d-30f0-4f51-be9a-5b423fa1eb18"
+      },
+      "user_id": "actinia-gdi"
+    }
+  },
+  "batch_id": 5,
+  "creation_uuids": [
+    "768247dd-8dce-4aa2-bf96-51fb698e4fea",
+    "9163ca23-aad9-48c6-8eb4-80cc174d2090",
+    "5a91e046-faec-414d-8013-771747196126",
+    "ee72667f-248a-4200-8c2c-e6025fa80254"
+  ],
+  "id": [
+    "17",
+    "18",
+    "19",
+    "20"
+  ],
+  "jobs_status": [
+    {
+      "resource_id": "resource_id-f5c2ec5d-30f0-4f51-be9a-5b423fa1eb18",
+      "id": 17,
+      "status": "PENDING"
+    },
+    {
+      "resource_id": "None",
+      "id": 18,
+      "status": "PREPARING"
+    },
+    {
+      "resource_id": "None",
+      "id": 19,
+      "status": "PREPARING"
+    },
+    {
+      "resource_id": "None",
+      "id": 20,
+      "status": "PREPARING"
+    }
+  ],
+  "status": "PENDING",
+  "summary": {
+    "blocks": [
+      {
+        "accepted": 1,
+        "block_num": 1,
+        "error": 0,
+        "finished": 0,
+        "parallel": 1,
+        "preparing": 0,
+        "running": 0,
+        "terminated": 0
+      },
+      {
+        "accepted": 0,
+        "block_num": 2,
+        "error": 0,
+        "finished": 0,
+        "parallel": 2,
+        "preparing": 2,
+        "running": 0,
+        "terminated": 0
+      },
+      {
+        "accepted": 0,
+        "block_num": 3,
+        "error": 0,
+        "finished": 0,
+        "parallel": 1,
+        "preparing": 1,
+        "running": 0,
+        "terminated": 0
+      }
+    ],
+    "status": {
+      "accepted": 1,
+      "error": 0,
+      "finished": 0,
+      "preparing": 3,
+      "running": 0,
+      "terminated": 0
+    },
+    "total": 4
+  }
+}
diff --git a/src/actinia_parallel_plugin/apidocs/examples/jobs_get_docs_response_example.json b/src/actinia_parallel_plugin/apidocs/examples/jobs_get_docs_response_example.json
new file mode 100644
index 0000000..cf7dda6
--- /dev/null
+++ b/src/actinia_parallel_plugin/apidocs/examples/jobs_get_docs_response_example.json
@@ -0,0 +1,122 @@
+{
+  "resource_id": "resource_id-71264d56-4183-4d68-8544-5425254f5def",
+  "resource_response": {
+    "accept_datetime": "2022-05-24 18:22:12.481060",
+    "accept_timestamp": 1653416532.4810588,
+    "api_info": {
+      "endpoint": "asyncparallelephermeralresource",
+      "method": "POST",
+      "path": "/api/v3/locations/nc_spm_08_grass7_root/processing_parallel",
+      "post_url": "http://localhost:8088/api/v3/locations/nc_spm_08_grass7_root/processing_parallel",
+      "request_url": "http://localhost:8088/api/v3/locations/nc_spm_08_grass7_root/processing_parallel"
+    },
+    "datetime": "2022-05-24 18:22:12.990696",
+    "http_code": 200,
+    "message": "Processing successfully finished",
+    "process_chain_list": [
+      {
+        "list": [
+          {
+            "flags": "p",
+            "id": "g_region_2_parallel_block2",
+            "inputs": [
+              {
+                "param": "raster",
+                "value": "elev_ned_30m"
+              }
+            ],
+            "module": "g.region",
+            "outputs": []
+          },
+          {
+            "flags": "g",
+            "id": "r_univar_2_parallel_block2",
+            "inputs": [
+              {
+                "param": "map",
+                "value": "elev_ned_30m"
+              }
+            ],
+            "module": "r.univar",
+            "outputs": [],
+            "stdout": {
+              "delimiter": "=",
+              "format": "kv",
+              "id": "stats"
+            }
+          }
+        ],
+        "version": "1"
+      }
+    ],
+    "process_log": [
+      {
+        "executable": "g.region",
+        "id": "g_region_2_parallel_block2",
+        "mapset_size": 407,
+        "parameter": [
+          "raster=elev_ned_30m",
+          "-p"
+        ],
+        "return_code": 0,
+        "run_time": 0.1003119945526123,
+        "stderr": [
+          ""
+        ],
+        "stdout": "projection: 99 (Lambert Conformal Conic)\nzone: 0\ndatum: nad83\nellipsoid: a=6378137 es=0.006694380022900787\nnorth: 228500\nsouth: 215000\nwest: 630000\neast: 645000\nnsres: 30\newres: 30\nrows: 450\ncols: 500\ncells: 225000\n"
+      },
+      {
+        "executable": "r.univar",
+        "id": "r_univar_2_parallel_block2",
+        "mapset_size": 407,
+        "parameter": [
+          "map=elev_ned_30m",
+          "-g"
+        ],
+        "return_code": 0,
+        "run_time": 0.10025954246520996,
+        "stderr": [
+          ""
+        ],
+        "stdout": "n=225000\nnull_cells=0\ncells=225000\nmin=55.1736030578613\nmax=156.386520385742\nrange=101.212917327881\nmean=110.307571087138\nmean_of_abs=110.307571087138\nstddev=20.3119976726962\nvariance=412.577249455617\ncoeff_var=18.4139651272447\nsum=24819203.494606\n"
+      }
+    ],
+    "process_results": {
+      "stats": {
+        "cells": "225000",
+        "coeff_var": "18.4139651272447",
+        "max": "156.386520385742",
+        "mean": "110.307571087138",
+        "mean_of_abs": "110.307571087138",
+        "min": "55.1736030578613",
+        "n": "225000",
+        "null_cells": "0",
+        "range": "101.212917327881",
+        "stddev": "20.3119976726962",
+        "sum": "24819203.494606",
+        "variance": "412.577249455617"
+      }
+    },
+    "progress": {
+      "num_of_steps": 2,
+      "step": 2
+    },
+    "resource_id": "resource_id-71264d56-4183-4d68-8544-5425254f5def",
+    "status": "finished",
+    "time_delta": 0.5096566677093506,
+    "timestamp": 1653416532.9906828,
+    "urls": {
+      "resources": [],
+      "status": "http://localhost:8088/api/v3/resources/actinia-gdi/resource_id-71264d56-4183-4d68-8544-5425254f5def"
+    },
+    "user_id": "actinia-gdi"
+  },
+  "batch_id": 3,
+  "creation_uuid": "49cdd55a-332b-4f66-ad40-c0b3e576f824",
+  "id": 11,
+  "status": "SUCCESS",
+  "time_created": "Tue, 24 May 2022 18:22:09 GMT",
+  "time_ended": "Tue, 24 May 2022 18:22:12 GMT",
+  "time_estimated": null,
+  "time_started": null
+}
diff --git a/src/actinia_parallel_plugin/apidocs/helloworld.py b/src/actinia_parallel_plugin/apidocs/helloworld.py
deleted file mode 100644
index 27d3b2a..0000000
--- a/src/actinia_parallel_plugin/apidocs/helloworld.py
+++ /dev/null
@@ -1,69 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-Copyright (c) 2018-present mundialis GmbH & Co. KG
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-Hello World class
-"""
-
-__license__ = "GPLv3"
-__author__ = "Anika Weinmann"
-__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG"
-__maintainer__ = "mundialis GmbH & Co. KG"
-
-
-from actinia_parallel_plugin.model.response_models import (
-    SimpleStatusCodeResponseModel,
-)
-
-
-describeHelloWorld_get_docs = {
-    # "summary" is taken from the description of the get method
-    "tags": ["example"],
-    "description": "Hello World example",
-    "responses": {
-        "200": {
-            "description": "This response returns the string 'Hello World!'",
-            "schema": SimpleStatusCodeResponseModel,
-        }
-    },
-}
-
-describeHelloWorld_post_docs = {
-    # "summary" is taken from the description of the get method
-    "tags": ["example"],
-    "description": "Hello World example with name",
-    "responses": {
-        "200": {
-            "description": "This response returns the string 'Hello World "
-            "NAME!'",
-            "schema": SimpleStatusCodeResponseModel,
-        },
-        "400": {
-            "description": "This response returns a detail error message",
-            "schema": {
-                "type": "object",
-                "properties": {
-                    "message": {
-                        "type": "string",
-                        "description": "detailed message",
-                        "example": "Missing name in JSON content",
-                    }
-                },
-            },
-        },
-    },
-}
diff --git a/src/actinia_parallel_plugin/apidocs/jobs.py b/src/actinia_parallel_plugin/apidocs/jobs.py
new file mode 100644
index 0000000..2f2779c
--- /dev/null
+++ b/src/actinia_parallel_plugin/apidocs/jobs.py
@@ -0,0 +1,114 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Copyright (c) 2018-2022 mundialis GmbH & Co. KG
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Documentation objects for generic job endpoints
+"""
+
+__license__ = "GPLv3"
+__author__ = "Carmen Tawalika, Anika Weinmann"
+__copyright__ = "Copyright 2018-2022 mundialis GmbH & Co. KG"
+__maintainer__ = "mundialis GmbH & Co. KG"
+
+import os
+import json
+from flask_restful_swagger_2 import Schema
+
+
+script_dir = os.path.dirname(os.path.abspath(__file__))
+
+rel_path = "../apidocs/examples/jobs_get_docs_response_example.json"
+abs_file_path = os.path.join(script_dir, rel_path)
+with open(abs_file_path) as jsonfile:
+    jobs_get_docs_response_example = json.load(jsonfile)
+
+
+class ProcessesJobResponseModel(Schema):
+    """Response schema for creating a job"""
+    type = 'object'
+    properties = {
+        'id': {
+            'type': 'integer',
+            'description': 'The job ID'
+        },
+        'time_created': {
+            'type': 'string',
+            'description': 'Timestamp when the job was created'
+        },
+        'time_started': {
+            'type': 'string',
+            'description': 'Timestamp when the job was started'
+        },
+        'time_estimated': {
+            'type': 'string',
+            'description': 'Estimated timestamp when the job will finish'
+        },
+        'time_ended': {
+            'type': 'string',
+            'description': 'Timestamp when the job ended'
+        },
+        'status': {
+            'type': 'string',
+            'description': 'Status of the Job',
+            'enum': [
+                "PENDING",
+                "RUNNING",
+                "SUCCESS",
+                "ERROR",
+                "TERMINATED"
+            ]
+        },
+        'resource_response': {
+            'type': 'object',
+            'description': 'The Response at creation time'
+        },
+        'resource_id': {
+            'type': 'string',
+            'description': 'The resource ID for the job'
+        },
+        'creation_uuid': {
+            'type': 'string',
+            'description': 'A unique id for the job at creation time before '
+                           'the id is known. (More unique than the creation '
+                           'timestamp)'
+        }
+    }
+    example = jobs_get_docs_response_example
+
+
+jobId_get_docs = {
+    "summary": "Returns job by jobid.",
+    "description": "This request will get the requested job from the jobtable",
+    "tags": [
+        "processing"
+    ],
+    "parameters": [
+        {
+            "in": "path",
+            "name": "jobid",
+            "type": "string",
+            "description": "a jobid",
+            "required": True
+        }
+    ],
+    "responses": {
+        "200": {
+            "description": "The job object of the requested id",
+            "schema": ProcessesJobResponseModel
+        }
+    }
+}
diff --git a/src/actinia_parallel_plugin/core/batches.py b/src/actinia_parallel_plugin/core/batches.py
new file mode 100644
index 0000000..f90f03c
--- /dev/null
+++ b/src/actinia_parallel_plugin/core/batches.py
@@ -0,0 +1,335 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Copyright (c) 2021-2022 mundialis GmbH & Co. KG
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Module to handle batch jobs
+"""
+
+__license__ = "GPLv3"
+__author__ = "Guido Riembauer, Anika Weinmann"
+__copyright__ = "Copyright 2021-2022 mundialis GmbH & Co. KG"
+__maintainer__ = "mundialis GmbH & Co. KG"
+
+from json import loads
+
+from actinia_parallel_plugin.core.jobs import insertJob
+from actinia_parallel_plugin.core.jobtable import getAllIds, getAllJobs
+from actinia_parallel_plugin.core.parallel_processing_job import \
+    AsyncParallelJobResource
+from actinia_parallel_plugin.model.batch_process_chain import (
+    BatchProcessChain,
+)
+from actinia_parallel_plugin.resources.logging import log
+
+
+def assignProcessingBlocks(jsonDict):
+    """ Function to parse the input BPC and split up the joblist according
+    to the parallel parameter into processing blocks
+    """
+    bpc_dict = checkBatchProcessChain(jsonDict)
+
+    if bpc_dict is None:
+        return None
+    else:
+        # find out individual jobs
+        jobs = [job for job in bpc_dict["jobs"]]
+        parallel_jobs = [loads(job["parallel"]) for job in jobs]
+        # a single parallel job makes no sense so this is corrected here
+        parallel_jobs_corrected = []
+        for idx, job in enumerate(parallel_jobs):
+            if job is True:
+                if idx != 0 and idx != len(parallel_jobs)-1:
+                    if (parallel_jobs[idx-1] is False
+                            and parallel_jobs[idx+1] is False):
+                        job = False
+                        jobs[idx]["parallel"] = "false"
+                elif idx == 0:
+                    if parallel_jobs[idx+1] is False:
+                        job = False
+                        jobs[idx]["parallel"] = "false"
+                elif idx == len(parallel_jobs)-1:
+                    if parallel_jobs[idx-1] is False:
+                        job = False
+                        jobs[idx]["parallel"] = "false"
+            parallel_jobs_corrected.append(job)
+        # determine the different "processing blocks"
+        block_num = 1
+        result_jobs = []
+        for idx, job in enumerate(jobs):
+            parallel = parallel_jobs_corrected[idx]
+            prev_parallel = parallel_jobs_corrected[idx-1]
+            if idx > 0 and (parallel is False or prev_parallel is False):
+                block_num += 1
+            job["batch_processing_block"] = block_num
+            result_jobs.append(job)
+        return result_jobs
+
+
+# def cancelBatch(batchid):
+#     """ Function to cancel all jobs that are RUNNING or PENDING
+#     by batchid
+#     """
+#     jobs = getJobsByBatchId(batchid)
+#     cancel_jobs = []
+#     id_field = JOBTABLE.id_field
+#     for job in jobs:
+#         cancel_job = cancelJob(job[id_field])
+#         cancel_jobs.append(cancel_job)
+#     if None not in cancel_jobs:
+#         # then all jobs already have a resource id etc.
+#         return cancel_jobs
+#     else:
+#         # then there are jobs that have not been posted to actinia yet,
+#         # where cancelJob returns None only
+#         return getJobsByBatchId(batchid)
+
+
+def checkBatchProcessChain(jsonDict):
+    """ Function to test the creation of a BatchProcessChain object from the
+    input JSON and return it as a dict
+    """
+
+    bpc = BatchProcessChain(**jsonDict)
+    # bpc.feature_type = "default"
+    # check consistency
+    bpc_dict = bpc.to_struct()
+    if len(bpc_dict["jobs"]) == 0:
+        log.error('Batch Processing Chain JSON has no jobs!')
+        return None
+    return bpc_dict
+
+
+def checkProcessingBlockFinished(jobs, block):
+    """ Function to check if a certain processing block has finished from
+    an input list of jobs (db entries)
+    """
+    status_list = [job["status"] for job
+                   in jobs if job["batch_processing_block"] == block]
+    finished = all(status == "SUCCESS" for status in status_list)
+    return finished
+
+
+def createBatch(jsonDict, batchid, statusurl):
+    """ Function to insert all jobs from a batch into the joblist
+    """
+    jobs = assignProcessingBlocks(jsonDict)
+    if jobs is None:
+        return None
+    else:
+        jobs_in_db = []
+        for job in jobs:
+            job["batch_id"] = batchid
+            job["urls"] = {"status": statusurl, "resources": []}
+            # assign the model
+            job_in_db = insertJob(job)
+            jobs_in_db.append(job_in_db)
+        return jobs_in_db
+
+
+def createBatchId():
+    """ Function to create a unique BatchId
+    """
+    existing_batch_ids = getAllBatchIds()
+    if len(existing_batch_ids) == 0:
+        batch_id = 1
+    else:
+        batch_id = max(existing_batch_ids) + 1
+    return batch_id
+
+
+def createBatchResponseDict(jobs_list):
+    """ Function to create a status response dictionary from an input list of
+    jobs (db entries)
+    """
+
+    # get relevant information for each job
+    if len(jobs_list) == 0:
+        return {}
+
+    # sort the jobs according to their id
+    jobs = sorted(jobs_list, key=lambda d: d["id"])
+    batch_id = jobs[0]["batch_id"]
+    resource_ids = []
+    uuids = []
+    jobs_status = []
+    responses = {}
+    job_ids = []
+    blocks = []
+    for job in jobs:
+        resource_id = job["resource_id"]
+        # this way we also have "None" if no resource_id is given yet:
+        resource_ids.append(str(resource_id))
+        if resource_id is not None:
+            responses[resource_id] = job["resource_response"]
+        job_status = {
+            "id": job["id"],
+            "resource_id": str(resource_id),
+            "status": str(job["status"])
+        }
+        jobs_status.append(job_status)
+        # status.append(str(job["status"]))
+        uuids.append(job["creation_uuid"])
+        job_ids.append(str(job["id"]))
+        blocks.append(job["batch_processing_block"])
+
+    # determine an overall batch status
+    overall_status_list = [job["status"] for job in jobs_status]
+    if "ERROR" in overall_status_list:
+        batch_status = "ERROR"
+    elif "TERMINATED" in overall_status_list:
+        if (("RUNNING" in overall_status_list)
+                or ("PENDING" in overall_status_list)):
+            batch_status = "TERMINATING"
+        else:
+            batch_status = "TERMINATED"
+    elif all(status == "SUCCESS" for status in overall_status_list):
+        batch_status = "SUCCESS"
+    elif all(status == "PREPARING" for status in overall_status_list):
+        batch_status = "PREPARING"
+    elif all((status == "PENDING" or status == "PREPARING") for status
+             in overall_status_list):
+        batch_status = "PENDING"
+    else:
+        batch_status = "RUNNING"
+
+    # create block-wise statistics
+    batch_processing_blocks = []
+    for block in sorted(set(blocks)):
+        status_list = [job["status"] for job in jobs if
+                       job["batch_processing_block"] == block]
+        status_dict = _count_status_from_list(status_list)
+        if len(status_list) > 1:
+            parallel = len(status_list)
+        else:
+            parallel = 1
+
+        block_info = {
+            "block_num": block,
+            "parallel": parallel
+        }
+        block_stats = {**block_info, **status_dict}
+        batch_processing_blocks.append(block_stats)
+
+    # create summary statistics
+    summary_dict = {
+        "total": len(job_ids),
+        "status": _count_status_from_list(overall_status_list),
+        "blocks": batch_processing_blocks
+    }
+
+    # create urls
+    urls = jobs[0]["urls"]
+
+    # create overall response dict
+    responseDict = {
+        "batch_id": batch_id,
+        "resource_id": resource_ids,
+        "summary": summary_dict,
+        "resource_response": responses,
+        "creation_uuids": uuids,
+        "id": job_ids,
+        "jobs_status": jobs_status,
+        "status": batch_status,
+        "urls": urls,
+    }
+    return responseDict
+
+
+def getAllBatchIds():
+    """ Function to return all unique batch_ids from the database
+    """
+    batch_ids_raw = set(getAllIds(batch=True))
+    batch_ids = sorted([bid for bid in batch_ids_raw if bid is not None])
+    return batch_ids
+
+
+# def getAllBatches():
+#     """ Function to return all jobs that are part of a batch from the
+#     database
+#     """
+#     result_list = []
+#     batchids = getAllBatchIds()
+#     for batchid in batchids:
+#         jobs = getJobsByBatchId(batchid)
+#         jobs_response = createBatchResponseDict(jobs)
+#         result_list.append(jobs_response)
+#     result_dict = {"batch_jobs": result_list}
+#     return result_dict
+
+
+def getJobsByBatchId(batch_id):
+    """ Function to return all jobs (db entries) via a batch_id
+    """
+    filter_dict = {"batch_id": batch_id}
+    jobs = getAllJobs(filter_dict)
+    return jobs
+
+
+def startProcessingBlock(jobs, block, batch_id, location_name, mapset_name,
+                         user, request_url, post_url, endpoint, method, path,
+                         base_status_url, process):
+    """ Function to start a specific processing block for an input list of
+    jobs (db entries)
+    """
+    jobs_to_start = [
+        job for job in jobs if job["batch_processing_block"] == block]
+    jobs_responses = []
+    mapset_suffix = ""
+    if len(jobs_to_start) > 1:
+        mapset_suffix = "_parallel_"
+    for num, job in enumerate(jobs_to_start):
+        process_chain = dict()
+        process_chain["list"] = job["rule_configuration"]["list"]
+        process_chain["version"] = job["rule_configuration"]["version"]
+        jobid = job["id"]
+        mapset_name_parallel = mapset_name
+        if mapset_suffix != "" and mapset_name is not None:
+            mapset_name_parallel += f"{mapset_suffix}{num}"
+        parallel_job = AsyncParallelJobResource(
+            user=user,
+            request_url=request_url,
+            post_url=post_url,
+            endpoint=endpoint,
+            method=method,
+            path=path,
+            process_chain=process_chain,
+            location_name=location_name,
+            mapset_name=mapset_name_parallel,
+            batch_id=batch_id,
+            job_id=jobid,
+            base_status_url=base_status_url
+        )
+        parallel_job.start_parallel_job(process, block)
+        job_entry = parallel_job.get_job_entry()
+        jobs_responses.append(job_entry)
+    return jobs_responses
+
+
+def _count_status_from_list(input_list):
+    """ Function to count the occurrence of different status strings
+    from a list
+    """
+    lower_list = [item.lower() for item in input_list]
+    res_dict = {
+        "preparing": lower_list.count("preparing"),
+        "accepted": lower_list.count("pending"),
+        "running": lower_list.count("running"),
+        "finished": lower_list.count("success"),
+        "error": lower_list.count("error"),
+        "terminated": lower_list.count("terminated")
+    }
+    return res_dict
diff --git a/src/actinia_parallel_plugin/core/ephemeral_processing.py b/src/actinia_parallel_plugin/core/ephemeral_processing.py
new file mode 100644
index 0000000..bef60cf
--- /dev/null
+++ b/src/actinia_parallel_plugin/core/ephemeral_processing.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Copyright (c) 2022 mundialis GmbH & Co. KG
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Parallel ephemeral processing
+"""
+
+__license__ = "GPLv3"
+__author__ = "Anika Weinmann"
+__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG"
+__maintainer__ = "mundialis GmbH & Co. KG"
+
+
+import pickle
+
+from actinia_core.processing.actinia_processing.ephemeral_processing import \
+    EphemeralProcessing
+
+from actinia_parallel_plugin.core.batches import (
+    checkProcessingBlockFinished,
+    getJobsByBatchId,
+    startProcessingBlock,
+)
+from actinia_parallel_plugin.core.jobs import updateJob
+
+
+class ParallelEphemeralProcessing(EphemeralProcessing):
+
+    def __init__(self, rdc, batch_id, batch_processing_block, jobid,
+                 user, request_url, post_url, endpoint, method, path,
+                 base_status_url):
+        super(ParallelEphemeralProcessing, self).__init__(rdc)
+        self.batch_id = batch_id
+        self.batch_processing_block = batch_processing_block
+        self.jobid = jobid
+        self.user = user
+        self.request_url = request_url
+        self.post_url = post_url
+        self.endpoint = endpoint
+        self.method = method
+        self.path = path
+        self.base_status_url = base_status_url
+
+    def run(self):
+        super(ParallelEphemeralProcessing, self).run()
+        self._update_and_check_batch_jobs()
+
+    def _update_and_check_batch_jobs(self):
+        """Checks batch jobs and starts a new batch block if the current
+        block is successfully finished.
+        """
+
+        # update job to finished
+        resource_id = self.resource_id
+        response_data = self.resource_logger.get(
+            self.user_id, self.resource_id)
+        _, response_model = pickle.loads(response_data)
+        updateJob(resource_id, response_model, self.jobid)
+
+        if "finished" == response_model["status"]:
+            jobs_from_batch = getJobsByBatchId(self.batch_id)
+            all_blocks = [
+                job["batch_processing_block"] for job in jobs_from_batch]
+            block = int(self.batch_processing_block)
+            block_done = checkProcessingBlockFinished(
+                jobs_from_batch, block)
+            if block_done is True and block < max(all_blocks):
+                next_block = block + 1
+                startProcessingBlock(
+                    jobs_from_batch,
+                    next_block,
+                    self.batch_id,
+                    self.location_name,
+                    None,  # mapset_name
+                    self.user,
+                    self.request_url,
+                    self.post_url,
+                    self.endpoint,
+                    self.method,
+                    self.path,
+                    self.base_status_url,
+                    "ephemeral"
+                )
+
+        elif (response_model["status"] == "error" or
+                response_model["status"] == "terminated"):
+            # In this case, nothing happens and the next block is not
+            # started.
+ pass + + +def start_job(*args): + processing = ParallelEphemeralProcessing(*args) + processing.run() diff --git a/src/actinia_parallel_plugin/core/example.py b/src/actinia_parallel_plugin/core/example.py deleted file mode 100644 index 55d82e7..0000000 --- a/src/actinia_parallel_plugin/core/example.py +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -Copyright (c) 2018-present mundialis GmbH & Co. KG - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see . - -Example core functionality -""" - -__license__ = "GPLv3" -__author__ = "Anika Weinmann" -__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG" -__maintainer__ = "mundialis GmbH % Co. KG" - - -def transform_input(inp): - """Example core function""" - out = f"Hello world {inp.upper()}!" - return out diff --git a/src/actinia_parallel_plugin/core/jobs.py b/src/actinia_parallel_plugin/core/jobs.py new file mode 100644 index 0000000..924550f --- /dev/null +++ b/src/actinia_parallel_plugin/core/jobs.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Copyright (c) 2018-2022 mundialis GmbH & Co. KG + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +Functions to communicate with the job db +""" + +__license__ = "GPLv3" +__author__ = "Carmen Tawalika, Anika Weinmann" +__copyright__ = "Copyright 2018-2022 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH % Co. 
KG" + +from actinia_parallel_plugin.core.jobtable import ( + getJobById, + insertNewJob, + updateJobByID, +) + + +def insertJob(jsonDict): + """ function to prepare and call InsertNewJob""" + + job = insertNewJob(jsonDict) + return job + + +def getJob(jobid): + """ Method to read job from Jobtable by id + + This method can be called by HTTP GET + @app.route('/processing_parallel/jobs/') + """ + + job, err = getJobById(jobid) + + return job, err + + +def shortenActiniaCoreResp(fullResp): + # replace webhook authentication with '***' + if 'process_chain_list' in fullResp: + if len(fullResp['process_chain_list']) > 0: + if 'webhooks' in fullResp['process_chain_list'][0]: + if 'auth' in fullResp['process_chain_list'][0]['webhooks']: + fullResp['process_chain_list'][0]['webhooks']['auth'] = \ + '***:***' + return fullResp + + +def updateJob(resource_id, actinia_resp, jobid): + """ Method to update job in Jobtable + + This method is called by webhook endpoint + """ + + status = actinia_resp["status"] + + # follow-up actinia update, therefore without resourceId + record = updateJobByID( + jobid, + status, + shortenActiniaCoreResp(actinia_resp), + resourceId=resource_id + ) + + return record diff --git a/src/actinia_parallel_plugin/core/jobtable.py b/src/actinia_parallel_plugin/core/jobtable.py new file mode 100644 index 0000000..941918a --- /dev/null +++ b/src/actinia_parallel_plugin/core/jobtable.py @@ -0,0 +1,372 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Copyright (c) 2018-2022 mundialis GmbH & Co. KG + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +Module to communicate with jobtable +""" + +__license__ = "GPLv3" +__author__ = "Carmen Tawalika, Anika Weinmann" +__copyright__ = "Copyright 2018-2022 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH % Co. KG" + + +from datetime import datetime + +from playhouse.shortcuts import model_to_dict +from peewee import Expression, AutoField, OperationalError +from time import sleep +from uuid import uuid4 +from yoyo import read_migrations +from yoyo import get_backend + +from actinia_parallel_plugin.model.jobtable import Job, jobdb +from actinia_parallel_plugin.resources.config import JOBTABLE +from actinia_parallel_plugin.resources.logging import log + + +# We used `jobdb.connect(reuse_if_open=True)` at the beginning +# of every method. Now we use `with jobdb:` as described in the +# peewee docs but we still try to jobdb.close() at the end of +# each method. 
+ +def initJobDB(): + """Create jobtable on startup.""" + Job.create_table(safe=True) + log.debug('Created jobtable if not exists') + + +def applyMigrations(): + backend = get_backend( + 'postgres://%s:%s@%s/%s?schema=%s' % + (JOBTABLE.user, JOBTABLE.pw, JOBTABLE.host, JOBTABLE.database, + JOBTABLE.schema)) + migrations = read_migrations( + 'actinia_parallel_plugin/resources/migrations') + + with backend.lock(): + backend.apply_migrations(backend.to_apply(migrations)) + log.debug('Applied migrations.') + + +def getAllIds(batch=False): + """ Method to read all jobs from jobtable + + Args: + batch (bool): indicate whether only batch jobs should be read + + Returns: + jobIds (list): the record matching the id + """ + if batch is True: + field = JOBTABLE.batch_id_field + else: + field = JOBTABLE.id_field + with jobdb: + queryResult = Job.select(getattr(Job, field)).dicts() + + jobIds = [] + + # iterating reopens db connection!! + for i in queryResult: + jobIds.append(i[field]) + + jobdb.close() + + return jobIds + + +def getAllJobs(filters): + """ Method to read all jobs from jobtable with filter + + Args: filters (ImmutableMultiDict): the args from the HTTP call + + Returns: + jobs (list): the records matching the filter + """ + log.debug('Received query for jobs') + + query = None + + if filters: + log.debug("Found filters: " + str(filters)) + keys = [key for key in filters] + + for key in keys: + + try: + getattr(Job, key) + except Exception as e: + log.warning(str(e)) + continue + + log.debug("Filter " + str(key) + + " with value " + str(filters[key])) + + if isinstance(getattr(Job, key), AutoField): + try: + int(filters[key]) + except Exception as e: + log.error(str(e)) + jobdb.close() + return + + try: + # even though operators are listed as == and & in peewee docs, + # for Expression creation use '=' and 'AND'. + exp = Expression(getattr(Job, key), '=', filters[key]) + if query is not None: + query = Expression(query, 'AND', exp) + else: + query = exp + except AttributeError as e: + log.error(str(e)) + + with jobdb: + queryResult = Job.select().where(query).dicts() + + jobs = [] + # iterating reopens db connection!! + for i in queryResult: + jobs.append(i) + + log.info("Found " + str(len(jobs)) + " results for query.") + + jobdb.close() + + return jobs + + +def getJobById(jobid): + """ Method to read job from jobtable by id + + Args: + jobid (int): id of job + + Returns: + record (dict): the record matching the id + """ + try: + with jobdb: + queryResult = Job.select().where( + getattr(Job, JOBTABLE.id_field) == jobid).get() + record = model_to_dict(queryResult) + err = None + except Job.DoesNotExist: + record = None + err = { + "status": 503, + "msg": "Either jobid does not exist or there was a " + "connection problem to the database. Please " + "try again later." + } + except OperationalError: + record = None + err = { + "status": 412, + "msg": "Database connection terminated abnormally before or " + "while processing the request. Please " + "try again later." + } + except Exception: + record = None + err = { + "status": 503, + "msg": "Either jobid does not exist or there was a " + "connection problem to the database. Please " + "try again later." 
+ } + + jobdb.close() + + return record, err + + +def getJobByResource(key, val): + """ Method to read job from jobtable by resource + + Args: + key (string): key of attribute + val (string): value of attribute + + Returns: + record (dict): the record matching the id + """ + try: + with jobdb: + queryResult = Job.select().where( + getattr(Job, key) == val).get() + record = model_to_dict(queryResult) + + except Job.DoesNotExist: + record = None + + jobdb.close() + + return record + + +def insertNewJob( + rule_configuration, + ): + """Insert new job into jobtable. + + Args: + rule_configuration (dict): original regeldatei + + Returns: + record (dict): the new record + + """ + utcnow = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ') + + creation_uuid = uuid4() + + job_kwargs = { + 'rule_configuration': rule_configuration, + 'status': 'PREPARING', + 'time_created': utcnow, + 'creation_uuid': creation_uuid + } + if "batch_id" in rule_configuration.keys(): + # then it's a batch job + job_kwargs["batch_processing_block"] = rule_configuration[ + "batch_processing_block"] + job_kwargs["batch_id"] = rule_configuration["batch_id"] + if "urls" in rule_configuration.keys(): + job_kwargs["urls"] = rule_configuration["urls"] + job = Job(**job_kwargs) + + with jobdb: + job.save() + # try to avoid "peewee.InterfaceError: connection already closed" + # so make each connection duration as short as possible + with jobdb: + queryResult = Job.select().where((Job.time_created == utcnow) & ( + Job.creation_uuid == creation_uuid)).get() + + record = model_to_dict(queryResult) + + log.info("Created new job with id " + str(record['id']) + ".") + + jobdb.close() + + return record + + +def updateJobByID(jobid, status, resp, resourceId=None): + """ Method to update job in jobtable when processing status changed + + Args: + jobid (int): the id of the job + status (string): actinia-core processing status + resp (dict): actinia-core response + resourceId (str): actinia-core resourceId + + Returns: + updatedRecord (TODO): the updated record + """ + + if status == 'accepted': + status = 'PENDING' + elif status == 'running': + status = 'RUNNING' + elif status == 'finished': + status = 'SUCCESS' + elif status == 'error': + status = 'ERROR' + elif status == 'terminated': + status = 'TERMINATED' + + utcnow = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') + record = None + while record is None: + record, err = getJobById(jobid) + sleep(1) + dbStatus = record['status'] + + try: + if status == 'PENDING': + if dbStatus == status: + return record + log.debug("Update status to " + status + " for job with id " + + str(record['id']) + ".") + updatekwargs = { + 'status': status, + 'resource_response': resp, + 'resource_id': resourceId + } + + query = Job.update(**updatekwargs).where( + getattr(Job, JOBTABLE.id_field) == jobid + ) + + elif status == 'RUNNING': + updatekwargs = dict() + + if dbStatus == status: + updatekwargs['resource_response'] = resp + + else: + log.debug("Update status to " + status + " for job with id " + + str(record['id']) + ".") + updatekwargs['status'] = status + updatekwargs['resource_response'] = resp + updatekwargs['time_started'] = utcnow + if resourceId is not None: + updatekwargs['resource_id'] = resourceId + # TODO: check if time_estimated can be set + # time_estimated= + + query = Job.update(**updatekwargs).where( + getattr(Job, JOBTABLE.id_field) == jobid + ) + + elif status in ['SUCCESS', 'ERROR', 'TERMINATED']: + log.debug("Update status to " + status + " for job with id " + + str(record['id']) + 
".") + updatekwargs = { + 'status': status, + 'resource_response': resp, + 'time_ended': utcnow + } + if resourceId is not None: + updatekwargs['resource_id'] = resourceId + + query = Job.update(**updatekwargs).where( + getattr(Job, JOBTABLE.id_field) == jobid + ) + + else: + log.error('Could not set the status to actinia-core status: ' + + status + '(Status not found.)') + return None + + with jobdb: + query.execute() + queryResult = Job.select().where( + getattr(Job, JOBTABLE.id_field) == jobid).get() + + record = model_to_dict(queryResult) + except Exception as e: + log.error('Could not set the status to actinia-core status: ' + status) + log.error(str(e)) + return None + + jobdb.close() + + return record diff --git a/src/actinia_parallel_plugin/core/parallel_processing_job.py b/src/actinia_parallel_plugin/core/parallel_processing_job.py new file mode 100644 index 0000000..f3d34f6 --- /dev/null +++ b/src/actinia_parallel_plugin/core/parallel_processing_job.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Copyright (c) 2022 mundialis GmbH & Co. KG + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +Parallel processing job +""" + +__license__ = "GPLv3" +__author__ = "Anika Weinmann" +__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH % Co. KG" + +import pickle + +from actinia_core.core.common.redis_interface import enqueue_job + +from actinia_parallel_plugin.core.jobtable import getJobById +from actinia_parallel_plugin.core.jobs import updateJob +from actinia_parallel_plugin.resources.logging import log +from actinia_parallel_plugin.core.parallel_resource_base import \ + ParallelResourceBase + + +class AsyncParallelJobResource(ParallelResourceBase): + """Job for parallel processing""" + + def __init__(self, user, request_url, post_url, endpoint, method, path, + process_chain, location_name, mapset_name, + batch_id, job_id, base_status_url): + super(AsyncParallelJobResource, self).__init__( + user=user, + request_url=request_url, + endpoint=endpoint, + method=method, + path=path, + post_url=post_url, + base_status_url=base_status_url + ) + self.location_name = location_name + self.mapset_name = mapset_name + self.batch_id = batch_id + self.job_id = job_id + self.request_data = process_chain + self.post_url = post_url + self.endpoint = endpoint + self.method = method + self.path = path + self.base_status_url = base_status_url + + def start_parallel_job(self, process, block): + """Starting job in running actinia-core instance and update job db.""" + job, err = getJobById(self.job_id) + # TODO prepare_actinia ? + # TODO execute_actinia ? + # TODO goodby_actinia ? + + rdc = self.preprocess( + has_json=False, + location_name=self.location_name, + mapset_name=self.mapset_name + ) + if rdc: + if process == "ephemeral": + from actinia_parallel_plugin.core.ephemeral_processing import \ + start_job + # # for debugging comment enqueue_job(...) 
and use the + # # following commented lines + # for var in [ + # 'GISRC', 'GISBASE', 'LD_LIBRARY_PATH', + # 'GRASS_ADDON_PATH', 'GIS_LOCK']: + # import os + # if var in os.environ: + # del os.environ[var] + # from actinia_parallel_plugin.core.ephemeral_processing \ + # import ParallelEphemeralProcessing + # processing = ParallelEphemeralProcessing( + # rdc, self.batch_id, block, self.job_id, + # self.user, + # self.request_url, + # self.post_url, + # self.endpoint, + # self.method, + # self.path, + # self.base_status_url) + # processing.run() + # elif process == "persistent": + # from actinia_parallel_plugin.core.persistent_processing \ + # import start_job + # # TODO + # # # for debugging + # # from actinia_parallel_plugin.core.persistent_processing \ + # # import ParallelPersistentProcessing + # # processing = ParallelPersistentProcessing( + # # rdc, self.batch_id, block, self.job_id, + # # self.user, + # # self.request_url, + # # self.post_url, + # # self.endpoint, + # # self.method, + # # self.path, + # # self.base_status_url) + # # processing.run() + else: + msg = f"Process '{process}' not yet supported!" + log.error(msg) + _, response_model = pickle.loads(self.response_data) + response_model["status"] = "error" + response_model["message"] = msg + job = updateJob(self.resource_id, response_model, self.job_id) + return job + enqueue_job( + self.job_timeout, + start_job, + rdc, + self.batch_id, + block, + self.job_id, + self.user, + self.request_url, + self.post_url, + self.endpoint, + self.method, + self.path, + self.base_status_url + ) + + # update job in jobtable + self.response_data = self.resource_logger.get( + self.user_id, self.resource_id, self.iteration) + _, response_model = pickle.loads(self.response_data) + job = updateJob(self.resource_id, response_model, self.job_id) + return job + + def get_job_entry(self): + """Return job entry by requesting jobtable from db.""" + return getJobById(self.job_id)[0] diff --git a/src/actinia_parallel_plugin/core/parallel_resource_base.py b/src/actinia_parallel_plugin/core/parallel_resource_base.py new file mode 100644 index 0000000..a5961b4 --- /dev/null +++ b/src/actinia_parallel_plugin/core/parallel_resource_base.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Copyright (c) 2022 mundialis GmbH & Co. KG + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +Parallel processing +""" + +__license__ = "GPLv3" +__author__ = "Anika Weinmann" +__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH % Co. 
KG" + +import time +import os +from datetime import datetime + +from flask_restful_swagger_2 import Resource + +from actinia_core.core.common.config import global_config +from actinia_core.core.messages_logger import MessageLogger +from actinia_core.core.resources_logger import ResourceLogger +from actinia_core.rest.base.resource_base import ResourceBase +from actinia_core.models.response_models import ( + create_response_from_model, + ApiInfoModel, + ProcessingResponseModel, +) +from actinia_core.core.resource_data_container import ResourceDataContainer + + +class ParallelResourceBase(ResourceBase): + """This is the base class for all asynchronous and synchronous processing + resources. + """ + + def __init__(self, user, request_url, endpoint, method, path, + base_status_url, + resource_id=None, iteration=None, post_url=None): + # Configuration + Resource.__init__(self) + + # Store the user id, user group and all credentials of the current user + self.user = user + self.user_id = user.get_id() + self.user_group = user.get_group() + self.user_role = user.get_role() + self.has_superadmin_role = user.has_superadmin_role() + self.user_credentials = user.get_credentials() + + self.orig_time = time.time() + self.orig_datetime = str(datetime.now()) + + kwargs = dict() + kwargs['host'] = global_config.REDIS_SERVER_URL + kwargs['port'] = global_config.REDIS_SERVER_PORT + if (global_config.REDIS_SERVER_PW and + global_config.REDIS_SERVER_PW is not None): + kwargs['password'] = global_config.REDIS_SERVER_PW + self.resource_logger = ResourceLogger(**kwargs) + del kwargs + + self.message_logger = MessageLogger() + + self.grass_data_base = global_config.GRASS_DATABASE + self.grass_user_data_base = global_config.GRASS_USER_DATABASE + self.grass_base_dir = global_config.GRASS_GIS_BASE + self.grass_start_script = global_config.GRASS_GIS_START_SCRIPT + self.grass_addon_path = global_config.GRASS_ADDON_PATH + self.download_cache = os.path.join( + global_config.DOWNLOAD_CACHE, self.user_id) + + # Set the resource id + if resource_id is None: + # Generate the resource id + self.request_id, self.resource_id = self.generate_uuids() + else: + self.resource_id = resource_id + self.request_id = self.generate_request_id_from_resource_id() + + # set iteration and post_url + self.iteration = iteration + self.post_url = post_url + + # The base URL's for resources that will be streamed + self.resource_url_base = None + + # Generate the status URL + self.status_url = f"{base_status_url}{self.resource_id}" + + if (global_config.FORCE_HTTPS_URLS is True + and "http://" in self.status_url): + self.status_url = self.status_url.replace("http://", "https://") + + self.request_url = request_url + self.resource_url = None + self.request_data = None + self.response_data = None + self.job_timeout = 0 + + # Replace this with the correct response model in subclasses + # The class that is used to create the response + self.response_model_class = ProcessingResponseModel + + # Put API information in the response for later accounting + kwargs = { + 'endpoint': endpoint, + 'method': method, + 'path': path, + 'request_url': self.request_url} + if self.post_url is not None: + kwargs['post_url'] = self.post_url + self.api_info = ApiInfoModel(**kwargs) + + def preprocess(self, has_json=True, has_xml=False, + location_name=None, mapset_name=None, map_name=None): + """Preprocessing steps for asynchronous processing + + - Check if the request has a data field + - Check if the module chain description can be loaded + - Initialize the response 
and request ids as well as the + url for status polls + - Send an accept entry to the resource redis database + + Args: + has_json (bool): Set True if the request has JSON data, False + otherwise + has_xml (bool): Set True if the request has XML data, False + otherwise + location_name (str): The name of the location to work in + mapset_name (str): The name of the target mapset in which the + computation should be performed + map_name: The name of the map or other resource (raster, vector, + STRDS, color, ...) + + Returns: + The ResourceDataContainer that contains all required information + for the async process or None if the request was wrong. Then use + the self.response_data variable to send a response. + + """ + + # Compute the job timeout of the worker queue from the user credentials + process_time_limit = self.user_credentials["permissions"][ + "process_time_limit"] + process_num_limit = self.user_credentials["permissions"][ + "process_num_limit"] + self.job_timeout = int(process_time_limit * process_num_limit * 20) + + # Create the resource URL base and use a placeholder for the file name + # The placeholder __None__ must be replaced by the resource URL + # generator + self.resource_url_base = f"{self.status_url}/__None__" + + if (global_config.FORCE_HTTPS_URLS is True + and "http://" in self.resource_url_base): + self.resource_url_base = self.resource_url_base.replace( + "http://", "https://") + + # Create the accepted response that will be always send + self.response_data = create_response_from_model( + self.response_model_class, + status="accepted", + user_id=self.user_id, + resource_id=self.resource_id, + iteration=self.iteration, + process_log=None, + results={}, + message="Resource accepted", + http_code=200, + orig_time=self.orig_time, + orig_datetime=self.orig_datetime, + status_url=self.status_url, + api_info=self.api_info) + + # Send the status to the database + self.resource_logger.commit( + self.user_id, self.resource_id, self.iteration, self.response_data) + + # Return the ResourceDataContainer that includes all + # required data for the asynchronous processing + return ResourceDataContainer( + grass_data_base=self.grass_data_base, + grass_user_data_base=self.grass_user_data_base, + grass_base_dir=self.grass_base_dir, + request_data=self.request_data, + user_id=self.user_id, + user_group=self.user_group, + user_credentials=self.user_credentials, + resource_id=self.resource_id, + iteration=self.iteration, + status_url=self.status_url, + api_info=self.api_info, + resource_url_base=self.resource_url_base, + orig_time=self.orig_time, + orig_datetime=self.orig_datetime, + config=global_config, + location_name=location_name, + mapset_name=mapset_name, + map_name=map_name + ) diff --git a/src/actinia_parallel_plugin/endpoints.py b/src/actinia_parallel_plugin/endpoints.py index ed2c485..5eb45ec 100644 --- a/src/actinia_parallel_plugin/endpoints.py +++ b/src/actinia_parallel_plugin/endpoints.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- """ -Copyright (c) 2018-present mundialis GmbH & Co. KG +Copyright (c) 2022 mundialis GmbH & Co. KG This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -25,8 +25,13 @@ __maintainer__ = "mundialis GmbH % Co. 
KG" -from actinia_parallel_plugin.api.parallel_processing import \ - ParallelProcessingResource +from actinia_parallel_plugin.api.batch import BatchJobsId +from actinia_parallel_plugin.api.job import JobId +# from actinia_parallel_plugin.api.parallel_processing import \ +# AsyncParallelPersistentResource +from actinia_parallel_plugin.api.parallel_ephemeral_processing import \ + AsyncParallelEphermeralResource +from actinia_parallel_plugin.core.jobtable import initJobDB, applyMigrations # endpoints loaded if run as actinia-core plugin @@ -34,4 +39,33 @@ def create_endpoints(flask_api): apidoc = flask_api - apidoc.add_resource(ParallelProcessingResource, "/processing_parallel") + # POST parallel ephemeral processing + apidoc.add_resource( + AsyncParallelEphermeralResource, + "/locations//processing_parallel") + + # # POST parallel persistent processing + # apidoc.add_resource( + # AsyncParallelPersistentResource, + # "/locations//mapsets/" + # "/processing_parallel") + + # GET batch jobs TODO + # "/resources//batches" + + # GET batch jobs by ID + apidoc.add_resource( + BatchJobsId, + "/resources//batches/") + + # GET all jobs of one batch TODO + # "/resources//batches//jobs" + + # GET job by ID + apidoc.add_resource( + JobId, + "/resources//batches//jobs/") + + # initilalize jobtable + initJobDB() + applyMigrations() diff --git a/src/actinia_parallel_plugin/model/batch.py b/src/actinia_parallel_plugin/model/batch.py new file mode 100644 index 0000000..a649bd3 --- /dev/null +++ b/src/actinia_parallel_plugin/model/batch.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Copyright (c) 2021-2022 mundialis GmbH & Co. KG + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +Model class for batch +""" + +__license__ = "GPLv3" +__author__ = "Guido Riembauer, Anika Weinmann" +__copyright__ = "Copyright 2021-2022 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH % Co. 
KG" + +import os +import json +from flask_restful_swagger_2 import Schema + +from actinia_core.models.process_chain import ProcessChainModel +from actinia_core.models.response_models import UrlModel + +script_dir = os.path.dirname(os.path.abspath(__file__)) + +rel_path = "../apidocs/examples/batchjob_post_response_example.json" +abs_file_path = os.path.join(script_dir, rel_path) +print(abs_file_path) +with open(abs_file_path) as jsonfile: + batchjob_post_docs_response_example = json.load(jsonfile) + + +class BatchJobsSummaryModel(Schema): + """Schema for a batchjob response summary""" + type = 'object' + properties = { + 'blocks': { + 'type': 'array', + 'description': 'Status summary for each processing blocks', + 'items': { + 'block_num': { + 'type': 'integer', + 'description': 'Number of processing block' + }, + 'parallel': { + 'type': 'integer', + 'description': ('Number of parallel jobs within ' + 'processing block') + }, + 'accepted': { + 'type': 'integer', + 'description': ('Number of jobs with actinia-core status ' + '"ACCEPTED" within block') + }, + 'error': { + 'type': 'integer', + 'description': ('Number of jobs with actinia-core status ' + '"ERROR" within block') + }, + 'finished': { + 'type': 'integer', + 'description': ('Number of jobs with actinia-core status ' + '"FINISHED" within block') + }, + 'preparing': { + 'type': 'integer', + 'description': ('Number of jobs with status ' + '"PREPARING" within block (jobs that ' + 'exist in the jobtable but have not yet ' + 'been posted to actinia-core)') + }, + 'running': { + 'type': 'integer', + 'description': ('Number of jobs with actinia-core status ' + '"RUNNING" within block') + }, + 'terminated': { + 'type': 'integer', + 'description': ('Number of jobs with actinia-core status ' + '"TERMINATED" within block') + }, + } + }, + 'status': { + 'type': 'array', + 'description': 'Status summary for all jobs', + 'items': { + 'accepted': { + 'type': 'integer', + 'description': ('Overall number of jobs with ' + 'actinia-core status "ACCEPTED"') + }, + 'error': { + 'type': 'integer', + 'description': ('Overall number of jobs with ' + 'actinia-core status "ERROR"') + }, + 'finished': { + 'type': 'integer', + 'description': ('Overall number of jobs with ' + 'actinia-core status "FINISHED"') + }, + 'preparing': { + 'type': 'integer', + 'description': ('Overall number of jobs with ' + 'status "PREPARING" (jobs that ' + 'exist in the jobtable but have not yet ' + 'been posted to actinia-core)') + }, + 'running': { + 'type': 'integer', + 'description': ('Overall number of jobs with ' + 'actinia-core status "RUNNING"') + }, + 'terminated': { + 'type': 'integer', + 'description': ('Overall number of jobs with ' + 'actinia-core status "TERMINATED"') + }, + } + }, + 'total': { + 'type': 'integer', + 'description': 'Overall number of jobs within batchjob' + } + } + + +class BatchProcessChainModel(Schema): + """Definition of the actinia-gdi batch process chain that includes several + actinia process chains that can be run in parallel or sequentially + """ + type = 'object' + properties = { + 'jobs': { + 'type': 'array', + 'items': ProcessChainModel, + 'description': "A list of process chains (jobs) that should " + "be executed in parallel or sequentially " + "in the order provided by the list." 
+ } + } + required = ["jobs"] + + +class BatchJobResponseModel(Schema): + """Response schema for creating and requesting the status of a Batchjob + """ + type = 'object' + properties = { + 'resource_id': { + 'type': 'array', + 'description': ('The resource IDs for all individual ' + 'jobs'), + 'items': {'type': 'string'} + }, + 'resource_response': { + 'type': 'array', + 'description': 'The responses of actinia-core for individual jobs', + 'items': {'type': 'object'} + }, + 'batch_id': { + 'type': 'integer', + 'description': 'The batch ID' + }, + 'creation_uuid': { + 'type': 'array', + 'description': ('Unique ids for the individual jobs at creation ' + 'time before id is known. ' + '(More unique than creation timestamp)'), + 'items': {'type': 'string'} + }, + 'id': { + 'type': 'array', + 'description': 'The individual job IDs', + 'items': {'type': 'integer'} + }, + 'status': { + 'type': 'string', + 'description': 'The overall status of the batchjob', + 'enum': [ + "PREPARING", + "PENDING", + "RUNNING", + "SUCCESS", + "ERROR", + "TERMINATING", + "TERMINATED"] + }, + 'jobs_status': { + 'type': 'array', + 'description': ('Status of the individual Jobs by actinia-core ' + 'resource ID and job ID'), + 'items': { + 'actinia_core_job_id': { + 'type': 'string', + 'description': 'The actinia-core resource ID for the job' + }, + 'id': { + 'type': 'integer', + 'description': 'The job ID' + }, + 'status': { + 'type': 'string', + 'description': 'Status of the Job', + 'enum': [ + "PREPARING", + "PENDING", + "RUNNING", + "SUCCESS", + "ERROR", + "TERMINATED" + ] + } + } + }, + 'summary': BatchJobsSummaryModel, + 'urls': UrlModel + } + example = batchjob_post_docs_response_example diff --git a/src/actinia_parallel_plugin/model/batch_process_chain.py b/src/actinia_parallel_plugin/model/batch_process_chain.py new file mode 100644 index 0000000..7932f52 --- /dev/null +++ b/src/actinia_parallel_plugin/model/batch_process_chain.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Copyright (c) 2021-2022 mundialis GmbH & Co. KG + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +Model classes for Batch Process Chain +""" + +__license__ = "GPLv3" +__author__ = "Julia Haas, Guido Riembauer" +__copyright__ = "Copyright 2021-2022 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH % Co. 
KG" + + +from jsonmodels import models, fields + + +class ModuleStdOut(models.Base): + """Model for object in BatchProcessChain + + Model for optional stdout in module + """ + id = fields.StringField(required=True) + format = fields.StringField(required=True) + delimiter = fields.StringField(required=True) + + +class ModuleExport(models.Base): + """Model for object in BatchProcessChain + + Model for optional export in output + """ + format = fields.StringField() + type = fields.StringField() + + +class ModuleOutput(models.Base): + """Model for object in BatchProcessChain + + Model for each output in module outputs array + """ + param = fields.StringField() + value = fields.StringField() + export = fields.EmbeddedField(ModuleExport) + + +class ModuleInput(models.Base): + """Model for object in BatchProcessChain + + Model for each input in module inputs array + """ + param = fields.StringField(required=True) + value = fields.StringField(required=True) + + +class Module(models.Base): + """Model for object in BatchProcessChain + + Model for each module in module list array + """ + module = fields.StringField(required=True) # string + id = fields.StringField(required=True) # string + inputs = fields.ListField([ModuleInput]) # array of objects + flags = fields.StringField() # string + stdout = fields.EmbeddedField(ModuleStdOut) # string + outputs = fields.ListField([ModuleOutput]) + + +class Job(models.Base): + """Model for object in BatchProcessChain + + Model for each job in jobs array + """ + version = fields.StringField() # string + parallel = fields.StringField(required=True) # bool + list = fields.ListField([Module], required=True) # array of objects + # the block and batch id is not in the json but is filled later + batch_processing_block = fields.IntField() + batch_id = fields.IntField() + + +class BatchProcessChain(models.Base): + """Model for BatchProcessChain + Including all information for all jobs + This is used by the parallel processing endpoints + """ + + # processing_platform = fields.StringField() # string + # processing_platform_name = fields.StringField() # string + # processing_host = fields.StringField() # string + jobs = fields.ListField([Job], required=True) # array of objects diff --git a/src/actinia_parallel_plugin/model/jobtable.py b/src/actinia_parallel_plugin/model/jobtable.py new file mode 100644 index 0000000..b167026 --- /dev/null +++ b/src/actinia_parallel_plugin/model/jobtable.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Copyright (c) 2018-2022 mundialis GmbH & Co. KG + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +Job model +""" + +__license__ = "GPLv3" +__author__ = "Carmen Tawalika, Anika Weinmann" +__copyright__ = "Copyright 2018-2022 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH % Co. 
KG" + +from peewee import Model, CharField, DateTimeField, AutoField, IntegerField +from playhouse.postgres_ext import BinaryJSONField +from playhouse.pool import PooledPostgresqlExtDatabase + +from actinia_parallel_plugin.resources.config import JOBTABLE +from actinia_parallel_plugin.resources.logging import log + + +log.debug("Database config loaded: " + JOBTABLE.host + ":" + JOBTABLE.port + + "/" + JOBTABLE.database + "/" + + JOBTABLE.schema + "." + JOBTABLE.table) + + +"""database connection""" + +jobdb = PooledPostgresqlExtDatabase( + JOBTABLE.database, **{ + 'host': JOBTABLE.host, + 'port': JOBTABLE.port, + 'user': JOBTABLE.user, + 'password': JOBTABLE.pw, + 'max_connections': 8, + 'stale_timeout': 300 + } +) + + +class BaseModel(Model): + """Base Model for tables in jobdb + """ + class Meta: + database = jobdb + + +class Job(BaseModel): + """Model for jobtable in database + """ + # behalten + time_created = DateTimeField(null=True) + time_started = DateTimeField(null=True) + time_estimated = DateTimeField(null=True) + time_ended = DateTimeField(null=True) + status = CharField(null=True) + creation_uuid = CharField(null=True) + resource_response = BinaryJSONField(null=True) + id = AutoField() + resource_id = CharField(null=True) + rule_configuration = BinaryJSONField(null=True) + urls = BinaryJSONField(null=True) + + # add a potential parent_job + batch_id = IntegerField(null=True) + batch_processing_block = IntegerField(null=True) + + class Meta: + table_name = JOBTABLE.table + schema = JOBTABLE.schema diff --git a/src/actinia_parallel_plugin/model/response_models.py b/src/actinia_parallel_plugin/model/response_models.py deleted file mode 100644 index 92c9621..0000000 --- a/src/actinia_parallel_plugin/model/response_models.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -Copyright (c) 2018-present mundialis GmbH & Co. KG - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see . - -Response models -""" - -__license__ = "GPLv3" -__author__ = "Anika Weinmann" -__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG" -__maintainer__ = "mundialis GmbH % Co. 
KG" - - -from flask_restful_swagger_2 import Schema - - -class SimpleStatusCodeResponseModel(Schema): - """Simple response schema to inform about status.""" - - type = "object" - properties = { - "status": { - "type": "number", - "description": "The status code of the request.", - }, - "message": { - "type": "string", - "description": "A short message to describes the status", - }, - } - required = ["status", "message"] - - -simpleResponseExample = SimpleStatusCodeResponseModel( - status=200, message="success" -) -SimpleStatusCodeResponseModel.example = simpleResponseExample diff --git a/src/actinia_parallel_plugin/resources/config.py b/src/actinia_parallel_plugin/resources/config.py new file mode 100644 index 0000000..dea1dfa --- /dev/null +++ b/src/actinia_parallel_plugin/resources/config.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Copyright (c) 2018-2022 mundialis GmbH & Co. KG + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +Configuration file +""" + +__license__ = "GPLv3" +__author__ = "Carmen Tawalika, Anika Weinmann" +__copyright__ = "Copyright 2018-2022 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH % Co. KG" + +import configparser +import os +# from pathlib import Path + + +# # config can be overwritten by mounting *.ini files into folders inside +# # the config folder. +# DEFAULT_CONFIG_PATH = "config" +# CONFIG_FILES = [str(f) for f in Path( +# DEFAULT_CONFIG_PATH).glob('**/*.ini') if f.is_file()] +# GENERATED_CONFIG = DEFAULT_CONFIG_PATH + '/actinia-parallel-plugin.cfg' + +DEFAULT_CONFIG_PATH = os.getenv('DEFAULT_CONFIG_PATH', "/etc/default/actinia") +GENERATED_CONFIG = os.path.join( + os.path.dirname(DEFAULT_CONFIG_PATH), 'actinia-parallel-plugin.cfg') +if not os.path.isfile(DEFAULT_CONFIG_PATH): + open(DEFAULT_CONFIG_PATH, 'a').close() +CONFIG_FILES = [DEFAULT_CONFIG_PATH] + + +class JOBTABLE: + """Default config for database connection for jobtable + """ + host = 'localhost' + port = '5432' + database = 'gis' + user = 'gis' + pw = 'gis' + schema = 'actinia' + table = 'tab_jobs' + id_field = 'id' + batch_id_field = "batch_id" + resource_id_field = "resource_id" + + +class LOGCONFIG: + """Default config for logging + """ + logfile = 'actinia-gdi.log' + level = 'DEBUG' + type = 'stdout' + + +class Configfile: + + def __init__(self): + """ + This class will overwrite the config classes above when config files + named DEFAULT_CONFIG_PATH/**/*.ini exist. + On first import of the module it is initialized. 
+ """ + + config = configparser.ConfigParser() + config.read(CONFIG_FILES) + + if len(config) <= 1: + # print("Could not find any config file, using default values.") + return + + with open(GENERATED_CONFIG, 'w') as configfile: + config.write(configfile) + + # JOBTABLE + if config.has_section("JOBTABLE"): + if config.has_option("JOBTABLE", "host"): + JOBTABLE.host = config.get("JOBTABLE", "host") + if config.has_option("JOBTABLE", "port"): + JOBTABLE.port = config.get("JOBTABLE", "port") + if config.has_option("JOBTABLE", "database"): + JOBTABLE.database = config.get("JOBTABLE", "database") + if config.has_option("JOBTABLE", "user"): + JOBTABLE.user = config.get("JOBTABLE", "user") + if config.has_option("JOBTABLE", "pw"): + JOBTABLE.pw = config.get("JOBTABLE", "pw") + if config.has_option("JOBTABLE", "schema"): + JOBTABLE.schema = config.get("JOBTABLE", "schema") + if config.has_option("JOBTABLE", "table"): + JOBTABLE.table = config.get("JOBTABLE", "table") + if config.has_option("JOBTABLE", "id_field"): + JOBTABLE.id_field = config.get("JOBTABLE", "id_field") + + # overwrite values if ENV values exist: + if os.environ.get('JOBTABLE_USER'): + JOBTABLE.user = os.environ['JOBTABLE_USER'] + if os.environ.get('JOBTABLE_PW'): + JOBTABLE.pw = os.environ['JOBTABLE_PW'] + + # LOGGING + if config.has_section("LOGCONFIG"): + if config.has_option("LOGCONFIG", "logfile"): + LOGCONFIG.logfile = config.get("LOGCONFIG", "logfile") + if config.has_option("LOGCONFIG", "level"): + LOGCONFIG.level = config.get("LOGCONFIG", "level") + if config.has_option("LOGCONFIG", "type"): + LOGCONFIG.type = config.get("LOGCONFIG", "type") + + +init = Configfile() diff --git a/src/actinia_parallel_plugin/resources/logging.py b/src/actinia_parallel_plugin/resources/logging.py new file mode 100644 index 0000000..45e0a4f --- /dev/null +++ b/src/actinia_parallel_plugin/resources/logging.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Copyright (c) 2018-2022 mundialis GmbH & Co. KG + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +The logger +""" + +__license__ = "GPLv3" +__author__ = "Carmen Tawalika, Anika Weinmann" +__copyright__ = "Copyright 2018-2022 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH % Co. 
KG" + +import logging +from datetime import datetime +from logging import FileHandler + +from colorlog import ColoredFormatter +from pythonjsonlogger import jsonlogger + +from actinia_parallel_plugin.resources.config import LOGCONFIG + + +# Notice: do not call logging.warning (will create new logger for ever) +# logging.warning("called actinia_gdi logger after 1") + +log = logging.getLogger('actinia-parallel-plugin') +werkzeugLog = logging.getLogger('werkzeug') +gunicornLog = logging.getLogger('gunicorn') + + +def setLogFormat(veto=None): + logformat = "" + if LOGCONFIG.type == 'json' and not veto: + logformat = CustomJsonFormatter( + '%(time) %(level) %(component) %(module)' + '%(message) %(pathname) %(lineno)' + '%(processName) %(threadName)' + ) + else: + logformat = ColoredFormatter( + '%(log_color)s[%(asctime)s] %(levelname)-10s: %(name)s.%(module)-' + '10s -%(message)s [in %(pathname)s:%(lineno)d]%(reset)s' + ) + return logformat + + +def setLogHandler(logger, type, format): + if type == 'stdout': + handler = logging.StreamHandler() + elif type == 'file': + # For readability, json is never written to file + handler = FileHandler(LOGCONFIG.logfile) + + handler.setFormatter(format) + logger.addHandler(handler) + + +class CustomJsonFormatter(jsonlogger.JsonFormatter): + def add_fields(self, log_record, record, message_dict): + super(CustomJsonFormatter, self).add_fields( + log_record, record, message_dict) + + # (Pdb) dir(record) + # ... 'args', 'created', 'exc_info', 'exc_text', 'filename', 'funcName' + # ,'getMessage', 'levelname', 'levelno', 'lineno', 'message', 'module', + # 'msecs', 'msg', 'name', 'pathname', 'process', 'processName', + # 'relativeCreated', 'stack_info', 'thread', 'threadName'] + + now = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ') + log_record['time'] = now + log_record['level'] = record.levelname + log_record['component'] = record.name + + +def createLogger(): + # create logger, set level and define format + log.setLevel(getattr(logging, LOGCONFIG.level)) + fileformat = setLogFormat('veto') + stdoutformat = setLogFormat() + setLogHandler(log, 'file', fileformat) + setLogHandler(log, 'stdout', stdoutformat) + + +def createWerkzeugLogger(): + werkzeugLog.setLevel(getattr(logging, LOGCONFIG.level)) + fileformat = setLogFormat('veto') + stdoutformat = setLogFormat() + setLogHandler(werkzeugLog, 'file', fileformat) + setLogHandler(werkzeugLog, 'stdout', stdoutformat) + + +def createGunicornLogger(): + gunicornLog.setLevel(getattr(logging, LOGCONFIG.level)) + fileformat = setLogFormat('veto') + stdoutformat = setLogFormat() + setLogHandler(gunicornLog, 'file', fileformat) + setLogHandler(gunicornLog, 'stdout', stdoutformat) + # gunicorn already has a lot of children logger, e.g gunicorn.http, + # gunicorn.access. These lines deactivate their default handlers. + for name in logging.root.manager.loggerDict: + if "gunicorn." in name: + logging.getLogger(name).propagate = True + logging.getLogger(name).handlers = [] + + +createLogger() +createWerkzeugLogger() +createGunicornLogger() diff --git a/src/actinia_parallel_plugin/resources/migrations/0000.info.py b/src/actinia_parallel_plugin/resources/migrations/0000.info.py new file mode 100644 index 0000000..a5395ca --- /dev/null +++ b/src/actinia_parallel_plugin/resources/migrations/0000.info.py @@ -0,0 +1,42 @@ +''' +The initial table is created with peewee (initInstanceDB). +All further adjustments should be made with yoyo-migrations. +For a new adjustment, a new file is needed, stored next to this one. 
+
+Yoyo stores already applied migrations in the db:
+
+gis=# select * from _yoyo_migration ;
+                          migration_hash                          |     migration_id     |       applied_at_utc
+------------------------------------------------------------------+----------------------+----------------------------
+ b9faf3aa8fe158938471e8275bf6f7dc6d49bd4c5e7a89953de4b790b711eba8 | 0001.add-status_info | 2020-06-10 15:13:45.250212
+
+gis=# select * from _yoyo_log ;
+                  id                  |                          migration_hash                          |     migration_id     | operation | username | hostname | comment |       created_at_utc
+--------------------------------------+------------------------------------------------------------------+----------------------+-----------+----------+----------+---------+---------------------------
+ fa2e5dd8-ab2c-11ea-9e24-6057186705c0 | b9faf3aa8fe158938471e8275bf6f7dc6d49bd4c5e7a89953de4b790b711eba8 | 0001.add-status_info | apply     | default  | carmen   |         | 2020-06-10 15:13:45.24777
+(1 row)
+
+Rollbacks are also possible but currently not integrated in our code. We
+should use them if we have more complex migration scripts that must not be
+applied only in part.
+Pure SQL scripts are also possible but cannot replace variables as needed
+here.
+
+For more information, see https://ollycope.com/software/yoyo/latest/
+
+'''
+
+from yoyo import step
+from actinia_parallel_plugin.resources.config import JOBTABLE
+
+# dummy migration to test functionality
+steps = [
+    step(
+        "select * from %s limit 1" % JOBTABLE.table
+    )
+]
diff --git a/src/actinia_parallel_plugin/wsgi.py b/src/actinia_parallel_plugin/wsgi.py
deleted file mode 100644
index 1f60492..0000000
--- a/src/actinia_parallel_plugin/wsgi.py
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-Copyright (c) 2018-present mundialis GmbH & Co. KG
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see .
-"""
-
-__license__ = "GPLv3"
-__author__ = "Carmen Tawalika, Anika Weinmann"
-__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG"
-__maintainer__ = "mundialis GmbH % Co. 
KG" - -from actinia_parallel_plugin.main import app as application diff --git a/test_postbodies/parallel_ephemeral_processing.json b/test_postbodies/parallel_ephemeral_processing.json new file mode 100644 index 0000000..f07d2c8 --- /dev/null +++ b/test_postbodies/parallel_ephemeral_processing.json @@ -0,0 +1,90 @@ +{ + "jobs": [ + { + "list": [ + { + "module": "g.region", + "id": "g_region_nonparallel_block1", + "inputs":[ + {"param": "raster", "value": "elevation@PERMANENT"} + ] + }, + { + "module": "r.mapcalc", + "id": "r_mapcalc_0_nonparallel_block1", + "inputs":[ + {"param": "expression", "value": "baum = elevation@PERMANENT * 2"} + ] + } + ], + "parallel": "false", + "version": "1" + }, + { + "list": [ + { + "module": "g.region", + "id": "g_region_1_parallel_block2", + "inputs":[ + {"param": "raster", "value": "elevation@PERMANENT"} + ], + "flags": "p" + }, + { + "module": "r.info", + "id": "r_info_1_parallel_block2", + "inputs":[ + {"param": "map", "value": "elevation@PERMANENT"} + ] + } + ], + "parallel": "true", + "version": "1" + }, + { + "list": [ + { + "module": "g.region", + "id": "g_region_2_parallel_block2", + "inputs":[ + { + "import_descr": + { + "source": "https://apps.mundialis.de/actinia_test_datasets/elev_ned_30m.tif", + "type": "raster" + }, + "param": "raster", + "value": "elev_ned_30m" + } + ], + "flags": "p" + }, + { + "module": "r.univar", + "id": "r_univar_2_parallel_block2", + "inputs":[ + {"param": "map", "value": "elev_ned_30m"} + ], + "stdout": {"id": "stats", "format": "kv", "delimiter": "="}, + "flags": "g" + } + ], + "parallel": "true", + "version": "1" + }, + { + "list": [ + { + "module": "g.region", + "id": "g_region_nonparallel_block3", + "inputs":[ + {"param": "raster", "value": "elevation@PERMANENT"} + ], + "flags": "p" + } + ], + "parallel": "false", + "version": "1" + } + ] +} diff --git a/test_postbodies/parallel_processing.json b/test_postbodies/parallel_processing.json new file mode 100644 index 0000000..92383e4 --- /dev/null +++ b/test_postbodies/parallel_processing.json @@ -0,0 +1,93 @@ +{ + "jobs": [ + { + "list": [ + { + "module": "g.region", + "id": "g_region_nonparallel_block1", + "inputs":[ + {"param": "raster", "value": "elevation@PERMANENT"} + ] + }, + { + "module": "r.mapcalc", + "id": "r_mapcalc_0_nonparallel_block1", + "inputs":[ + {"param": "expression", "value": "baum = elevation@PERMANENT * 2"} + ] + } + ], + "parallel": "false", + "version": "1" + }, + { + "list": [ + { + "module": "g.region", + "id": "g_region_1_parallel_block2", + "inputs":[ + {"param": "n", "value": "228500"}, + {"param": "s", "value": "215000"}, + {"param": "e", "value": "645000"}, + {"param": "w", "value": "637500"} + ] + }, + { + "module": "r.mapcalc", + "id": "r_mapcalc_1_parallel_block2", + "inputs":[ + {"param": "expression", "value": "baum2 = baum@test * 2"} + ] + } + ], + "parallel": "true", + "version": "1" + }, + { + "list": [ + { + "module": "g.region", + "id": "g_region_2_parallel_block2", + "inputs":[ + {"param": "n", "value": "228500"}, + {"param": "s", "value": "215000"}, + {"param": "e", "value": "637500"}, + {"param": "w", "value": "630000"} + ] + }, + { + "module": "r.mapcalc", + "id": "r_mapcalc_2_parallel_block2", + "inputs":[ + {"param": "expression", "value": "baum2 = baum@test * 2"} + ] + } + ], + "parallel": "true", + "version": "1" + }, + { + "list": [ + { + "module": "g.region", + "id": "g_region_nonparallel_block3", + "inputs":[ + {"param": "raster", "value": "elevation@PERMANENT"} + ] + }, + { + "module": "r.patch", + "id": 
"r_patch_block3", + "inputs":[ + {"param": "input", "value": "baum2@test_parallel_1,baum2@test_parallel_2"} + ], + "outputs":[ + {"param": "input", "value": "baum2"} + ] + } + ], + "parallel": "false", + "version": "1" + } + ] +} diff --git a/test_postbodies/parallel_processing_soll.json b/test_postbodies/parallel_processing_soll.json new file mode 100644 index 0000000..9722de4 --- /dev/null +++ b/test_postbodies/parallel_processing_soll.json @@ -0,0 +1,138 @@ +{ + "processing_host": "http://actinia-core-docker:8088/", + "processing_platform_name": "example_name", + "jobs": [ + { + "list": [ + { + "module": "stac_importer", + "inputs":[ + {"param": "param1", "value": "value1"} + ] + } + ], + "parallel": "false", + "version": "1" + }, + { + "list": [ + { + "module": "actinia_tiling", + "comment": "All jobs executed in parallel loop for each tile", + "inputs":[ + // With this approach, also area size would be possible + // {"param": "size", "value": "10000"}, + {"param": "num_tiles", "value": "10000"} + ], + "outputs":[ + {"param": "raster", "value": "ndvi,ndwi"} + ], + "jobs": [ + { + "list": [ + { + "module": "g.region", + "inputs":[ + {"param": "x", "value": "{{ tile_id }}"} + ] + }, + { + "module": "r.mask", + "inputs":[ + {"param": "x", "value": "y"} + ] + } + ], + "parallel": "false" + }, + { + "list": [ + { + "module": "r.mapcalc", + "inputs":[ + {"param": "x", "value": "ndvi"} + ] + } + ], + "parallel": "true" + }, + { + "list": [ + { + "module": "r.mapcalc", + "inputs":[ + {"param": "x", "value": "ndwi"} + ] + } + ], + "parallel": "true" + } + ] + } + ], + "parallel": "false", + "version": "1" + }, + { + "list": [ + { + "module": "actinia_tiling", + "comment": "All jobs executed in parallel loop for each tile", + "inputs":[ + // With this approach, also area size would be possible + // {"param": "size", "value": "10000"}, + {"param": "num_tiles", "value": "10000"}, + // TODO: parameter or flag? + {"param": "reuse_tiles", "value": "true"} + ], + "outputs":[ + {"param": "raster", "value": "agg1,agg2"} + ], + "jobs": [ + { + "list": [ + { + "module": "g.region", + "inputs":[ + {"param": "x", "value": "{{ tile_id }}"} + ] + }, + { + "module": "r.mask", + "inputs":[ + {"param": "x", "value": "y"} + ] + } + ], + "parallel": "false" + }, + { + "list": [ + { + "module": "t.aggregate", + "inputs":[ + {"param": "x", "value": "red_nir,green_red"} + ] + } + ], + "parallel": "true" + }, + { + "list": [ + { + "module": "t.aggregate", + "inputs":[ + {"param": "x", "value": "blue,blue_nir"} + ] + } + ], + "parallel": "true" + } + ] + } + ], + "parallel": "false", + "version": "1" + } + ] +} diff --git a/tests/integrationtests/test_00_connections.py b/tests/integrationtests/test_00_connections.py new file mode 100644 index 0000000..7ce4ce0 --- /dev/null +++ b/tests/integrationtests/test_00_connections.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Copyright (c) 2022 mundialis GmbH & Co. KG + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . 
+
+Connection tests for redis and PostGIS
+"""
+
+__license__ = "GPLv3"
+__author__ = "Anika Weinmann"
+__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG"
+__maintainer__ = "mundialis GmbH & Co. KG"
+
+import os
+import pytest
+import redis
+import psycopg2
+
+from actinia_core.core.common.config import global_config as config
+
+from actinia_parallel_plugin.resources.config import JOBTABLE
+
+from ..test_resource_base import ActiniaResourceTestCaseBase
+
+
+class ConnectionTest(ActiniaResourceTestCaseBase):
+
+    @pytest.mark.integrationtest
+    def test_redis_connection(self):
+        """Test redis connection
+        """
+        if "ACTINIA_CUSTOM_TEST_CFG" in os.environ:
+            config.read(os.environ["ACTINIA_CUSTOM_TEST_CFG"])
+        kwargs = dict()
+        kwargs["host"] = config.REDIS_SERVER_URL
+        kwargs["port"] = config.REDIS_SERVER_PORT
+        if config.REDIS_SERVER_PW:
+            kwargs["password"] = config.REDIS_SERVER_PW
+        connection_pool = redis.ConnectionPool(**kwargs)
+        redis_server = redis.StrictRedis(connection_pool=connection_pool)
+        try:
+            redis_server.ping()
+            assert True
+        except redis.exceptions.ResponseError:
+            assert False, f"Could not connect to redis ({kwargs['host']})"
+        except redis.exceptions.AuthenticationError:
+            assert False, "Invalid redis password"
+        except redis.exceptions.ConnectionError as e:
+            assert False, f"Redis connection error: {e}"
+        connection_pool.disconnect()
+
+    @pytest.mark.integrationtest
+    def test_postgis_connection(self):
+        """Test postgis connection
+        """
+        try:
+            conn = psycopg2.connect(**{
+                'host': JOBTABLE.host,
+                'port': JOBTABLE.port,
+                'user': JOBTABLE.user,
+                'password': JOBTABLE.pw
+            })
+            assert True
+            conn.close()
+        except Exception as e:
+            assert False, f"PostGIS connection failed: {e}"
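Note: these two connection checks can also be run on their own to verify the test environment before the longer batch tests. A hedged example, reusing the container name from `docker ps` and the repo's `setup.py test` convention (the standard pytest `-k connection` filter matches both test methods by name):
```
docker exec -it docker_actinia-test_1 sh
# inside the container:
cd /src/actinia-parallel-plugin
python3 setup.py test --addopts "-m integrationtest -k connection"
```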
KG" - - -import json -import pytest -from flask import Response - -from ..test_resource_base import ActiniaResourceTestCaseBase, URL_PREFIX - - -class ActiniaParallelProcessingTest(ActiniaResourceTestCaseBase): - @pytest.mark.integrationtest - def test_get_processing_parallel(self): - """Test the get method of the /processing_parallel endpoint""" - resp = self.server.get(URL_PREFIX + "/processing_parallel") - - assert type(resp) is Response, "The response is not of type Response" - assert resp.status_code == 200, "The status code is not 200" - assert hasattr(resp, "json"), "The response has no attribute 'json'" - assert "message" in resp.json, ( - "There is no 'message' inside the " "response" - ) - assert resp.json["message"] == "Hello world!", ( - "The response message" " is wrong" - ) - - @pytest.mark.integrationtest - def test_post_processing_parallel(self): - """Test the post method of the /processing_parallel endpoint""" - postbody = {"name": "test"} - resp = self.server.post( - URL_PREFIX + "/processing_parallel", - headers=self.user_auth_header, - data=json.dumps(postbody), - content_type="application/json", - ) - assert type(resp) is Response, "The response is not of type Response" - assert resp.status_code == 200, "The status code is not 200" - assert hasattr(resp, "json"), "The response has no attribute 'json'" - assert "message" in resp.json, ( - "There is no 'message' inside the " "response" - ) - assert resp.json["message"] == "Hello world TEST!", ( - "The response " "message is wrong" - ) - - @pytest.mark.integrationtest - def test_post_processing_parallel_error(self): - """Test the post method of the /processing_parallel endpoint""" - postbody = {"namee": "test"} - resp = self.server.post( - URL_PREFIX + "/processing_parallel", - headers=self.user_auth_header, - data=json.dumps(postbody), - content_type="application/json", - ) - assert type(resp) is Response, "The response is not of type Response" - assert resp.status_code == 400, "The status code is not 400" - assert resp.data == b"Missing name in JSON content" diff --git a/tests/integrationtests/test_parallel_ephemeral_processing.py b/tests/integrationtests/test_parallel_ephemeral_processing.py new file mode 100644 index 0000000..4f1e170 --- /dev/null +++ b/tests/integrationtests/test_parallel_ephemeral_processing.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Copyright (c) 2022 mundialis GmbH & Co. KG + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +Parallel ephemeral processing tests +""" + +__license__ = "GPLv3" +__author__ = "Anika Weinmann" +__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH % Co. 
KG" + + +import pytest +from flask.json import loads as json_loads + +from ..test_resource_base import ActiniaResourceTestCaseBase, URL_PREFIX + +PC = """{ + "jobs": [ + { + "list": [ + { + "module": "g.region", + "id": "g_region_nonparallel_block1", + "inputs":[ + {"param": "raster", "value": "elevation@PERMANENT"} + ] + }, + { + "module": "r.mapcalc", + "id": "r_mapcalc_0_nonparallel_block1", + "inputs":[ + {"param": "expression", "value": "baum = elevation@PERMANENT * 2"} + ] + } + ], + "parallel": "false", + "version": "1" + }, + { + "list": [ + { + "module": "g.region", + "id": "g_region_1_parallel_block2", + "inputs":[ + {"param": "raster", "value": "elevation@PERMANENT"} + ], + "flags": "p" + }, + { + "module": "r.info", + "id": "r_info_1_parallel_block2", + "inputs":[ + {"param": "map", "value": "elevation@PERMANENT"} + ] + } + ], + "parallel": "true", + "version": "1" + }, + { + "list": [ + { + "module": "g.region", + "id": "g_region_1_parallel_block2", + "inputs":[ + {"param": "raster", "value": "elevation@PERMANENT"} + ], + "flags": "p" + }, + { + "module": "r.univar", + "id": "r_univar_2_parallel_block2", + "inputs":[ + {"param": "map", "value": "elevation@PERMANENT"} + ], + "stdout": {"id": "stats", "format": "kv", "delimiter": "="}, + "flags": "g" + } + ], + "parallel": "true", + "version": "1" + }, + { + "list": [ + { + "module": "g.region", + "id": "g_region_nonparallel_block3", + "inputs":[ + {"param": "raster", "value": "elevation@PERMANENT"} + ], + "flags": "p" + } + ], + "parallel": "false", + "version": "1" + } + ] +} +""" + + +class ActiniaParallelProcessingTest(ActiniaResourceTestCaseBase): + + location = "nc_spm_08" + base_url = f"{URL_PREFIX}/locations/{location}" + content_type = "application/json" + + @pytest.mark.integrationtest + def test_post_parallel_ephemeral_processing(self): + """Test the post method of the parallel ephemeral processing endpoint + """ + url = f"{self.base_url}/processing_parallel" + + rv = self.server.post( + url, + headers=self.user_auth_header, + content_type=self.content_type, + data=PC, + ) + resp = self.waitAsyncBatchJob( + rv, + headers=self.user_auth_header, + http_status=200, + status="SUCCESS", + ) + assert "resource_response" in resp, \ + "No 'resource_response' in response" + assert len(resp["resource_response"]) == 4, \ + "There are not 4 actinia core responses" + process_results = [ + ac_resp["process_results"] for key, ac_resp in + resp["resource_response"].items() if + ac_resp["process_results"] != {}] + assert "stats" in process_results[0] + # Test request of one job of the batch + batch_id = resp["batch_id"] + job_id = resp["id"][0] + url = f"{URL_PREFIX}/resources/{self.user_id}/batches/{batch_id}/" \ + f"jobs/{job_id}" + rv2 = self.server.get(url, headers=self.user_auth_header) + resp2 = json_loads(rv2.data) + assert resp2["batch_id"] == batch_id, "wrong batch ID in job response" + assert resp2["id"] == int(job_id), "wrong job ID in job response" + assert "resource_response" in resp2, \ + "resource_response not in job response" + assert "urls" in resp2["resource_response"], "urls not in job response" + assert "status" in resp2["resource_response"]["urls"], \ + "status url not in job response" + assert "resource_id-" in resp2["resource_response"]["urls"][ + "status"], "resource_id not in job response" diff --git a/tests/test_resource_base.py b/tests/test_resource_base.py index 0e771b0..bb70756 100644 --- a/tests/test_resource_base.py +++ b/tests/test_resource_base.py @@ -25,6 +25,8 @@ import signal import time +from 
diff --git a/tests/test_resource_base.py b/tests/test_resource_base.py
index 0e771b0..bb70756 100644
--- a/tests/test_resource_base.py
+++ b/tests/test_resource_base.py
@@ -25,6 +25,8 @@
 import signal
 import time
 
+from flask.json import loads as json_loads
+from urllib.parse import urlsplit
 from werkzeug.datastructures import Headers
 
 from actinia_core.testsuite import ActiniaTestCaseBase, URL_PREFIX
@@ -135,3 +137,29 @@ def create_user(cls, name="guest", role="guest",
         cls.users_list.append(user)
 
         return name, group, cls.auth_header[role]
+
+    def waitAsyncBatchJob(self, rv, headers, http_status=200,
+                          status="SUCCESS", message_check=None):
+        """Poll the batch status URL until a final job status is reached."""
+        resp_data = json_loads(rv.data)
+        url = urlsplit(resp_data["urls"]["status"]).path
+
+        while True:
+            rv = self.server.get(url, headers=headers)
+            resp_data = json_loads(rv.data)
+            if resp_data["status"] in ("SUCCESS", "ERROR", "TERMINATED"):
+                break
+
+            time.sleep(0.2)
+        self.assertEqual(
+            resp_data["status"], status,
+            msg=f"Process has not status '{status}': {resp_data}")
+        self.assertEqual(rv.status_code, http_status,
+                         "HTTP status code is wrong %i" % rv.status_code)
+
+        if message_check is not None:
+            self.assertTrue(message_check in resp_data["message"],
+                            (f"Message is {resp_data['message']}"))
+
+        time.sleep(0.4)
+        return resp_data
diff --git a/tests/unittests/test_batches.py b/tests/unittests/test_batches.py
new file mode 100644
index 0000000..9f29df9
--- /dev/null
+++ b/tests/unittests/test_batches.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Copyright (c) 2022 mundialis GmbH & Co. KG
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Unit tests for core functionality of batches
+"""
+
+__license__ = "GPLv3"
+__author__ = "Anika Weinmann"
+__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG"
+__maintainer__ = "mundialis GmbH & Co. KG"
KG" + +import pytest +import datetime +from actinia_parallel_plugin.core.batches import checkProcessingBlockFinished + + +baseurl = "http://localhost:8088/api/v3" +resource_id1 = "resource_id-a6da5e00-d2a3-4804-b82f-e03f92ab1cd4" +resource_id2 = "resource_id-291e8428-ec86-40a6-9ed2-ef1e14357aff" +jobs = [ + { + "id": 7, + "time_created": datetime.datetime(2022, 6, 2, 7, 20, 14, 930771), + "time_started": None, + "time_estimated": None, + "time_ended": None, + "status": "PREPARING", + "resource_response": None, + "resource_id": None, + "creation_uuid": "81eae975-62c1-46f1-97d3-e027834a11b8", + "message": None, + "batch_id": 2, + "batch_processing_block": 2 + }, + { + "id": 8, + "time_created": datetime.datetime(2022, 6, 2, 7, 20, 14, 940472), + "time_started": None, + "time_estimated": None, + "time_ended": None, + "status": "PREPARING", + "resource_response": None, + "resource_id": None, + "creation_uuid": "a4be1541-70cc-42ac-b134-c17f0ea8d311", + "message": None, + "batch_id": 2, + "batch_processing_block": 3 + }, + { + "id": 5, + "time_created": datetime.datetime(2022, 6, 2, 7, 20, 14, 906827), + "time_started": None, + "time_estimated": None, + "time_ended": datetime.datetime(2022, 6, 2, 7, 20, 15), + "status": "SUCCESS", + "resource_response": { + "urls": { + "status": f"{baseurl}/resources/actinia-gdi/{resource_id2}", + "resources": [], + }, + "status": "finished", + "message": "Processing successfully finished", + "user_id": "actinia-gdi", + "api_info": { + "path": "/api/v3/locations/nc_spm_08_grass7_root/" + "processing_parallel", + "method": "POST", + "endpoint": "asyncparallelephermeralresource", + "post_url": f"{baseurl}/locations/nc_spm_08_grass7_root/" + "processing_parallel", + "request_url": f"{baseurl}/locations/nc_spm_08_grass7_root/" + "processing_parallel", + }, + "datetime": "2022-06-02 07:20:15.942998", + "progress": {"step": 2, "num_of_steps": 2}, + "http_code": 200, + "timestamp": 1654154415.9429944, + "time_delta": 0.9949047565460205, + "process_log": [{"..."}], + "resource_id": resource_id2, + "accept_datetime": "2022-06-02 07:20:14.948113", + "process_results": {}, + "accept_timestamp": 1654154414.9481106, + "process_chain_list": [ + { + "list": [{"..."}], + "version": "1", + } + ], + }, + "resource_id": resource_id2, + "creation_uuid": "767596e2-a9e4-4e96-b05b-4d77ae304a54", + "message": None, + "batch_id": 2, + "batch_processing_block": 1 + }, + { + "id": 6, + "time_created": datetime.datetime(2022, 6, 2, 7, 20, 14, 920875), + "time_started": None, + "time_estimated": None, + "time_ended": datetime.datetime(2022, 6, 2, 7, 20, 49), + "status": "SUCCESS", + "resource_response": { + "urls": { + "status": f"{baseurl}/resources/actinia-gdi/{resource_id1}", + "resources": [], + }, + "status": "finished", + "message": "Processing successfully finished", + "user_id": "actinia-gdi", + "api_info": { + "path": "/api/v3/locations/nc_spm_08_grass7_root/" + "processing_parallel", + "method": "POST", + "endpoint": "asyncparallelephermeralresource", + "post_url": f"{baseurl}/locations/nc_spm_08_grass7_root/" + "processing_parallel", + "request_url": f"{baseurl}/locations/nc_spm_08_grass7_root/" + "processing_parallel", + }, + "datetime": "2022-06-02 07:20:49.262223", + "progress": {"step": 2, "num_of_steps": 2}, + "http_code": 200, + "timestamp": 1654154449.2622168, + "time_delta": 0.4729771614074707, + "process_log": [{"..."}], + "resource_id": resource_id1, + "accept_datetime": "2022-06-02 07:20:48.789271", + "process_results": {}, + "accept_timestamp": 1654154448.7892694, + 
"process_chain_list": [ + { + "list": [{"..."}], + "version": "1", + } + ], + }, + "resource_id": resource_id1, + "creation_uuid": "d08c1bbb-72f4-482f-bc78-672756937efa", + "message": None, + "batch_id": 2, + "batch_processing_block": 2 + }, +] +block = [1, 2, 3] +ref_out = [True, False, False] + + +@pytest.mark.unittest +@pytest.mark.parametrize( + "block,ref_out", + [(block[0], ref_out[0]), (block[1], ref_out[1]), (block[2], ref_out[2])], +) +def test_checkProcessingBlockFinished(block, ref_out): + """Test for checkProcessingBlockFinished function.""" + + out = checkProcessingBlockFinished(jobs, block) + assert ( + out is ref_out + ), f"Wrong result from transform_input for block {block}" diff --git a/tests/unittests/test_transformation.py b/tests/unittests/test_transformation.py deleted file mode 100644 index 0b742b0..0000000 --- a/tests/unittests/test_transformation.py +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -Copyright (c) 2018-present mundialis GmbH & Co. KG - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see . - -First test -""" - -__license__ = "GPLv3" -__author__ = "Anika Weinmann" -__copyright__ = "Copyright 2022 mundialis GmbH & Co. KG" -__maintainer__ = "mundialis GmbH % Co. KG" - -import pytest -from actinia_parallel_plugin.core.example import transform_input - - -@pytest.mark.unittest -@pytest.mark.parametrize( - "inp,ref_out", - [("test", "Hello world TEST!"), ("bla23", "Hello world BLA23!")], -) -def test_transform_input(inp, ref_out): - """Test for tranform_input function.""" - out = transform_input(inp) - assert out == ref_out, f"Wrong result from transform_input for {inp}" diff --git a/tests_with_redis.sh b/tests_with_redis.sh deleted file mode 100644 index 57405bf..0000000 --- a/tests_with_redis.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/usr/bin/env sh - -# start redis server -redis-server & -sleep 1 -redis-cli ping - -# start webhook server -webhook-server --host "0.0.0.0" --port "5005" & -sleep 10 - -# run tests -echo $ACTINIA_CUSTOM_TEST_CFG -echo $DEFAULT_CONFIG_PATH - -if [ "$1" == "dev" ] -then - echo "Executing only 'dev' tests ..." - python3 setup.py test --addopts "-m dev" -elif [ "$1" == "integrationtest" ] -then - python3 setup.py test --addopts "-m 'integrationtest'" -else - python3 setup.py test -fi - -TEST_RES=$? - -# stop redis server -redis-cli shutdown - -return $TEST_RES