Convert pandas dataframes to netCDF DSG files #1074

Status: Closed (wants to merge 2 commits)
15 changes: 10 additions & 5 deletions src/metpy/io/__init__.py
@@ -1,15 +1,20 @@
-# Copyright (c) 2015,2016,2018 MetPy Developers.
+# Copyright (c) 2015,2016,2018,2019 MetPy Developers.
 # Distributed under the terms of the BSD 3-Clause License.
 # SPDX-License-Identifier: BSD-3-Clause
-"""Classes for reading various file formats.
+"""Classes for reading and writing various file formats.
 
-These classes are written to take both file names (for local files) or file-like objects;
-this allows reading files that are already in memory (using :class:`python:io.StringIO`)
-or remote files (using :func:`~python:urllib.request.urlopen`).
+The gini and nexrad classes are written to take both file names (for local files)
+or file-like objects; this allows reading files that are already in memory (using
+:class:`python:io.StringIO`) or remote files (using :func:`~python:urllib.request.urlopen`).
+
+The `dataframe_to_netcdf` function takes a pandas DataFrame and writes a netCDF file (in
+DSG format if applicable).
 """
 
 from .gini import * # noqa: F403
 from .nexrad import * # noqa: F403
+from .pandas_to_netcdf import * # noqa: F403
 
 __all__ = gini.__all__[:] # pylint: disable=undefined-variable
 __all__.extend(nexrad.__all__) # pylint: disable=undefined-variable
+__all__.extend(pandas_to_netcdf.__all__) # pylint: disable=undefined-variable
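
For context, a minimal usage sketch of the new entry point, based on the signature added in this PR (the station values below are made up for illustration):

import pandas as pd

from metpy.io import dataframe_to_netcdf

# One row per observation; 'station_id' identifies the sampling location
df = pd.DataFrame({'station_id': ['KFNL', 'KDEN', 'KFNL'],
                   'latitude': [40.45, 39.86, 40.45],
                   'longitude': [-105.01, -104.67, -105.01],
                   'temperature': [21.5, 24.0, 20.1]})

# Write a timeSeries DSG netCDF file
dataframe_to_netcdf(df, mode='w', sampling_var='station_id',
                    sampling_data_vars=['station_id', 'latitude', 'longitude'],
                    path_to_save='surface_obs.nc',
                    column_units={'temperature': 'degC', 'latitude': 'degrees',
                                  'longitude': 'degrees'},
                    dataset_type='timeSeries')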
221 changes: 221 additions & 0 deletions src/metpy/io/pandas_to_netcdf.py
@@ -0,0 +1,221 @@
# Copyright (c) 2019 MetPy Developers.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
"""Support reading a pandas dataframe to a DSG netCDF."""

import logging
import os

from numpy import arange
import pandas as pd
import xarray as xr

from ..package_tools import Exporter

exporter = Exporter(globals())

log = logging.getLogger(__name__)


@exporter.export
def dataframe_to_netcdf(df, mode, sampling_var, sampling_data_vars, path_to_save,
netcdf_format=None, column_units=None, standard_names=None,
long_names=None, dataset_type=None):
r"""Take a Pandas DataFrame and convert it to a netCDF file.

If given a pandas DataFrame, this function will first convert
it to an xarray Dataset, attach attributes and metadata to it as
provided by the user, and then save it as a CF-compliant discrete
sampling geometry (DSG) netCDF file. Assumes each row of the DataFrame
is a unique observation.

This function is ideal for point data, such as station observations,
or for trajectory or profile data, which are discretely sampled at
individual points.

Parameters
----------
df : `pandas.DataFrame`
Point data in a pandas DataFrame.

mode : str
Specify whether to write ('w') a new netCDF file or append ('a') to an existing file.
If 'w' is specified and the `path_to_save` already exists, the file will be
overwritten.

sampling_var : str
Column name that is the sampling dimension: for surface observations,
this is the column that contains the station identifier/name.

sampling_data_vars : list
List of all variables associated with the sampling variable that do not
vary with time, such as latitude, longitude, and elevation for
surface observations.

path_to_save : str
Path, including filename, for where to save netCDF file.

netcdf_format : str, optional
NetCDF format of the saved file (e.g., 'NETCDF4' or 'NETCDF3_CLASSIC');
passed directly to `xarray.Dataset.to_netcdf`. If not given, xarray
chooses a default based on the available libraries.

column_units : dict, optional
Dictionary of units to attach to columns of the dataframe. Overrides
the units attribute if it is attached to the dataframe.

standard_names : dict, optional
Dictionary of CF-compliant standard names to attach to the variables.

long_names : dict, optional
Dictionary of longer variable descriptions that provide more detail
than standard_names.

dataset_type : str, optional
Type of dataset to be converted. Options are 'timeSeries', 'profile',
or 'trajectory'. While optional, this argument should be provided to create
a CF-compliant DSG netCDF file.

Returns
-------
None
The netCDF file is saved to `path_to_save`.

Notes
-----
If append mode is used, existing metadata in the file is preserved, but any
metadata supplied by the user overrides it.

"""
if mode == 'w':
_write_to_netcdf(df, sampling_var, sampling_data_vars, path_to_save,
netcdf_format, column_units, standard_names, long_names,
dataset_type)
elif mode == 'a':
_append_to_netcdf(df, sampling_var, sampling_data_vars, path_to_save,
netcdf_format, column_units, standard_names, long_names,
dataset_type)
else:
raise ValueError('Mode must either be "w" or "a".')


def _write_to_netcdf(df, sampling_var, sampling_data_vars, path_to_save, netcdf_format,
column_units, standard_names, long_names, dataset_type):
"""Write Pandas DataFrame to netCDF file.

This will overwrite any existing file at `path_to_save`.
"""
# verify_integrity must be True in order for conversion to netCDF to work
# Raise a TypeError if not provided a pandas DataFrame
try:
# Create the dimensions for use later in netCDF file
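# ngroup() with sort=False numbers the stations in order of first appearance,
# e.g. ['KFNL', 'KDEN', 'KFNL', 'KORD'] -> [0, 1, 0, 2]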
samplingindex = df.groupby([sampling_var], sort=False).ngroup()
obs = arange(0, len(df))
df.insert(0, 'samplingIndex', samplingindex)
df.insert(1, 'observations', obs)

# Handle the sampling location specific data
sampling_data = df[sampling_data_vars]
samples = sampling_data.groupby([sampling_var], sort=False).ngroup()
sampling_data.insert(0, 'samples', samples)
sampling_data = sampling_data.groupby('samples').first()
dataset_samples = xr.Dataset.from_dataframe(sampling_data)

# Create the dataset for the variables of each observation
df = df.drop(sampling_data_vars, axis=1)
df = df.set_index(['observations'], verify_integrity=True)
dataset_var = xr.Dataset.from_dataframe(df)

# Merge the two datasets together
dataset_final = xr.merge([dataset_samples, dataset_var], compat='no_conflicts')

except (AttributeError, ValueError, TypeError):
raise TypeError('A pandas dataframe was not provided')

# Attach variable-specific metadata
_assign_metadata(dataset_final, column_units, standard_names, long_names)

# Attach dataset-specific metadata
if dataset_type:
dataset_final.attrs['featureType'] = dataset_type
dataset_final[sampling_var].attrs['cf_role'] = dataset_type.lower() + '_id'
else:
log.warning('No dataset type provided - netCDF will not have appropriate metadata '
'for a DSG dataset.')
dataset_final['samplingIndex'].attrs['instance_dimension'] = 'samples'

# Remove any existing file
if os.path.exists(str(path_to_save)):
os.remove(str(path_to_save))

# Check if netCDF4 is installed to see how many unlimited dimensions we can use
# Need conditional import for checking due to Python 2
try:
from importlib.util import find_spec
check_netcdf4 = find_spec('netCDF4')
except ImportError:
from imp import find_module
check_netcdf4 = find_module('netCDF4')

if check_netcdf4 is not None:
unlimited_dimensions = ['samples', 'observations']
else:
# Due to xarray's fallback to scipy if netCDF4-python is not installed
# only one dimension can be unlimited. This may cause issues for users
log.warning('NetCDF4 not installed - saving as a netCDF3 file with only the '
'observations dimension as unlimited. If netCDF4 or multiple '
'dimensions are desired, run `pip install netCDF4`')
unlimited_dimensions = ['observations']

# Convert to netCDF
dataset_final.to_netcdf(path=str(path_to_save), mode='w', format=netcdf_format,
unlimited_dims=unlimited_dimensions, compute=True)


def _append_to_netcdf(df, sampling_var, sampling_data_vars, path_to_save,
netcdf_format, column_units, standard_names, long_names, dataset_type):
"""Append to existing netCDF file."""
ds = xr.open_dataset(str(path_to_save))
df_old = (ds.to_dataframe().reset_index()
.drop(columns=['samplingIndex', 'observations', 'samples']))
df_new = pd.concat([df_old, df], sort=False).reset_index(drop=True) # Pandas dependency

# Gather metadata from the existing file; user-supplied values override it below
if dataset_type is None and 'featureType' in ds.attrs:
dataset_type = ds.attrs['featureType']
append_column_units = {}
append_standard_names = {}
append_long_names = {}
for var_name, da in ds.data_vars.items():
if 'units' in da.attrs:
append_column_units[var_name] = da.attrs['units']
if 'standard_name' in da.attrs:
append_standard_names[var_name] = da.attrs['standard_name']
if 'long_name' in da.attrs:
append_long_names[var_name] = da.attrs['long_name']
if column_units is not None:
append_column_units.update(column_units)
if standard_names is not None:
append_standard_names.update(standard_names)
if long_names is not None:
append_long_names.update(long_names)

_write_to_netcdf(df_new, sampling_var, sampling_data_vars, path_to_save,
netcdf_format, append_column_units, append_standard_names,
append_long_names, dataset_type)


def _assign_metadata(dataset, units_dict, standard_names_dict, long_names_dict):
if units_dict is not None:
for var in dataset.variables:
if var in units_dict:
dataset[var].attrs['units'] = units_dict[var]
if standard_names_dict is not None:
for var in dataset.variables:
if var in standard_names_dict:
dataset[var].attrs['standard_name'] = standard_names_dict[var]
if long_names_dict is not None:
final_long_names = {}
final_long_names['samples'] = 'Sampling dimension'
final_long_names['observations'] = 'Observation dimension'
final_long_names['samplingIndex'] = 'Index of station for this observation'
final_long_names.update(long_names_dict)
for var in dataset.variables:
if var in final_long_names:
dataset[var].attrs['long_name'] = final_long_names[var]
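
Since the module encodes an indexed ragged array (the `instance_dimension` attribute set on `samplingIndex` above is what CF-aware tools look for), the per-station variables can be expanded back to one value per observation by indexing with `samplingIndex`. A sketch, assuming the hypothetical `surface_obs.nc` from the earlier example:

import xarray as xr

ds = xr.open_dataset('surface_obs.nc')

# samplingIndex holds, for each observation, the integer position of its
# station along the 'samples' dimension, so fancy indexing ragged-expands
# the per-station variables to per-observation values
station_per_obs = ds['station_id'].values[ds['samplingIndex'].values]
latitude_per_obs = ds['latitude'].values[ds['samplingIndex'].values]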
142 changes: 142 additions & 0 deletions tests/io/test_pandas_to_netcdf.py
@@ -0,0 +1,142 @@
# Copyright (c) 2019 MetPy Developers.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
"""Test the `pandas_to_netcdf` module."""

import logging
import os

import numpy as np
import pandas as pd
import pytest
import xarray as xr

from metpy.cbook import get_test_data
from metpy.io import dataframe_to_netcdf

# Turn off the warnings for tests
logging.getLogger('metpy.io.pandas_to_netcdf').setLevel(logging.CRITICAL)


@pytest.fixture
def test_df():
"""Create generic dataframe for testing."""
return pd.DataFrame({
'temperature': pd.Series([1, 2, 2, 3]), 'pressure': pd.Series([1, 2, 2, 3]),
'latitude': pd.Series([4, 5, 6, 7]), 'longitude': pd.Series([1, 2, 3, 4]),
'station_id': pd.Series(['KFNL', 'KDEN', 'KVPZ', 'KORD'])})


@pytest.fixture
def test_df2():
"""Create generic dataframe for appending."""
return pd.DataFrame({
'temperature': pd.Series([20]), 'pressure': pd.Series([1010]),
'latitude': pd.Series([40]), 'longitude': pd.Series([-65]),
'station_id': pd.Series(['KLGA'])})


def test_dataframe_to_netcdf_basic(tmpdir):
"""Test dataframe conversion to netcdf."""
df = pd.read_csv(get_test_data('station_data.txt'), usecols=[0, 1, 2, 3, 4, 5])
df = df.rename(columns={'latitude[unit="degrees_north"]': 'latitude',
'longitude[unit="degrees_east"]': 'longitude',
'air_pressure_at_sea_level[unit="hectoPascal"]':
'mean_sea_level_pressure',
'air_temperature[unit="Celsius"]': 'temperature'})
dataframe_to_netcdf(df, mode='w', path_to_save=str(tmpdir) + '/test.nc',
sampling_var='station', sampling_data_vars=['station', 'latitude',
'longitude'])
assert os.path.exists(str(tmpdir) + '/test.nc')
data = xr.open_dataset(str(tmpdir) + '/test.nc')
assert np.max(data['temperature']) == 27


def test_dataframe_to_netcdf_units(tmpdir):
"""Test units attached via a dictionary."""
df = pd.read_csv(get_test_data('station_data.txt'), usecols=[0, 1, 2, 3, 4, 5])
df = df.rename(columns={'latitude[unit="degrees_north"]': 'latitude',
'longitude[unit="degrees_east"]': 'longitude',
'air_pressure_at_sea_level[unit="hectoPascal"]':
'mean_sea_level_pressure',
'air_temperature[unit="Celsius"]': 'temperature'})
col_units = {'latitude': 'degrees', 'longitude': 'degrees', 'temperature': 'degC',
'mean_sea_level_pressure': 'hPa'}
dataframe_to_netcdf(df, mode='w', path_to_save=str(tmpdir) + '/test.nc',
sampling_var='station',
sampling_data_vars=['station', 'latitude', 'longitude'],
column_units=col_units, dataset_type='timeSeries')
data = xr.open_dataset(str(tmpdir) + '/test.nc')
assert data['station'].attrs['cf_role'] == 'timeseries_id'
assert data['temperature'].attrs['units'] == 'degC'


def test_dataframe_to_netcdf_names(test_df, tmpdir):
"""Test attachment of standard names via a dictionary."""
long_names = {'temperature': '2-meter air temperature',
'pressure': 'Mean sea-level air pressure', 'latitude': 'Station latitude',
'longitude': 'Station longitude', 'station_id': 'Station identifier'}
standard_names = {'temperature': 'air_temperature',
'pressure': 'air_pressure_at_mean_sea_level', 'latitude': 'latitude',
'longitude': 'longitude', 'station_id': 'platform_id'}
dataframe_to_netcdf(test_df, mode='w', path_to_save=str(tmpdir) + '/test.nc',
sampling_var='station_id',
sampling_data_vars=['station_id', 'latitude', 'longitude'],
standard_names=standard_names, long_names=long_names)
data = xr.open_dataset(str(tmpdir) + '/test.nc')
assert data['temperature'].attrs['standard_name'] == 'air_temperature'
assert data['station_id'].attrs['long_name'] == 'Station identifier'


def test_no_dataframe(tmpdir):
"""Test error message if Pandas DataFrame is not provided."""
array = np.arange(0, 10)
with pytest.raises(TypeError, match='A pandas dataframe was not provided'):
dataframe_to_netcdf(array, mode='w', path_to_save=str(tmpdir) + '/test.nc',
sampling_var=None, sampling_data_vars=None)


def test_invalid_mode_option(test_df, tmpdir):
"""Test error message if an incorrect file mode is specified."""
with pytest.raises(ValueError, match='Mode must either be "w" or "a".'):
dataframe_to_netcdf(test_df, mode='r', path_to_save=str(tmpdir) + '/test.nc',
sampling_var='station_id',
sampling_data_vars=['station_id', 'latitude', 'longitude'])


def test_append_basic(test_df, test_df2, tmpdir):
"""Test appending to an existing file."""
dataframe_to_netcdf(test_df, mode='w', path_to_save=str(tmpdir) + '/test.nc',
sampling_var='station_id',
sampling_data_vars=['station_id', 'latitude', 'longitude'])
dataframe_to_netcdf(test_df2, mode='a', path_to_save=str(tmpdir) + '/test.nc',
sampling_var='station_id',
sampling_data_vars=['station_id', 'latitude', 'longitude'])
data = xr.open_dataset(str(tmpdir) + '/test.nc')
assert 'KLGA' in data['station_id']
assert data.dims['samples'] == 5
assert data.dims['observations'] == 17


def test_append_attributes(test_df, test_df2, tmpdir):
"""Test appending dataset with existing attributes."""
units = {'temperature': 'degC', 'pressure': 'hPa', 'latitude': 'degrees',
'longitude': 'degrees'}
long_names = {'temperature': '2-meter air temperature',
'pressure': 'Mean sea-level air pressure', 'latitude': 'Station latitude',
'longitude': 'Station longitude', 'station_id': 'Station identifier'}
standard_names = {'temperature': 'air_temperature',
'pressure': 'air_pressure_at_mean_sea_level', 'latitude': 'latitude',
'longitude': 'longitude', 'station_id': 'platform_id'}
dataframe_to_netcdf(test_df, mode='w', path_to_save=str(tmpdir) + '/test.nc',
sampling_var='station_id',
sampling_data_vars=['station_id', 'latitude', 'longitude'],
column_units=units, standard_names=standard_names,
long_names=long_names, dataset_type='timeSeries')
dataframe_to_netcdf(test_df2, mode='a', path_to_save=str(tmpdir) + '/test.nc',
sampling_var='station_id',
sampling_data_vars=['station_id', 'latitude', 'longitude'])
data = xr.open_dataset(str(tmpdir) + '/test.nc')
assert data.temperature.attrs['units'] == 'degC'
assert data.attrs['featureType'] == 'timeSeries'
assert data.station_id.attrs['cf_role'] == 'timeseries_id'