#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2015-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Dataset objects."""
import logging
import numbers
from collections import namedtuple
from collections.abc import Collection
from datetime import datetime
logger = logging.getLogger(__name__)
def average_datetimes(dt_list):
    """Average a series of datetime objects.

    .. note::

        This function assumes all datetime objects are naive and in the same
        time zone (UTC).

    Args:
        dt_list (iterable): Datetime objects to average

    Returns: Average datetime as a datetime object

    """
    total = [datetime.timestamp(dt) for dt in dt_list]
    return datetime.fromtimestamp(sum(total) / len(total))
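# Illustrative use, a minimal sketch: with naive datetimes and no DST
# transition between the inputs, the local-time round trip cancels out and
# the result is the exact midpoint.
#
#     >>> average_datetimes([datetime(2019, 1, 1, 12, 0),
#     ...                    datetime(2019, 1, 1, 14, 0)])
#     datetime.datetime(2019, 1, 1, 13, 0)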
def _share_metadata_key(k, values, average_times):
    """Decide whether key ``k`` is shared by all ``values``; helper for combine_metadata."""
any_arrays = any([hasattr(val, "__array__") for val in values])
    # In the real world, the `ancillary_variables` attribute may be a
    # List[xarray.DataArray]; this means our values are now a
    # List[List[xarray.DataArray]].
    # Note that this list_of_arrays check is also true for any
    # higher-dimensional ndarray, but we only use this check after
    # any_arrays has been checked, so this false positive has no impact.
list_of_arrays = any(
[isinstance(val, Collection) and len(val) > 0 and
all([hasattr(subval, "__array__")
for subval in val])
for val in values])
if any_arrays:
return _share_metadata_key_array(values)
elif list_of_arrays:
return _share_metadata_key_list_arrays(values)
elif 'time' in k and isinstance(values[0], datetime) and average_times:
return True
elif all(val == values[0] for val in values[1:]):
return True
return False
def _share_metadata_key_array(values):
    """Check object identity across a list of arrays; helper for combine_metadata."""
for val in values[1:]:
if val is not values[0]:
return False
return True
def _share_metadata_key_list_arrays(values):
    """Check object identity across a list of lists of arrays; helper for combine_metadata."""
for val in values[1:]:
for arr, ref in zip(val, values[0]):
if arr is not ref:
return False
return True
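# Identity, not equality, is what the two helpers above check: equal-valued
# arrays that are distinct objects do not count as shared. Values below are
# illustrative:
#
#     >>> import numpy as np
#     >>> arr = np.zeros(3)
#     >>> _share_metadata_key_array([arr, arr])         # same object
#     True
#     >>> _share_metadata_key_array([arr, arr.copy()])  # equal but distinct
#     False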
DATASET_KEYS = ("name", "wavelength", "resolution", "polarization",
"calibration", "level", "modifiers")
DatasetID = namedtuple("DatasetID", " ".join(DATASET_KEYS))
DatasetID.__new__.__defaults__ = (None, None, None, None, None, None, tuple())
class DatasetID(DatasetID):
"""Identifier for all `Dataset` objects.
DatasetID is a namedtuple that holds identifying and classifying
information about a Dataset. There are two identifying elements,
``name`` and ``wavelength``. These can be used to generically refer to a
Dataset. The other elements of a DatasetID are meant to further
distinguish a Dataset from the possible variations it may have. For
example multiple Datasets may be called by one ``name`` but may exist
in multiple resolutions or with different calibrations such as "radiance"
and "reflectance". If an element is `None` then it is considered not
applicable.
A DatasetID can also be used in Satpy to query for a Dataset. This way
a fully qualified DatasetID can be found even if some of the DatasetID
elements are unknown. In this case a `None` signifies something that is
unknown or not applicable to the requested Dataset.
Args:
name (str): String identifier for the Dataset
wavelength (float, tuple): Single float wavelength when querying for
a Dataset. Otherwise 3-element tuple of
floats specifying the minimum, nominal,
and maximum wavelength for a Dataset.
`None` if not applicable.
resolution (int, float): Per data pixel/area resolution. If resolution
varies across the Dataset then nadir view
resolution is preferred. Usually this is in
meters, but for lon/lat gridded data angle
degrees may be used.
polarization (str): 'V' or 'H' polarizations of a microwave channel.
`None` if not applicable.
calibration (str): String identifying the calibration level of the
Dataset (ex. 'radiance', 'reflectance', etc).
`None` if not applicable.
level (int, float): Pressure/altitude level of the dataset. This is
typically in hPa, but may be in inverse meters
for altitude datasets (1/meters).
modifiers (tuple): Tuple of strings identifying what corrections or
other modifications have been performed on this
Dataset (ex. 'sunz_corrected', 'rayleigh_corrected',
etc). `None` or empty tuple if not applicable.
"""
def __new__(cls, *args, **kwargs):
"""Create new DatasetID."""
ret = super(DatasetID, cls).__new__(cls, *args, **kwargs)
if ret.modifiers is not None and not isinstance(ret.modifiers, tuple):
raise TypeError("'DatasetID' modifiers must be a tuple or None, "
"not {}".format(type(ret.modifiers)))
return ret
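    # A non-tuple ``modifiers`` value is rejected up front so that every
    # DatasetID remains hashable; e.g. (illustrative):
    #
    #     >>> DatasetID(name='VIS006', modifiers=['sunz_corrected'])
    #     Traceback (most recent call last):
    #         ...
    #     TypeError: 'DatasetID' modifiers must be a tuple or None, not <class 'list'>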
    @staticmethod
    def name_match(a, b):
        """Return if two string names are equal.

        Args:
            a (str): DatasetID.name or other string
            b (str): DatasetID.name or other string

        """
        return a == b
    @staticmethod
    def wavelength_match(a, b):
        """Return if two wavelengths are equal.

        Args:
            a (tuple or scalar): (min wl, nominal wl, max wl) or scalar wl
            b (tuple or scalar): (min wl, nominal wl, max wl) or scalar wl

        """
        if type(a) == type(b) or (isinstance(a, numbers.Number) and
                                  isinstance(b, numbers.Number)):
            return a == b
        elif a is None or b is None:
            return False
        elif isinstance(a, (list, tuple)) and len(a) == 3:
            return a[0] <= b <= a[2]
        elif isinstance(b, (list, tuple)) and len(b) == 3:
            return b[0] <= a <= b[2]
        else:
            raise ValueError("Can only compare wavelengths of length 1 or 3")
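    # Example of the range rule above (values are illustrative): a scalar
    # query wavelength matches a (min, nominal, max) tuple when it falls
    # within the range.
    #
    #     >>> DatasetID.wavelength_match((0.5, 0.6, 0.7), 0.65)
    #     True
    #     >>> DatasetID.wavelength_match((0.5, 0.6, 0.7), 0.8)
    #     False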
    def _comparable(self):
        """Get a comparable version of the DatasetID.

        Without this DatasetIDs often raise an exception when compared in
        Python 3 due to None not being comparable with other types.
        """
return self._replace(
name='' if self.name is None else self.name,
wavelength=tuple() if self.wavelength is None else self.wavelength,
resolution=0 if self.resolution is None else self.resolution,
polarization='' if self.polarization is None else self.polarization,
calibration='' if self.calibration is None else self.calibration,
)
    def __lt__(self, other):
        """Compare DatasetIDs, with special handling of `None` values."""
# modifiers should never be None when sorted, should be tuples
if isinstance(other, DatasetID):
other = other._comparable()
return super(DatasetID, self._comparable()).__lt__(other)
def __eq__(self, other):
"""Check for equality."""
if isinstance(other, str):
return self.name_match(self.name, other)
elif isinstance(other, numbers.Number) or \
isinstance(other, (tuple, list)) and len(other) == 3:
return self.wavelength_match(self.wavelength, other)
else:
return super(DatasetID, self).__eq__(other)
def __hash__(self):
"""Generate the hash of the ID."""
return tuple.__hash__(self)
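    # Because of the overrides above, a DatasetID compares equal to a bare
    # name string or to a wavelength inside its range (illustrative):
    #
    #     >>> did = DatasetID(name='IR_108', wavelength=(9.8, 10.8, 11.8))
    #     >>> did == 'IR_108', did == 10.5
    #     (True, True)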
    @classmethod
def from_dict(cls, d, **kwargs):
"""Convert a dict to an ID."""
args = []
for k in DATASET_KEYS:
val = kwargs.get(k, d.get(k))
# force modifiers to tuple
if k == 'modifiers' and val is not None:
val = tuple(val)
args.append(val)
return cls(*args)
    def to_dict(self, trim=True):
"""Convert the ID to a dict."""
if trim:
return self._to_trimmed_dict()
else:
return dict(zip(DATASET_KEYS, self))
def _to_trimmed_dict(self):
return {key: getattr(self, key) for key in DATASET_KEYS
if getattr(self, key) is not None}
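    # Round-trip sketch (illustrative): keys outside DATASET_KEYS are ignored
    # on the way in, and trimming drops `None` elements on the way out.
    #
    #     >>> d = {'name': 'IR_108', 'resolution': 3000, 'units': 'K'}
    #     >>> DatasetID.from_dict(d).to_dict()
    #     {'name': 'IR_108', 'resolution': 3000}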
def create_filtered_dsid(dataset_key, **dfilter):
    """Create a DatasetID matching *dataset_key* and *dfilter*.

    If a property is specified in both *dataset_key* and *dfilter*, the former
    has priority.

    """
    try:
        ds_dict = dataset_key.to_dict()
    except AttributeError:
        if isinstance(dataset_key, str):
            ds_dict = {'name': dataset_key}
        elif isinstance(dataset_key, numbers.Number):
            ds_dict = {'wavelength': dataset_key}
        else:
            raise TypeError("Unknown dataset_key type: {}".format(
                type(dataset_key)))
for key, value in dfilter.items():
if value is not None:
ds_dict.setdefault(key, value)
return DatasetID.from_dict(ds_dict)
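# Sketch of the priority rule (illustrative values): *dfilter* entries only
# fill in elements that *dataset_key* leaves unset.
#
#     >>> dsid = create_filtered_dsid('natural_color', resolution=500)
#     >>> dsid.name, dsid.resolution
#     ('natural_color', 500)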
def dataset_walker(datasets):
    """Walk through *datasets* and their ancillary data.

    Yields datasets and their parent.
    """
for dataset in datasets:
yield dataset, None
for anc_ds in dataset.attrs.get('ancillary_variables', []):
try:
anc_ds.attrs
yield anc_ds, dataset
except AttributeError:
continue
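# Illustrative traversal; ``scn`` below is a hypothetical mapping of
# xarray.DataArray objects (e.g. a Scene), not something defined here:
#
#     >>> for ds, parent in dataset_walker(scn.values()):  # doctest: +SKIP
#     ...     kind = 'top-level' if parent is None else 'ancillary'
#     ...     print(ds.attrs.get('name'), kind)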
def replace_anc(dataset, parent_dataset):
    """Replace *dataset* in the *parent_dataset*'s `ancillary_variables` field."""
if parent_dataset is None:
return
current_dsid = DatasetID.from_dict(dataset.attrs)
for idx, ds in enumerate(parent_dataset.attrs['ancillary_variables']):
if current_dsid == DatasetID.from_dict(ds.attrs):
parent_dataset.attrs['ancillary_variables'][idx] = dataset
return
class Dataset(object):
    """Placeholder for the deprecated class."""