Merge pull request #234 from opendatacube/stacking
[ready] Stacking, NetCDF metadata rewrite and NCML utils
omad authored May 17, 2017
2 parents e2c82f1 + 4363a14 commit 7682003
Showing 9 changed files with 500 additions and 59 deletions.
25 changes: 25 additions & 0 deletions datacube/api/grid_workflow.py
@@ -6,6 +6,7 @@
from itertools import groupby
from collections import OrderedDict
import warnings
import pandas as pd

from ..utils import intersects
from .query import Query, query_group_by
@@ -93,6 +94,30 @@ def split(self, dim):
indexer[axis] = slice(i, i + 1)
yield self.sources[dim].values[i], self[tuple(indexer)]

def split_by_time(self, freq='A', time_dim='time', **kwargs):
"""
Splits along the `time` dimension, into periods, using pandas offsets, such as:
:
'A': Annual
'Q': Quarter
'M': Month
See: http://pandas.pydata.org/pandas-docs/stable/timeseries.html?highlight=rollback#timeseries-offset-aliases
:param freq: time series frequency
:param time_dim: name of the time dimension
:param kwargs: other keyword arguments passed to ``pandas.period_range``
:return: Generator[tuple(str, Tile)] generator of the key string (eg '1994') and the slice of Tile
"""
start_range = self.sources[time_dim][0].data
end_range = self.sources[time_dim][-1].data

for p in pd.period_range(start=start_range,
end=end_range,
freq=freq,
**kwargs):
sources_slice = self.sources.loc[{time_dim: slice(p.start_time, p.end_time)}]
yield str(p), Tile(sources=sources_slice, geobox=self.geobox)

def __str__(self):
return "Tile<sources={!r},\n\tgeobox={!r}>".format(self.sources, self.geobox)

70 changes: 68 additions & 2 deletions datacube/ui/task_app.py
@@ -8,8 +8,11 @@
import cachetools
import functools
import itertools
import re
from pathlib import Path

import pandas as pd

try:
import cPickle as pickle
except ImportError:
@@ -108,6 +111,30 @@ def load_tasks(taskfile):
)


def _cell_list_from_file(filename):
    # Matches two integers separated by a comma, underscore or whitespace, e.g. "14,-11" or "15 -40"
    cell_matcher = re.compile(r'(\-?\d+)(?:\s*(?:,|_|\s)\s*)(\-?\d+)')
    with open(filename) as cell_file:
        for line in cell_file:
            match = cell_matcher.match(line)
            if match:
                yield tuple(int(i) for i in match.groups())


def cell_list_to_file(filename, cell_list):
    with open(filename, 'w') as cell_file:
        for cell in cell_list:
            cell_file.write('{0},{1}\n'.format(*cell))


def validate_cell_list(ctx, param, value):
try:
if value is None:
return None
return list(_cell_list_from_file(value))
except ValueError:
raise click.BadParameter('cell_index_list must be a file with lines in the form "14,-11"')
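
A quick round-trip sketch of these helpers; the file name cells.txt is illustrative:

cell_list_to_file('cells.txt', [(14, -11), (15, -40)])
# cells.txt now contains one cell index per line: "14,-11" and "15,-40";
# comma, underscore or whitespace separated lines all parse back:
assert list(_cell_list_from_file('cells.txt')) == [(14, -11), (15, -40)]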


def validate_cell_index(ctx, param, value):
try:
if value is None:
@@ -121,13 +148,52 @@ def validate_year(ctx, param, value):
try:
if value is None:
return None
years = [int(y) for y in value.split('-', 2)]
return datetime(year=years[0], month=1, day=1), datetime(year=years[-1] + 1, month=1, day=1)
years = [pd.Period(y) for y in value.split('-', 2)]
return years[0].start_time.to_datetime(), years[-1].end_time.to_datetime()
except ValueError:
raise click.BadParameter('year must be specified as a single year (eg 1996) '
'or as an inclusive range (eg 1996-2001)')


def break_query_into_years(time_query, **kwargs):
    if time_query is None:
        return kwargs
    return [dict(kwargs, time=time_range) for time_range in year_splitter(*time_query)]


def year_splitter(start, end):
    """
    Generates a (start, end) time-range tuple for each year in the given range.
    `year_splitter('1992', '1993')` returns:
    ::
        [('1992-01-01 00:00:00', '1992-12-31 23:59:59.999999999'),
         ('1993-01-01 00:00:00', '1993-12-31 23:59:59.999999999')]
    :param str start: start year
    :param str end: end year
    :return Generator[tuple(str, str)]: strings representing the ranges
    """
    start_ts = pd.Timestamp(start)
    end_ts = pd.Timestamp(end)
    for p in pd.period_range(start=start_ts, end=end_ts, freq='A'):
        yield str(p.start_time), str(p.end_time)
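
For example (cell_index here is just an illustrative extra query parameter):

list(year_splitter('1992', '1993'))
# [('1992-01-01 00:00:00', '1992-12-31 23:59:59.999999999'),
#  ('1993-01-01 00:00:00', '1993-12-31 23:59:59.999999999')]

break_query_into_years(('1992', '1993'), cell_index=(15, -40))
# -> one query dict per year, each pairing that year's 'time' range with the extra keyword arguments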


#: pylint: disable=invalid-name
cell_index_option = click.option('--cell-index', 'cell_index',
help='Limit the process to a particular cell (e.g. 14,-11)',
callback=validate_cell_index, default=None)
#: pylint: disable=invalid-name
cell_index_list_option = click.option('--cell-index-list', 'cell_index_list',
                                      help='Limit the process to the cell indexes listed in a file (one per line, e.g. 14,-11)',
                                      callback=validate_cell_list, default=None)
#: pylint: disable=invalid-name
year_option = click.option('--year', 'time', help='Limit the process to a particular year',
callback=validate_year)
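
Task apps such as the NCML utility below compose these options; a hypothetical invocation (app and file names are illustrative):

    some-task-app <app_config_yaml> --cell-index-list cells.txt --year 1996-2001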


def task_app(make_config, make_tasks):
"""
Create a `Task App` from a function
74 changes: 55 additions & 19 deletions datacube_apps/ncml.py
@@ -33,27 +33,38 @@ def get_filename(config, cell_index, year=None):
return file_path_template.format(tile_index=cell_index, start_time=year)


def make_ncml_tasks(index, config, cell_index=None, year=None, **kwargs):
def make_ncml_tasks(index, config, cell_index=None, year=None, cell_index_list=None, **kwargs):
product = config['product']

query = {}
if year is not None:
query['time'] = datetime(year=year, month=1, day=1), datetime(year=year + 1, month=1, day=1)

config['nested_years'] = kwargs.get('nested_years', [])

gw = datacube.api.GridWorkflow(index=index, product=product.name)
cells = gw.list_cells(product=product.name, cell_index=cell_index, **query)
for (cell_index, tile) in cells.items():
output_filename = get_filename(config, cell_index, year)
yield dict(tile=tile,
cell_index=cell_index,
output_filename=output_filename)

if cell_index_list is None:
if cell_index is not None:
cell_index_list = [cell_index]
else:
cell_index_list = []

for cell_index in cell_index_list:
cells = gw.list_cells(product=product.name, cell_index=cell_index, **query)
for (cell_index, tile) in cells.items():
output_filename = get_filename(config, cell_index, year)
yield dict(tile=tile,
cell_index=cell_index,
output_filename=output_filename)


def make_ncml_config(index, config, export_path=None, **query):
def make_ncml_config(index, config, export_path=None, nested_years=None, **query):
config['product'] = index.products.get_by_name(config['output_type'])

config['nested_years'] = nested_years if nested_years is not None else []

if export_path is not None:
config['location'] = export_path

if not os.access(config['location'], os.W_OK):
    _LOG.warning('Current user appears not to have write access to output location: %s', config['location'])
return config
@@ -100,11 +111,13 @@ def get_sources_filepath(sources):

def write_ncml_file(ncml_filename, file_locations, header_attrs):
filename = Path(ncml_filename)
if filename.exists():
raise RuntimeError('NCML already exists: %s' % filename)
temp_filename = Path().joinpath(*filename.parts[:-1]) / '.tmp' / filename.parts[-1]

if temp_filename.exists():
temp_filename.unlink()

try:
filename.parent.mkdir(parents=True)
temp_filename.parent.mkdir(parents=True)
except OSError:
pass

@@ -114,7 +127,7 @@ def write_ncml_file(ncml_filename, file_locations, header_attrs):
<remove name="dataset_nchar" type="dimension" />
</netcdf>"""

with open(ncml_filename, 'w') as ncml_file:
with open(str(temp_filename), 'w') as ncml_file:
ncml_file.write('<netcdf xmlns="http://www.unidata.ucar.edu/namespaces/netcdf/ncml-2.2">\n')

for key, value in header_attrs.items():
Expand All @@ -128,11 +141,10 @@ def write_ncml_file(ncml_filename, file_locations, header_attrs):
ncml_file.write(' </aggregation>\n')
ncml_file.write('</netcdf>\n')

if filename.exists():
filename.unlink()

#: pylint: disable=invalid-name
cell_index_option = click.option('--cell-index', 'cell_index',
help='Limit to a particular cell (e.g. 14,-11)',
callback=task_app.validate_cell_index, default=None)
temp_filename.rename(filename)
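
A minimal sketch of calling write_ncml_file directly; the paths and header attributes are illustrative:

write_ncml_file('/output/LS5_TM_NBAR_15_-40.ncml',
                file_locations=['/data/LS5_TM_NBAR_15_-40_1992.nc',
                                '/data/LS5_TM_NBAR_15_-40_1993.nc'],
                header_attrs={'title': 'LS5 NBAR time series stack'})
# The aggregation is written to /output/.tmp/LS5_TM_NBAR_15_-40.ncml first, then renamed over
# the final path, so readers never see a partially written NCML file.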


@click.group(name=APP_NAME, help='NCML creation utility')
@@ -146,9 +158,16 @@ def ncml_app():
datacube.ui.click.config_option,
datacube.ui.click.pass_index(app_name=APP_NAME),
datacube.ui.click.logfile_option,
cell_index_option,
task_app.cell_index_option,
task_app.cell_index_list_option,
task_app.queue_size_option,
task_app.load_tasks_option,
task_app.save_tasks_option,
datacube.ui.click.executor_cli_options,
click.option('--export-path', 'export_path',
help='Write the stacked files to an external location instead of the location in the app config',
default=None,
type=click.Path(exists=True, writable=True, file_okay=False)),
)


@@ -157,6 +176,10 @@ def ncml_app():
@click.argument('app_config')
@task_app.task_app(make_config=make_ncml_config, make_tasks=make_ncml_tasks)
def full(index, config, tasks, executor, queue_size, **kwargs):
"""Create ncml files for the full time depth of the product
e.g. datacube-ncml full <app_config_yaml>
"""
click.echo('Starting datacube ncml utility...')

task_func = partial(do_ncml_task, config)
@@ -169,6 +192,13 @@ def full(index, config, tasks, executor, queue_size, **kwargs):
@click.argument('nested_years', nargs=-1, type=click.INT)
@task_app.task_app(make_config=make_ncml_config, make_tasks=make_ncml_tasks)
def nest(index, config, tasks, executor, queue_size, **kwargs):
"""Create ncml files for the full time, with nested ncml files covering the given years
e.g. datacube-ncml nest <app_config_yaml> 2016 2017
This will refer to the actual files (hopefully stacked), and make ncml files for the given (ie unstacked) years.
Use the `update` command when new data is added to a year, without having to rerun for the entire time depth.
"""
click.echo('Starting datacube ncml utility...')

task_func = partial(do_ncml_task, config)
@@ -181,6 +211,12 @@ def nest(index, config, tasks, executor, queue_size, **kwargs):
@click.argument('year', type=click.INT)
@task_app.task_app(make_config=make_ncml_config, make_tasks=make_ncml_tasks)
def update(index, config, tasks, executor, queue_size, **kwargs):
"""Update a single year ncml file
e.g datacube-ncml <app_config_yaml> 1996
This can be used to update an existing ncml file created with `nest` when new data is added.
"""
click.echo('Starting datacube ncml utility...')

task_func = partial(do_ncml_task, config)
3 changes: 2 additions & 1 deletion datacube_apps/stacker/__init__.py
@@ -4,5 +4,6 @@
"""
from __future__ import absolute_import
from .stacker import main
from .fixer import fixer as fixer_main

__all__ = ['main']
__all__ = ['main', 'fixer_main']
