diff --git a/ion/processes/bootstrap/ion_loader.py b/ion/processes/bootstrap/ion_loader.py
index 0fc9fc0b0..b11ef4581 100644
--- a/ion/processes/bootstrap/ion_loader.py
+++ b/ion/processes/bootstrap/ion_loader.py
@@ -115,7 +115,7 @@
 ### the URL below should point to a COPY of the master google spreadsheet that works with this version of the loader
 #Apr15 TESTED_DOC = "https://docs.google.com/spreadsheet/pub?key=0ArFEMmslwP1ddHY3Zmlza0h5LXZINmpXRXNvRXBkdEE&output=xls"
 #Apr21 TESTED_DOC = "https://docs.google.com/spreadsheet/pub?key=0AgjFgozf2vG6dHRFS0x4eWdRM21vMHdEMWZTeFFNTVE&output=xls"
-TESTED_DOC = "https://docs.google.com/spreadsheet/pub?key=0AgjFgozf2vG6dGZ6TXdQZ2VTT0phdXMyU0JydmE2cHc&output=xls"
+TESTED_DOC = "https://docs.google.com/spreadsheet/pub?key=0AsF-1tKj9wC_dFpoTzZWeXBMeGNhV1lhZjVlN3lTNWc&output=xls"
 
 ### while working on changes to the google doc, use this to run test_loader.py against the master spreadsheet
 #TESTED_DOC=MASTER_DOC
@@ -154,6 +154,7 @@
             'PlatformAgentInstance',
             'InstrumentAgent',
             'InstrumentDevice',
+            'DataSource',
             'ExternalDataProvider',
             'ExternalDatasetModel',
             'ExternalDataset',
@@ -3047,6 +3048,9 @@ def _load_ExternalDatasetModel(self, row):
                                     'data_acquisition_management', 'create_external_dataset_model', support_bulk=True)
 
+    def _load_DataSource(self, row):
+        self._basic_resource_create(row, 'DataSource', 'ds/', 'data_acquisition_management', 'create_data_source', support_bulk=True)
+
     def _load_ExternalDataset(self, row):
         contacts = self._get_contacts(row, field='contact_id')
         if len(contacts) > 1:
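`_load_DataSource` delegates entirely to `_basic_resource_create`, so the new preload type is driven by spreadsheet columns carrying the `ds/` prefix passed above. A hypothetical, self-contained sketch of that convention; the column names and the stripping logic are assumptions for illustration, not the loader's actual code:

```python
# Hypothetical illustration only: a preload spreadsheet row for the new
# DataSource type and the kind of prefix handling _basic_resource_create
# presumably performs. The 'ds/' prefix mirrors the call above; the column
# names here are invented.
row = {
    "ID": "DS_ID1",
    "ds/name": "neptune",
    "ds/description": "Neptune Canada data source",
}

def row_to_attributes(row, prefix):
    """Keep only the prefixed columns and strip the prefix from their names."""
    return {k[len(prefix):]: v for k, v in row.items() if k.startswith(prefix)}

print(row_to_attributes(row, "ds/"))
# -> {'name': 'neptune', 'description': 'Neptune Canada data source'}
```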
diff --git a/ion/processes/bootstrap/plugins/bootstrap_eoi.py b/ion/processes/bootstrap/plugins/bootstrap_eoi.py
index 736bdfd4f..65e1c17a8 100644
--- a/ion/processes/bootstrap/plugins/bootstrap_eoi.py
+++ b/ion/processes/bootstrap/plugins/bootstrap_eoi.py
@@ -18,6 +18,7 @@ def on_initial_bootstrap(self, process, config, **kwargs):
         EOI BootstrapPlugin
 
         Resets the geoserver datastore...
+        Performs initial parsing of the eoi datasources
         """
         using_eoi_services = config.get_safe('eoi.meta.use_eoi_services', False)
         if using_eoi_services:
diff --git a/ion/processes/bootstrap/plugins/bootstrap_process_dispatcher.py b/ion/processes/bootstrap/plugins/bootstrap_process_dispatcher.py
index 09ff95470..c3ad2267f 100644
--- a/ion/processes/bootstrap/plugins/bootstrap_process_dispatcher.py
+++ b/ion/processes/bootstrap/plugins/bootstrap_process_dispatcher.py
@@ -23,6 +23,20 @@ def on_initial_bootstrap(self, process, config, **kwargs):
         self.notification_worker(process,config)
         self.registration_worker(process,config)
         self.pydap_server(process,config)
+        self.eoi_services(process,config)
+
+    def eoi_services(self, process, config):
+        eoi_module = config.get_safe('bootstrap.processes.registration.module', 'ion.processes.data.registration.eoi_registration_process')
+        eoi_class = config.get_safe('bootstrap.processes.registration.class', 'EOIRegistrationProcess')
+
+        process_definition = ProcessDefinition(
+            name='eoi_server',
+            description='Process for eoi data sources')
+        process_definition.executable['module'] = eoi_module
+        process_definition.executable['class'] = eoi_class
+
+        self._create_and_launch(process_definition)
+
     def pydap_server(self, process, config):
         pydap_module = config.get_safe('bootstrap.processes.pydap.module', 'ion.processes.data.externalization.lightweight_pydap')
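The new `eoi_services` bootstrap step resolves its process module and class through `config.get_safe` with dotted keys, so a deployment can override them in the container config. A minimal stand-in for that lookup, under the assumption that `get_safe` is a dotted-path dict traversal with a default (this is a sketch, not pyon's implementation):

```python
# Assumed shape of the container config consulted by eoi_services above.
config = {
    'bootstrap': {
        'processes': {
            'registration': {
                'module': 'ion.processes.data.registration.eoi_registration_process',
                'class': 'EOIRegistrationProcess',
            }
        }
    }
}

def get_safe(cfg, dotted_key, default=None):
    """Walk a nested dict by a dotted key, returning default on any miss."""
    node = cfg
    for part in dotted_key.split('.'):
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node

assert get_safe(config, 'bootstrap.processes.registration.class') == 'EOIRegistrationProcess'
assert get_safe(config, 'bootstrap.processes.registration.missing', 'fallback') == 'fallback'
```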
diff --git a/ion/processes/data/registration/eoi_registration_process.py b/ion/processes/data/registration/eoi_registration_process.py
new file mode 100644
index 000000000..586322eb8
--- /dev/null
+++ b/ion/processes/data/registration/eoi_registration_process.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+from pyon.ion.process import SimpleProcess
+from pyon.ion.event import EventSubscriber
+from pyon.public import OT, RT
+import requests
+from pyon.public import CFG
+from pyon.util.log import log
+
+# GeoNetwork harvester operations
+# TODO: these should maybe go in the yml file
+REQUEST_HARVESTER = "requestharvester"
+CREATE_HARVESTER = "createharvester"
+UPDATE_HARVESTER = "updateharvester"
+REMOVE_HARVESTER = "removeharvester"
+RESET_HARVESTER = "resetharvester"
+START_HARVESTER = "startharvester"
+STOP_HARVESTER = "stopharvester"
+RUN_HARVESTER = "runharvester"
+
+
+class EOIRegistrationProcess(SimpleProcess):
+
+    def on_start(self):
+        self.data_source_subscriber = EventSubscriber(event_type=OT.ResourceModifiedEvent,
+                                                      origin_type=RT.DataSource,
+                                                      callback=self._register_data_source)
+        self.provider_subscriber = EventSubscriber(event_type=OT.ResourceModifiedEvent,
+                                                   origin_type=RT.ExternalDataProvider,
+                                                   callback=self._register_provider)
+        self.data_source_subscriber.start()
+        self.provider_subscriber.start()
+
+        self.rr = self.container.resource_registry
+
+        self.using_eoi_services = CFG.get_safe('eoi.meta.use_eoi_services', False)
+        self.server = CFG.get_safe('eoi.importer_service.server', "localhost") + ":" + str(CFG.get_safe('eoi.importer_service.port', 8844))
+
+        log.info("Using geoservices=%s", self.using_eoi_services)
+        if not self.using_eoi_services:
+            log.warn("not using geoservices...")
+
+        self.importer_service_available = self.check_for_importer_service()
+        if not self.importer_service_available:
+            log.warn("not using importer service...")
+
+    def check_for_importer_service(self):
+        '''
+        Only runs on start; used to identify whether the importer service is available
+        '''
+        try:
+            r = requests.get(self.server + '/service=alive&name=ooi&id=ooi')
+            log.info("importer service available, status code: %s", str(r.status_code))
+            #the alive service returned ok
+            return r.status_code == 200
+        except Exception as e:
+            #the service is really not available
+            log.warn("importer service is really not available...%s", e)
+            return False
+
+    def _register_data_source(self, event, *args, **kwargs):
+        '''
+        Used to create a harvester
+        '''
+        if self.importer_service_available:
+            obj = self.rr.read(event.origin)
+            data_fields = []
+            for attrname, value in vars(obj).iteritems():
+                #generate the param list to pass to the importer service using field names
+                if attrname != "contact":
+                    f = attrname.replace("_", "") + "=" + str(value)
+                    data_fields.append(f)
+
+            param_list = '&'.join(data_fields)
+
+            request_string = self.server + '/service=' + CREATE_HARVESTER + "&" + param_list
+            requests.get(request_string)
+
+    def _register_provider(self, event, *args, **kwargs):
+        if self.importer_service_available:
+            #TODO: register the external data provider (event.origin) as well
+            pass
+
+    def on_quit(self):
+        self.data_source_subscriber.stop()
+        self.provider_subscriber.stop()
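`_register_data_source` flattens a DataSource resource's attributes into a query string for the importer's `createharvester` call, dropping underscores from field names and skipping the `contact` object. A self-contained sketch of the request it builds; the attribute names below are invented stand-ins for a real DataSource's fields:

```python
# Sketch of the createharvester request assembled by _register_data_source.
# Only the 'contact' attribute is skipped, mirroring the code above.
CREATE_HARVESTER = "createharvester"
server = "http://localhost:8844"

attrs = {"name": "neptune", "protocol_type": "http", "contact": "<skipped>"}
data_fields = ["%s=%s" % (attrname.replace("_", ""), value)
               for attrname, value in attrs.items() if attrname != "contact"]
request_string = server + "/service=" + CREATE_HARVESTER + "&" + "&".join(data_fields)
print(request_string)
# e.g. http://localhost:8844/service=createharvester&name=neptune&protocoltype=http
```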
diff --git a/ion/services/eoi/table_loader.py b/ion/services/eoi/table_loader.py
index dc9d00256..d5bb60378 100644
--- a/ion/services/eoi/table_loader.py
+++ b/ion/services/eoi/table_loader.py
@@ -22,6 +22,7 @@
 REAL = "real"
 INT = "int"
 TIMEDATE = "timestamp"
+REQUIRED_PARAMS = ["lat", "lon"]
 
 
 class ResourceParser(object):
@@ -171,8 +172,7 @@
         coverage_path = self._get_coverage_path(new_resource_id)
 
         #generate table from params and id
-        [success, prim_types] = self.generate_sql_table(new_resource_id, param_dict, relevant, coverage_path)
-
+        [success, prim_types] = self.generate_sql_table(new_resource_id, param_dict, relevant, coverage_path)
         if success:
             #generate geoserver layer
             self.send_geonode_request(self.addlayer, new_resource_id, prim_types)
@@ -214,6 +214,16 @@
 
         return encoding_string, prim_type
 
+    def required_fields_satisfied(self, param_list):
+        '''
+        Verifies that the required params are in the resource
+        '''
+        #a usable list has more than three params and must include lat and lon
+        if len(param_list) > 3:
+            return set(REQUIRED_PARAMS).issubset(set(param_list))
+        else:
+            return False
+
     def generate_sql_table(self, dataset_id, params, relevant, coverage_path):
         """
         Generates Foreign data table for used with postgres
@@ -221,73 +231,80 @@
         #check table exists
         if not self.does_table_exist(dataset_id):
             valid_types = {}
-            create_table_string = 'create foreign table "%s" (' % dataset_id
-
-            #loop through the params
-            encodings = []
-            for param in relevant:
-                #get the information
-                data_item = params[param]
-                desc = data_item[1]['description']
-                ooi_short_name = data_item[1]['ooi_short_name']
-                name = data_item[1]['name']
-                disp_name = data_item[1]['display_name']
-                internal_name = data_item[1]['internal_name']
-                cm_type = data_item[1]['param_type']['cm_type']
-                units = ""
-                try:
-                    units = data_item[1]['uom']
-                except Exception as e:
-                    if DEBUG:
-                        log.debug("no units available...%s", e.message)
+            create_table_string = 'create foreign table "%s" (' % dataset_id
+            log.debug("relevant: %s valid?: %s", relevant, self.required_fields_satisfied(relevant))
+            if self.required_fields_satisfied(relevant):
+                #loop through the params
+                encodings = []
+
+                for param in relevant:
+                    #get the information
+                    data_item = params[param]
+                    desc = data_item[1]['description']
+                    ooi_short_name = data_item[1]['ooi_short_name']
+                    name = data_item[1]['name']
+                    disp_name = data_item[1]['display_name']
+                    internal_name = data_item[1]['internal_name']
+                    cm_type = data_item[1]['param_type']['cm_type']
+                    units = ""
+                    try:
+                        units = data_item[1]['uom']
+                    except Exception as e:
+                        if DEBUG:
+                            log.debug("no units available...%s", e.message)
+
+                    value_encoding = data_item[1]['param_type']['_value_encoding']
+                    fill_value = data_item[1]['param_type']['_fill_value']
+                    std_name = data_item[1]['standard_name']
+
+                    #only use things that have a valid value
+                    if len(name) > 0: #and (len(desc)>0) and (len(units)>0) and (value_encoding is not None)):
+                        if DEBUG:
+                            log.debug("-------processed-------")
+                            log.debug(str(ooi_short_name))
+                            log.debug(str(desc))
+                            log.debug(str(name))
+                            log.debug(str(disp_name))
+                            log.debug(str(units))
+                            log.debug(str(internal_name))
+                            log.debug(str(value_encoding))
+                            log.debug(str(cm_type[1]))
+
+                        if cm_type[1] == "ArrayType":
+                            #ignore array types
+                            pass
+                        else:
+                            [encoding, prim_type] = self.get_value_encoding(name, value_encoding)
+                            if encoding is not None:
+                                encodings.append(encoding)
+                                valid_types[name] = prim_type
+
+                create_table_string += ','.join(encodings)
+                log.debug("coverage path: %s", coverage_path)
+                create_table_string = self.add_server_info(create_table_string, coverage_path, dataset_id)
 
-            value_encoding = data_item[1]['param_type']['_value_encoding']
-            fill_value = data_item[1]['param_type']['_fill_value']
-            std_name = data_item[1]['standard_name']
-
-            #only use things that have valid value
-            if len(name) > 0: #and (len(desc)>0) and (len(units)>0) and (value_encoding is not None)):
-                if DEBUG:
-                    log.debug("-------processed-------")
-                    log.debug(str(ooi_short_name))
-                    log.debug(str(desc))
-                    log.debug(str(name))
-                    log.debug(str(disp_name))
-                    log.debug(str(units))
-                    log.debug(str(internal_name))
-                    log.debug(str(value_encoding))
-                    log.debug(str(cm_type[1]))
-
-                if cm_type[1] == "ArrayType":
-                    #ignore array types
-                    pass
-                else:
-                    [encoding, prim_type] = self.get_value_encoding(name, value_encoding)
-                    if encoding is not None:
-                        encodings.append(encoding)
-                        valid_types[name] = prim_type
-
-            pass
-
-            create_table_string += ','.join(encodings)
-            log.debug("coverage path:"+coverage_path)
-            create_table_string = self.add_server_info(create_table_string, coverage_path, dataset_id)
-
-            if DEBUG:
-                log.debug('\n%s', create_table_string)
-
-            try:
-                self.cur.execute(create_table_string)
-                self.con.commit()
-                #should always be lat and lon
-                self.cur.execute(self.generate_table_view(dataset_id, self.latitude, self.longitude))
-                self.con.commit()
-                return self.does_table_exist(dataset_id), valid_types
-
-            except Exception as e:
-                #error setting up connection
-                log.debug('Error %s', e)
-                raise
+                if DEBUG:
+                    log.debug('\n%s', create_table_string)
+
+                #the required fields were verified above; now create the table
+
+                try:
+                    self.cur.execute(create_table_string)
+                    self.con.commit()
+                    #should always be lat and lon
+                    self.cur.execute(self.generate_table_view(dataset_id, self.latitude, self.longitude))
+                    self.con.commit()
+                    return self.does_table_exist(dataset_id), valid_types
+
+                except Exception as e:
+                    #error setting up connection
+                    log.debug('Error %s', e)
+                    raise
+            else:
+                log.warn('resource skipped, it does not contain all of the required params: %s', REQUIRED_PARAMS)
+                return False
         else:
             if DEBUG:
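The gatekeeping added to `generate_sql_table` rides on `required_fields_satisfied`: lat/lon must be present and the parameter list must carry more than three entries overall. A quick behavioral check of that rule exactly as written above:

```python
# Behavior sketch of required_fields_satisfied from table_loader.py.
REQUIRED_PARAMS = ["lat", "lon"]

def required_fields_satisfied(param_list):
    # a usable list has more than three params and must include lat and lon
    if len(param_list) > 3:
        return set(REQUIRED_PARAMS).issubset(set(param_list))
    return False

assert required_fields_satisfied(["time", "lat", "lon", "temp"])
assert not required_fields_satisfied(["time", "lat", "lon"])          # too few params
assert not required_fields_satisfied(["time", "temp", "depth", "x"])  # no lat/lon
```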
diff --git a/ion/services/eoi/test/test_eoi_resources.py b/ion/services/eoi/test/test_eoi_resources.py
new file mode 100644
index 000000000..17080a071
--- /dev/null
+++ b/ion/services/eoi/test/test_eoi_resources.py
@@ -0,0 +1,288 @@
+"""
+@author Andy Bird
+@author Jim Case
+@brief Test cases for the eoi data provider resources.
+"""
+from ion.services.dm.test.dm_test_case import DMTestCase, Streamer
+from ion.processes.data.transforms.viz.google_dt import VizTransformGoogleDTAlgorithm
+from ion.processes.data.replay.replay_process import RetrieveProcess
+from ion.services.dm.utility.test.parameter_helper import ParameterHelper
+from ion.services.dm.utility.granule import RecordDictionaryTool
+from ion.services.dm.test.test_dm_end_2_end import DatasetMonitor
+from ion.services.dm.utility.tmpsf_simulator import TMPSFSimulator
+from ion.services.dm.utility.bad_simulator import BadSimulator
+from ion.util.direct_coverage_utils import DirectCoverageAccess
+from ion.services.dm.utility.hydrophone_simulator import HydrophoneSimulator
+from ion.services.dm.inventory.dataset_management_service import DatasetManagementService
+from ion.services.dm.utility.provenance import graph
+from ion.processes.data.registration.registration_process import RegistrationProcess
+from coverage_model import ParameterFunctionType, ParameterDictionary, PythonFunction, ParameterContext as CovParameterContext
+from ion.processes.data.transforms.transform_worker import TransformWorker
+from interface.objects import DataProcessDefinition, InstrumentDevice, ParameterFunction, ParameterFunctionType as PFT, ParameterContext
+from nose.plugins.attrib import attr
+from pyon.util.breakpoint import breakpoint
+from pyon.core.exception import NotFound
+from pyon.event.event import EventSubscriber
+from pyon.util.file_sys import FileSystem
+from pyon.public import IonObject, RT, CFG, PRED, OT
+from pyon.util.containers import DotDict
+from pydap.client import open_url
+from shutil import rmtree
+from datetime import datetime, timedelta
+from pyon.net.endpoint import RPCClient
+from pyon.util.log import log
+from pyon.ion.event import EventPublisher
+from interface.objects import InstrumentSite, InstrumentModel, PortTypeEnum, Deployment, CabledInstrumentDeploymentContext
+from nose.tools import with_setup
+import lxml.etree as etree
+import simplejson as json
+import pkg_resources
+import tempfile
+import os
+import unittest
+import numpy as np
+import time
+import gevent
+import requests
+from gevent.event import Event
+import calendar
+from interface.objects import DataSource, ExternalDataProvider
+
+
+MOCK_HARVESTER_NAME = "test_harvester"
+
+
+@attr('INT', group='eoi')
+class TestEOIExternalResources(DMTestCase):
+    '''
+    Tests the addition of external resources into the system through preload;
+    checks that there are datasources in geonetwork and that neptune and ioos
+    have been added through preload as resources.
+    '''
+
+    @unittest.skipIf(not CFG.get_safe('eoi.meta.use_eoi_services', False), 'Skip test, services are not loaded')
+    def test_external_data_provider_during_preload(self):
+        self.preload_external_providers()
+
+        self.rr = self.container.resource_registry
+
+        data_list = self.rr.find_resources(restype=RT.DataSource)
+        data_list = data_list[0]
+        #expect more than one data source
+        self.assertTrue(len(data_list) > 1)
+        #make sure that the expected list is all there
+        expected_list = ['neptune', 'ioos', 'ooi']
+        for data in data_list:
+            self.assertTrue(data.name in expected_list)
+
+        #try more than once to get the harvester list, as it can take a second to update
+        for x in xrange(0, 3):
+            h_list = self.get_harvester_list()
+            names = self.get_harvester_names(h_list)
+
+            #check that the preload task loaded the required harvesters
+            if len(names) > 0:
+                all_accounted_for = set(expected_list).issubset(set(names))
+
+                if all_accounted_for:
+                    log.debug("All harvesters accounted for...")
+                else:
+                    log.warn("All harvesters not accounted for")
+                    for expected_name in expected_list:
+                        if expected_name not in names:
+                            log.error("harvester:" + expected_name + " in preload and resources, not in geonetwork")
+                        else:
+                            log.warn("removing harvester from geonetwork:" + expected_name)
+                            self.remove_harvester_list(expected_name)
+
+            else:
+                log.error("no harvester names returned, check geonetwork connection")
+
+        self.remove_added_harvesters()
+
+    @unittest.skip('not really needed, but might be useful down the road')
+    def test_add_datasource_externaldataprovider_to_rr(self):
+        '''
+        Tests the addition of external resources into the system.
+        '''
+        self.preload_external_providers()
+
+        ds = DataSource(name='bob')
+        self.container.resource_registry.create(ds)
+
+        edp = ExternalDataProvider(name='bob')
+        self.container.resource_registry.create(edp)
+
+        self.remove_added_harvesters()
+
+    def preload_external_providers(self):
+        '''
+        Preloads data from the selected scenario.
+        '''
+        config = DotDict()
+        config.op = 'load'
+        config.loadui = True
+        config.ui_path = "http://userexperience.oceanobservatories.org/database-exports/Candidates"
+        config.attachments = "res/preload/r2_ioc/attachments"
+        config.scenario = 'AB_TEST'
+        config.path = 'master'
+        self.container.spawn_process('preloader', 'ion.processes.bootstrap.ion_loader', 'IONLoader', config)
+
+    def get_harvester_list(self):
+        '''
+        Gets the list of harvesters from the importer service; hits the geonetwork service.
+        '''
+        IMPORTER_SERVICE_SERVER = CFG.get_safe('eoi.importer_service.server', 'http://localhost')
+        IMPORTER_SERVICE_PORT = str(CFG.get_safe('eoi.importer_service.port', 8844))
+        self.importer_service_url = ''.join([IMPORTER_SERVICE_SERVER, ':', IMPORTER_SERVICE_PORT])
+        #at this point the importer service should be up
+        #get the harvester list
+        harvester_get_url = self.importer_service_url + "/service=requestharvester&hfilter=all"
+        try:
+            r = requests.get(harvester_get_url, timeout=10)
+            return r.text
+        except Exception as e:
+            #fail because it should have the service running
+            log.error("check service, as it appears to not be running...%s", e)
+            return None
+
+    def remove_harvester_list(self, name):
+        '''
+        Removes a named harvester via the importer service; hits the geonetwork service.
+        '''
+        IMPORTER_SERVICE_SERVER = CFG.get_safe('eoi.importer_service.server', 'http://localhost')
+        IMPORTER_SERVICE_PORT = str(CFG.get_safe('eoi.importer_service.port', 8844))
+        self.importer_service_url = ''.join([IMPORTER_SERVICE_SERVER, ':', IMPORTER_SERVICE_PORT])
+        #at this point the importer service should be up
+        harvester_remove_url = self.importer_service_url + "/service=removeharvester&hfilter=" + name
+        try:
+            r = requests.get(harvester_remove_url, timeout=10)
+            return r.text
+        except Exception as e:
+            #fail because it should have the service running
+            log.error("check service, as it appears to not be running...%s", e)
+            return None
+
+    def get_harvester_names(self, xml):
+        #need to strip the encoding
+        try:
+            xml = xml.replace('encoding="UTF-8"', "")
+            parser = etree.XMLParser(target=EchoTarget())
+
+            root = etree.XML(xml)
+            d = root.findall("node/site/name")
+
+            for name in d:
+                etree.XML(etree.tostring(name), parser)
+
+            name_list = parser.target.events
+            corrected_name = []
+            for name in name_list:
+                n = [n for (n, e) in enumerate(name) if e == "'"]
+                name_str = name[n[0] + 1:n[1]]
+                corrected_name.append(name_str)
+            return corrected_name
+
+        except Exception:
+            return []
+
+    @unittest.skipIf(not CFG.get_safe('eoi.meta.use_eoi_services', False), 'Skip test, services are not loaded')
+    def test_adding_removing_harvester(self):
+        '''
+        Checks that harvester information is available and can be added via the importer interface.
+        '''
+        IMPORTER_SERVICE_SERVER = CFG.get_safe('eoi.importer_service.server', 'http://localhost')
+        IMPORTER_SERVICE_PORT = str(CFG.get_safe('eoi.importer_service.port', 8844))
+        self.importer_service_url = ''.join([IMPORTER_SERVICE_SERVER, ':', IMPORTER_SERVICE_PORT])
+
+        #create url to check the service is alive
+        alive_url = self.importer_service_url + "/service=alive&name=ooi&id=ooi"
+        #make a get request if the service is available
+        try:
+            r = requests.get(alive_url, timeout=5)
+            self.assertTrue(r.status_code == 200)
+        except Exception as e:
+            log.error("check service, as it appears to not be running...%s", e)
+            #should fail, as the service should be running if it has been requested
+            self.fail("importer service is not available")
+
+        #current number of harvesters and their names
+        names_before = self.get_harvester_names(self.get_harvester_list())
+        #check that a harvester of a specific name does not exist
+
+        #generate the harvester using something like below
+        mock_harvester_create = self.importer_service_url+"/service=createharvester&lcstate=DEPLOYED&rev=1&searchterms=mutibeam,RI&availability=AVAILABLE&externalize=1&persistedversion=1&ogctype=&importxslt=gmiTogmd.xsl&addl=%7B%7D&harvestertype=geoPREST&description=IOOS&datasourceattributes=%7B%7D&visibility=1&connectionparams=%7B%7D&tsupdated=1399474190226&tscreated=1399474190226&institution=Institution(%7B%27website%27:%20%27%27,%20%27phone%27:%20%27%27,%20%27name%27:%20%27%27,%20%27email%27:%20%27%27%7D)&protocoltype=&name="+MOCK_HARVESTER_NAME+"&altids=[%27PRE:EDS_ID2%27]&datasourcetype=geoportal&type=DataSource&id=27aa22dc3f6742d3892a5ec41b0cedb2&protocoltype=http://www.google.com"
+        try:
+            r = requests.get(mock_harvester_create, timeout=5)
+            self.assertTrue(r.status_code == 200)
+        except Exception as e:
+            log.error("check service, as it appears to not be running...%s", e)
+            self.fail("importer service is not available")
+
+        names_after = self.get_harvester_names(self.get_harvester_list())
+
+        #sanity check: the number of harvesters should have increased after the addition
+        self.assertTrue(len(names_before) < len(names_after))
+        #check that the added harvester is in the list
+        present = False
+        for name in names_after:
+            if MOCK_HARVESTER_NAME in name:
+                present = True
+                break
+
+        #the added harvester should be present
+        self.assertTrue(present)
+
+        #remove the added harvester
+        self.remove_harvester_list(MOCK_HARVESTER_NAME)
+
+        #refresh the name list
+        names_after = self.get_harvester_names(self.get_harvester_list())
+
+        #reset the flag and check the harvester has been removed
+        present = False
+        for name in names_after:
+            if MOCK_HARVESTER_NAME in name:
+                present = True
+                break
+        self.assertFalse(present)
+        #remove those added during preload
+        self.remove_added_harvesters()
+
+    def remove_added_harvesters(self):
+        '''
+        Removes the harvesters added to geonetwork during preload.
+        '''
+        names = self.get_harvester_names(self.get_harvester_list())
+        expected_list = ['neptune', 'ioos', 'ooi']
+        for n in names:
+            if n in expected_list:
+                log.warn("removing harvester from geonetwork:" + n)
+                self.remove_harvester_list(n)
+
+
+class EchoTarget(object):
+    def __init__(self):
+        self.events = []
+
+    def start(self, tag, attrib):
+        pass
+
+    def end(self, tag):
+        pass
+
+    def data(self, data):
+        self.events.append("%r" % data)
+
+    def comment(self, text):
+        pass
+
+    def close(self):
+        return "closed!"
\ No newline at end of file
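For reference, the extraction `get_harvester_names` performs via the `EchoTarget` parser boils down to reading the text of `node/site/name` elements; a direct, self-contained equivalent is sketched below. The `node/site/name` layout mirrors the `findall()` path used in the test, but the sample XML document itself is invented:

```python
# Self-contained illustration of the name extraction in get_harvester_names.
import lxml.etree as etree

xml = """<nodes>
  <node><site><name>neptune</name></site></node>
  <node><site><name>ioos</name></site></node>
</nodes>"""

root = etree.XML(xml)
names = [n.text for n in root.findall("node/site/name")]
assert names == ["neptune", "ioos"]
```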