M057: Integrates preloading of external DataSources as metadata resources for use in GeoNetwork #2120

Merged
2 commits, merged May 29, 2014
6 changes: 5 additions & 1 deletion ion/processes/bootstrap/ion_loader.py
@@ -115,7 +115,7 @@
### the URL below should point to a COPY of the master google spreadsheet that works with this version of the loader
#Apr15 TESTED_DOC = "https://docs.google.com/spreadsheet/pub?key=0ArFEMmslwP1ddHY3Zmlza0h5LXZINmpXRXNvRXBkdEE&output=xls"
#Apr21 TESTED_DOC = "https://docs.google.com/spreadsheet/pub?key=0AgjFgozf2vG6dHRFS0x4eWdRM21vMHdEMWZTeFFNTVE&output=xls"
TESTED_DOC = "https://docs.google.com/spreadsheet/pub?key=0AgjFgozf2vG6dGZ6TXdQZ2VTT0phdXMyU0JydmE2cHc&output=xls"
TESTED_DOC = "https://docs.google.com/spreadsheet/pub?key=0AsF-1tKj9wC_dFpoTzZWeXBMeGNhV1lhZjVlN3lTNWc&output=xls"

### while working on changes to the google doc, use this to run test_loader.py against the master spreadsheet
#TESTED_DOC=MASTER_DOC
@@ -154,6 +154,7 @@
        'PlatformAgentInstance',
        'InstrumentAgent',
        'InstrumentDevice',
        'DataSource',
        'ExternalDataProvider',
        'ExternalDatasetModel',
        'ExternalDataset',
@@ -3047,6 +3048,9 @@ def _load_ExternalDatasetModel(self, row):
            'data_acquisition_management', 'create_external_dataset_model',
            support_bulk=True)

    def _load_DataSource(self, row):
        self._basic_resource_create(row, 'DataSource', 'ds/',
                                    'data_acquisition_management', 'create_data_source',
                                    support_bulk=True)

    def _load_ExternalDataset(self, row):
        contacts = self._get_contacts(row, field='contact_id')
        if len(contacts) > 1:
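Context for the one-line addition above: the loader appears to dispatch each preload category to a matching _load_<Category> method, so adding 'DataSource' to the category list plus the _load_DataSource handler is the whole integration. A minimal sketch of that convention, with a hypothetical _dispatch_row helper (not code from this PR):

def _dispatch_row(self, category, row):
    # e.g. category 'DataSource' resolves to self._load_DataSource
    handler = getattr(self, '_load_%s' % category, None)
    if handler is None:
        raise KeyError('no loader handler for category %s' % category)
    handler(row)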
1 change: 1 addition & 0 deletions ion/processes/bootstrap/plugins/bootstrap_eoi.py
@@ -18,6 +18,7 @@ def on_initial_bootstrap(self, process, config, **kwargs):
        EOI BootstrapPlugin

        Resets the GeoServer datastore...
        Performs initial parsing of the EOI data sources.
        """
        using_eoi_services = config.get_safe('eoi.meta.use_eoi_services', False)
        if using_eoi_services:
14 changes: 14 additions & 0 deletions ion/processes/bootstrap/plugins/bootstrap_process_dispatcher.py
@@ -23,6 +23,20 @@ def on_initial_bootstrap(self, process, config, **kwargs):
        self.notification_worker(process, config)
        self.registration_worker(process, config)
        self.pydap_server(process, config)
        self.eoi_services(process, config)

    def eoi_services(self, process, config):
        eoi_module = config.get_safe('bootstrap.processes.registration.module', 'ion.processes.data.registration.eoi_registration_process')
        eoi_class = config.get_safe('bootstrap.processes.registration.class', 'EOIRegistrationProcess')

        process_definition = ProcessDefinition(
            name='eoi_server',
            description='Process for eoi data sources')
        process_definition.executable['module'] = eoi_module
        process_definition.executable['class'] = eoi_class

        self._create_and_launch(process_definition)

    def pydap_server(self, process, config):
        pydap_module = config.get_safe('bootstrap.processes.pydap.module', 'ion.processes.data.externalization.lightweight_pydap')
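Note on configurability: eoi_services reads the module and class from config with in-code defaults, so a deployment can swap in a different registration process without code changes. A minimal sketch of such an override, assuming pyon's usual dotted-key get_safe semantics (module and class values are hypothetical):

config = {'bootstrap': {'processes': {'registration': {
    'module': 'my.deployment.eoi_registration',    # hypothetical module
    'class': 'CustomEOIRegistrationProcess',       # hypothetical class
}}}}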
92 changes: 92 additions & 0 deletions ion/processes/data/registration/eoi_registration_process.py
@@ -0,0 +1,92 @@
#!/usr/bin/env python
from pyon.ion.process import SimpleProcess
from pyon.ion.event import EventSubscriber
from pyon.public import OT, RT
import requests
from pyon.public import CFG
from pyon.util.log import log
from pyon.util.breakpoint import breakpoint

# GeoNetwork harvester actions
# TODO: these should maybe move to the yml config file
REQUEST_HARVESTER = "requestharvester"
CREATE_HARVESTER = "createharvester"
UPDATE_HARVESTER = "updateharvester"
REMOVE_HARVESTER = "removeharvester"
RESET_HARVESTER = "resetharvester"
START_HARVESTER = "startharvester"
STOP_HARVESTER = "stopharvester"
RUN_HARVESTER = "runharvester"

class EOIRegistrationProcess(SimpleProcess):

    def on_start(self):
        self.data_source_subscriber = EventSubscriber(event_type=OT.ResourceModifiedEvent,
                                                      origin_type=RT.DataSource,
                                                      callback=self._register_data_source)
        self.provider_subscriber = EventSubscriber(event_type=OT.ResourceModifiedEvent,
                                                   origin_type=RT.ExternalDataProvider,
                                                   callback=self._register_provider)
        self.data_source_subscriber.start()
        self.provider_subscriber.start()

        self.rr = self.container.resource_registry

        self.using_eoi_services = CFG.get_safe('eoi.meta.use_eoi_services', False)
        self.server = CFG.get_safe('eoi.importer_service.server', "localhost") + ":" + \
                      str(CFG.get_safe('eoi.importer_service.port', 8844))

        log.info("Using geoservices=" + str(self.using_eoi_services))
        if not self.using_eoi_services:
            log.warn("not using geoservices...")

        self.importer_service_available = self.check_for_importer_service()
        if not self.importer_service_available:
            log.warn("not using importer service...")

    def check_for_importer_service(self):
        '''
        Runs once on start; determines whether the importer service is available.
        '''
        try:
            r = requests.get(self.server + '/service=alive&name=ooi&id=ooi')
            log.info("importer service available, status code: %s", str(r.status_code))
            # the alive service returned OK
            return r.status_code == 200
        except Exception as e:
            # the service is not reachable at all
            log.warn("importer service is not available... %s", e)
            return False


    def _register_data_source(self, event, *args, **kwargs):
[Review comment from a project Member]: You should place a general try-except Exception block around the main threads. These are long-running and you cannot afford to lose them to some random exception. (A sketch of this pattern follows the handler below.)

        '''
        Creates a harvester for the new data source.
        '''
        if self.importer_service_available:
            obj = self.rr.read(event.origin)
            data_fields = []
            for attrname, value in vars(obj).iteritems():
                # generate the param list to pass to the importer service using field names
                if attrname != "contact":
                    f = attrname.replace("_", "") + "=" + str(value)
                    data_fields.append(f)

            param_list = '&'.join(data_fields)

            request_string = self.server + '/service=' + CREATE_HARVESTER + "&" + param_list
            r = requests.get(request_string)

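    # Editor's sketch of the reviewer's suggested guard (not part of this PR):
    # wrap the long-running subscriber callbacks so a stray exception cannot
    # kill them; log.warn mirrors the logging already used in this file.
    #
    # def _register_data_source(self, event, *args, **kwargs):
    #     try:
    #         pass  # existing registration body goes here
    #     except Exception as e:
    #         log.warn("DataSource registration failed: %s", e)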

    def _register_provider(self, event, *args, **kwargs):
        if self.importer_service_available:
            # provider registration is not implemented yet (see event.origin)
            pass


    def on_quit(self):
        self.data_source_subscriber.stop()
        self.provider_subscriber.stop()

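For illustration, a rough sketch of the request that _register_data_source assembles. The attribute names and values are hypothetical; underscores are stripped from field names exactly as in the handler above.

server = "localhost:8844"  # default from the eoi.importer_service.* config
fields = {"protocol_type": "DAP", "institution": "OOI"}  # hypothetical DataSource attributes
param_list = "&".join(k.replace("_", "") + "=" + str(v) for k, v in fields.items())
request_string = server + "/service=createharvester&" + param_list
# e.g. localhost:8844/service=createharvester&protocoltype=DAP&institution=OOI (order may vary)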
153 changes: 85 additions & 68 deletions ion/services/eoi/table_loader.py
@@ -22,6 +22,7 @@
REAL = "real"
INT = "int"
TIMEDATE = "timestamp"
REQUIRED_PARAMS = ["lat", "lon"]


class ResourceParser(object):
@@ -171,8 +172,7 @@ def create_single_resource(self, new_resource_id, param_dict):
        coverage_path = self._get_coverage_path(new_resource_id)

        #generate table from params and id
        [success, prim_types] = self.generate_sql_table(new_resource_id, param_dict, relevant, coverage_path)
        if success:
            #generate geoserver layer
            self.send_geonode_request(self.addlayer, new_resource_id, prim_types)
@@ -214,80 +214,97 @@ def get_value_encoding(self, name, value_encoding):

        return encoding_string, prim_type

    def required_fields_satisfied(self, param_list):
        '''
        Verifies that the required params are present in the resource.
        '''
        # a usable resource should carry more than 3 params overall
        if len(param_list) > 3:
            return set(REQUIRED_PARAMS).issubset(set(param_list))
        else:
            return False
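    # Quick illustration of the rule above (editor sketch, hypothetical lists):
    #   required_fields_satisfied(['time', 'lat', 'lon', 'temp']) -> True
    #   required_fields_satisfied(['lat', 'lon']) -> False (needs more than 3 params)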

    def generate_sql_table(self, dataset_id, params, relevant, coverage_path):
        """
        Generates a foreign data table for use with Postgres.
        """
        #check whether the table already exists
        if not self.does_table_exist(dataset_id):
            valid_types = {}
            create_table_string = 'create foreign table "%s" (' % dataset_id
            log.debug("relevant: %s valid?: %s", relevant, self.required_fields_satisfied(relevant))
            if self.required_fields_satisfied(relevant):
                #loop through the params
                encodings = []

                for param in relevant:
                    #get the information
                    data_item = params[param]
                    desc = data_item[1]['description']
                    ooi_short_name = data_item[1]['ooi_short_name']
                    name = data_item[1]['name']
                    disp_name = data_item[1]['display_name']
                    internal_name = data_item[1]['internal_name']
                    cm_type = data_item[1]['param_type']['cm_type']
                    units = ""
                    try:
                        units = data_item[1]['uom']
                    except Exception as e:
                        if DEBUG:
                            log.debug("no units available...%s", e.message)

                    value_encoding = data_item[1]['param_type']['_value_encoding']
                    fill_value = data_item[1]['param_type']['_fill_value']
                    std_name = data_item[1]['standard_name']

                    #only use params that have a valid name
                    if len(name) > 0:  #and (len(desc)>0) and (len(units)>0) and (value_encoding is not None)
                        if DEBUG:
                            log.debug("-------processed-------")
                            log.debug(str(ooi_short_name))
                            log.debug(str(desc))
                            log.debug(str(name))
                            log.debug(str(disp_name))
                            log.debug(str(units))
                            log.debug(str(internal_name))
                            log.debug(str(value_encoding))
                            log.debug(str(cm_type[1]))

                        if cm_type[1] == "ArrayType":
                            #ignore array types
                            pass
                        else:
                            [encoding, prim_type] = self.get_value_encoding(name, value_encoding)
                            if encoding is not None:
                                encodings.append(encoding)
                                valid_types[name] = prim_type

                create_table_string += ','.join(encodings)
                log.debug("coverage path: %s", coverage_path)
                create_table_string = self.add_server_info(create_table_string, coverage_path, dataset_id)

                if DEBUG:
                    log.debug('\n%s', create_table_string)

                #create the foreign table and its lat/lon view
                try:
                    self.cur.execute(create_table_string)
                    self.con.commit()
                    #the view should always use lat and lon
                    self.cur.execute(self.generate_table_view(dataset_id, self.latitude, self.longitude))
                    self.con.commit()
                    return self.does_table_exist(dataset_id), valid_types

                except Exception as e:
                    #error while setting up the table
                    log.debug('Error %s', e)
                    raise
            else:
                log.warn('resource skipped, it does not contain all of the required params')
                #match the (success, valid_types) pair the caller unpacks
                return False, None

        else:
            if DEBUG:
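For a sense of what generate_sql_table assembles: a rough, hypothetical example for a dataset with params time/lat/lon/temp. Exact column definitions come from get_value_encoding and the server clause from add_server_info, so real output will differ.

# editor illustration only, not produced by running this code
example = ('create foreign table "dataset123" '
           '(time timestamp, lat real, lon real, temp real) ...')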