Source code for satpy.readers.hdfeos_base

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Base HDF-EOS reader."""

import logging
import re
from datetime import datetime

import numpy as np
import xarray as xr

from pyhdf.error import HDF4Error
from pyhdf.SD import SD

from satpy import CHUNK_SIZE
from satpy.readers.file_handlers import BaseFileHandler

logger = logging.getLogger(__name__)


def interpolate(clons, clats, csatz, src_resolution, dst_resolution):
    """Interpolate two parallel datasets jointly."""
    from geotiepoints.modisinterpolator import modis_1km_to_250m, modis_1km_to_500m, modis_5km_to_1km

    interpolation_functions = {
        (5000, 1000): modis_5km_to_1km,
        (1000, 500): modis_1km_to_500m,
        (1000, 250): modis_1km_to_250m
    }

    try:
        interpolation_function = interpolation_functions[(src_resolution, dst_resolution)]
    except KeyError:
        error_message = "Interpolation from {}m to {}m not implemented".format(
            src_resolution, dst_resolution)
        raise NotImplementedError(error_message)

    logger.debug("Interpolating from {} to {}".format(src_resolution, dst_resolution))

    return interpolation_function(clons, clats, csatz)
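
# A hedged usage sketch (the variable names below are illustrative, not part
# of this module): given 5 km longitude, latitude and sensor-zenith arrays
# read from a MODIS file, ``interpolate`` returns the lon/lat pair upsampled
# to 1 km:
#
#     lons_1km, lats_1km = interpolate(lons_5km, lats_5km, satz_5km, 5000, 1000)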
class HDFEOSBaseFileReader(BaseFileHandler):
    """Base file handler for HDF-EOS data for both L1b and L2 products."""

    def __init__(self, filename, filename_info, filetype_info):
        """Initialize the base reader."""
        BaseFileHandler.__init__(self, filename, filename_info, filetype_info)
        try:
            self.sd = SD(self.filename)
        except HDF4Error as err:
            error_message = "Could not load data from file {}: {}".format(self.filename, err)
            raise ValueError(error_message)

        # Read metadata
        self.metadata = self.read_mda(self.sd.attributes()['CoreMetadata.0'])
        self.metadata.update(self.read_mda(
            self.sd.attributes()['StructMetadata.0'])
        )
        self.metadata.update(self.read_mda(
            self.sd.attributes()['ArchiveMetadata.0'])
        )
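
    # The three attributes parsed above (CoreMetadata.0, StructMetadata.0 and
    # ArchiveMetadata.0) are the standard HDF-EOS global metadata blobs; each
    # is an ODL-formatted string that read_mda below turns into a nested dict.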
    @staticmethod
    def read_mda(attribute):
        """Read the EOS metadata."""
        lines = attribute.split('\n')
        mda = {}
        current_dict = mda
        path = []
        prev_line = None
        for line in lines:
            if not line:
                continue
            if line == 'END':
                break
            if prev_line:
                line = prev_line + line
            key, val = line.split('=')
            key = key.strip()
            val = val.strip()
            try:
                val = eval(val)
            except NameError:
                pass
            except SyntaxError:
                prev_line = line
                continue
            prev_line = None
            if key in ['GROUP', 'OBJECT']:
                new_dict = {}
                path.append(val)
                current_dict[val] = new_dict
                current_dict = new_dict
            elif key in ['END_GROUP', 'END_OBJECT']:
                if val != path[-1]:
                    raise SyntaxError
                path = path[:-1]
                current_dict = mda
                for item in path:
                    current_dict = current_dict[item]
            elif key in ['CLASS', 'NUM_VAL']:
                pass
            else:
                current_dict[key] = val
        return mda
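
    # Illustrative sketch of the ODL text read_mda parses (shortened,
    # hypothetical input):
    #
    #     GROUP = INVENTORYMETADATA
    #       OBJECT = SHORTNAME
    #         VALUE = "MOD021KM"
    #       END_OBJECT = SHORTNAME
    #     END_GROUP = INVENTORYMETADATA
    #     END
    #
    # which becomes {'INVENTORYMETADATA': {'SHORTNAME': {'VALUE': 'MOD021KM'}}}.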
    @property
    def start_time(self):
        """Get the start time of the dataset."""
        date = (self.metadata['INVENTORYMETADATA']['RANGEDATETIME']['RANGEBEGINNINGDATE']['VALUE'] +
                ' ' +
                self.metadata['INVENTORYMETADATA']['RANGEDATETIME']['RANGEBEGINNINGTIME']['VALUE'])
        return datetime.strptime(date, '%Y-%m-%d %H:%M:%S.%f')

    @property
    def end_time(self):
        """Get the end time of the dataset."""
        date = (self.metadata['INVENTORYMETADATA']['RANGEDATETIME']['RANGEENDINGDATE']['VALUE'] +
                ' ' +
                self.metadata['INVENTORYMETADATA']['RANGEDATETIME']['RANGEENDINGTIME']['VALUE'])
        return datetime.strptime(date, '%Y-%m-%d %H:%M:%S.%f')

    def _read_dataset_in_file(self, dataset_name):
        if dataset_name not in self.sd.datasets():
            error_message = "Dataset name {} not included in available datasets {}".format(
                dataset_name, self.sd.datasets()
            )
            raise KeyError(error_message)

        dataset = self.sd.select(dataset_name)
        return dataset
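
    # For reference: the combined date/time string above looks like
    # '2019-01-01 12:30:00.123456' (illustrative values), matching the
    # '%Y-%m-%d %H:%M:%S.%f' format. In _read_dataset_in_file,
    # self.sd.datasets() returns a dict keyed by dataset name, so the
    # membership test is a cheap existence check before select().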
    def load_dataset(self, dataset_name):
        """Load the dataset from the HDF-EOS file."""
        from satpy.readers.hdf4_utils import from_sds

        dataset = self._read_dataset_in_file(dataset_name)
        fill_value = dataset._FillValue
        dask_arr = from_sds(dataset, chunks=CHUNK_SIZE)
        dims = ('y', 'x') if dask_arr.ndim == 2 else None
        data = xr.DataArray(dask_arr, dims=dims,
                            attrs=dataset.attributes())

        # preserve integer data types if possible
        if np.issubdtype(data.dtype, np.integer):
            new_fill = fill_value
        else:
            new_fill = np.nan
            data.attrs.pop('_FillValue', None)
        good_mask = data != fill_value

        scale_factor = data.attrs.get('scale_factor')
        if scale_factor is not None:
            data = data * scale_factor

        data = data.where(good_mask, new_fill)
        return data
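
    # Sketch of the behavior above with illustrative values: given
    # _FillValue = 65535 and scale_factor = 0.01, fill pixels are located
    # before scaling, valid pixels are multiplied by 0.01, and fill pixels
    # are then re-filled (NaN for floating-point input, the original
    # _FillValue for integer input).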
class HDFEOSGeoReader(HDFEOSBaseFileReader):
    """Handler for the geographical datasets."""

    # list of geographical datasets handled by the georeader
    # mapping to the default variable name if not specified in YAML
    DATASET_NAMES = {
        'longitude': 'Longitude',
        'latitude': 'Latitude',
        'satellite_azimuth_angle': ('SensorAzimuth', 'Sensor_Azimuth'),
        'satellite_zenith_angle': ('SensorZenith', 'Sensor_Zenith'),
        'solar_azimuth_angle': ('SolarAzimuth', 'Solar_Azimuth'),
        'solar_zenith_angle': ('SolarZenith', 'Solar_Zenith'),
    }

    def __init__(self, filename, filename_info, filetype_info):
        """Initialize the geographical reader."""
        HDFEOSBaseFileReader.__init__(self, filename, filename_info, filetype_info)
        self.cache = {}
    @staticmethod
    def read_geo_resolution(metadata):
        """Parse metadata to find the geolocation resolution.

        It is implemented as a staticmethod to match the read_mda pattern.
        """
        # level 1 files
        try:
            ds = metadata['INVENTORYMETADATA']['COLLECTIONDESCRIPTIONCLASS']['SHORTNAME']['VALUE']
            if ds.endswith('D03'):
                return 1000
            else:
                # 1km files usually have 5km geolocation
                return 5000
        except KeyError:
            pass

        # level 2 files usually advertise the resolution in the swath
        # dimension mapping; this does not work for L1B 1KM data files
        # because they are listed as 1KM data but the geo data inside is
        # at 5km
        try:
            latitude_dim = metadata['SwathStructure']['SWATH_1']['DimensionMap']['DimensionMap_2']['GeoDimension']
            resolution_regex = re.compile(r'(?P<resolution>\d+)(km|KM)')
            resolution_match = resolution_regex.search(latitude_dim)
            return int(resolution_match.group('resolution')) * 1000
        except (AttributeError, KeyError):
            pass

        raise RuntimeError("Could not determine resolution from file metadata")
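
    # For example (hypothetical values): a MOD03-style shortname ending in
    # 'D03' short-circuits to 1000 in the first branch, while a level 2
    # GeoDimension name containing '5km' matches the regex and yields 5000.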
    @property
    def geo_resolution(self):
        """Resolution of the geographical data retrieved in the metadata."""
        return self.read_geo_resolution(self.metadata)

    def _load_ds_by_name(self, ds_name):
        """Attempt loading using multiple common names."""
        var_names = self.DATASET_NAMES[ds_name]
        if isinstance(var_names, (list, tuple)):
            try:
                return self.load_dataset(var_names[0])
            except KeyError:
                return self.load_dataset(var_names[1])
        return self.load_dataset(var_names)
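
    # For example, _load_ds_by_name('satellite_zenith_angle') first tries the
    # variable name 'SensorZenith' and, if that raises KeyError, falls back
    # to 'Sensor_Zenith' (the spelling used by the other product level).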
    def get_interpolated_dataset(self, name1, name2, resolution, sensor_zenith, offset=0):
        """Load and interpolate datasets."""
        try:
            result1 = self.cache[(name1, resolution)]
            result2 = self.cache[(name2, resolution)]
        except KeyError:
            result1 = self._load_ds_by_name(name1)
            result2 = self._load_ds_by_name(name2) - offset
            result1, result2 = interpolate(
                result1, result2, sensor_zenith,
                self.geo_resolution, resolution
            )
            self.cache[(name1, resolution)] = result1
            self.cache[(name2, resolution)] = result2 + offset
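
    # Note on the offset: for the angle pairs, get_dataset below passes
    # offset=90, which presumably shifts zenith angles into the
    # latitude-like [-90, 90] range expected by the lon/lat-style
    # interpolation; the offset is added back before caching.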
    def get_dataset(self, dataset_keys, dataset_info):
        """Get the geolocation dataset."""
        # Name of the dataset as it appears in the HDF-EOS file
        in_file_dataset_name = dataset_info.get('file_key')
        # Name of the dataset in the YAML file
        dataset_name = dataset_keys.name
        # Requested resolution
        resolution = dataset_keys.resolution
        if in_file_dataset_name is not None:
            # if the YAML was configured with a specific name use that
            data = self.load_dataset(in_file_dataset_name)
        else:
            # otherwise use the default name for this variable
            data = self._load_ds_by_name(dataset_name)
        if resolution != self.geo_resolution:
            if in_file_dataset_name is not None:
                # they specified a custom variable name but
                # we don't know how to interpolate this yet
                raise NotImplementedError(
                    "Interpolation for variable '{}' is not "
                    "configured".format(dataset_name))

            # The data must be interpolated
            sensor_zenith = self._load_ds_by_name('satellite_zenith_angle')
            logger.debug("Loading %s", dataset_name)
            if dataset_name in ['longitude', 'latitude']:
                self.get_interpolated_dataset('longitude', 'latitude',
                                              resolution, sensor_zenith)
            elif dataset_name in ['satellite_azimuth_angle', 'satellite_zenith_angle']:
                # Sensor dataset names differ between L1b and L2 products
                self.get_interpolated_dataset('satellite_azimuth_angle',
                                              'satellite_zenith_angle',
                                              resolution, sensor_zenith, offset=90)
            elif dataset_name in ['solar_azimuth_angle', 'solar_zenith_angle']:
                # Solar dataset names differ between L1b and L2 products
                self.get_interpolated_dataset('solar_azimuth_angle',
                                              'solar_zenith_angle',
                                              resolution, sensor_zenith, offset=90)

            data = self.cache[dataset_name, resolution]

        for key in ('standard_name', 'units'):
            if key in dataset_info:
                data.attrs[key] = dataset_info[key]

        return data
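

# Hedged usage sketch (the file name below is hypothetical; in practice these
# handlers are instantiated by Satpy's reader machinery rather than directly):
#
#     geo = HDFEOSGeoReader('MOD03.A2019001.0000.061.hdf', {}, {})
#     print(geo.start_time, geo.geo_resolution)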