#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""SEVIRI native format reader.
References:
MSG Level 1.5 Native Format File Definition
https://www.eumetsat.int/website/wcm/idc/idcplg?IdcService=GET_FILE&dDocName=PDF_FG15_MSG-NATIVE-FORMAT-15&RevisionSelectionMethod=LatestReleased&Rendition=Web
MSG Level 1.5 Image Data Format Description
https://www.eumetsat.int/website/wcm/idc/idcplg?IdcService=GET_FILE&dDocName=PDF_TEN_05105_MSG_IMG_DATA&RevisionSelectionMethod=LatestReleased&Rendition=Web
"""
import logging
from datetime import datetime
import numpy as np
import xarray as xr
import dask.array as da
from satpy import CHUNK_SIZE
from pyresample import geometry
from satpy.readers.file_handlers import BaseFileHandler
from satpy.readers.eum_base import recarray2dict
from satpy.readers.seviri_base import (SEVIRICalibrationHandler,
CHANNEL_NAMES, CALIB, SATNUM,
dec10216, VISIR_NUM_COLUMNS,
VISIR_NUM_LINES, HRV_NUM_COLUMNS,
VIS_CHANNELS)
from satpy.readers.seviri_l1b_native_hdr import (GSDTRecords, native_header,
native_trailer)
from satpy.readers._geos_area import get_area_definition
logger = logging.getLogger('native_msg')
[docs]class NativeMSGFileHandler(BaseFileHandler, SEVIRICalibrationHandler):
"""SEVIRI native format reader.
The Level1.5 Image data calibration method can be changed by adding the
required mode to the Scene object instantiation kwargs eg
kwargs = {"calib_mode": "gsics",}
"""
def __init__(self, filename, filename_info, filetype_info, calib_mode='nominal'):
"""Initialize the reader."""
super(NativeMSGFileHandler, self).__init__(filename,
filename_info,
filetype_info)
self.platform_name = None
self.calib_mode = calib_mode
# Declare required variables.
# Assume a full disk file, reset in _read_header if otherwise.
self.header = {}
self.mda = {}
self.mda['is_full_disk'] = True
self.trailer = {}
# Read header, prepare dask-array, read trailer
# Available channels are known only after the header has been read
self._read_header()
self.dask_array = da.from_array(self._get_memmap(), chunks=(CHUNK_SIZE,))
self._read_trailer()
@property
def start_time(self):
"""Read the repeat cycle start time from metadata."""
return self.header['15_DATA_HEADER']['ImageAcquisition'][
'PlannedAcquisitionTime']['TrueRepeatCycleStart']
@property
def end_time(self):
"""Read the repeat cycle end time from metadata."""
return self.header['15_DATA_HEADER']['ImageAcquisition'][
'PlannedAcquisitionTime']['PlannedRepeatCycleEnd']
@staticmethod
def _calculate_area_extent(center_point, north, east, south, west,
we_offset, ns_offset, column_step, line_step):
# For Earth model 2 and full disk VISIR, (center_point - west - 0.5 + we_offset) must be -1856.5 .
# See MSG Level 1.5 Image Data Format Description Figure 7 - Alignment and numbering of the non-HRV pixels.
ll_c = (center_point - east + 0.5 + we_offset) * column_step
ll_l = (north - center_point + 0.5 + ns_offset) * line_step
ur_c = (center_point - west - 0.5 + we_offset) * column_step
ur_l = (south - center_point - 0.5 + ns_offset) * line_step
return (ll_c, ll_l, ur_c, ur_l)
def _get_data_dtype(self):
"""Get the dtype of the file based on the actual available channels."""
pkhrec = [
('GP_PK_HEADER', GSDTRecords.gp_pk_header),
('GP_PK_SH1', GSDTRecords.gp_pk_sh1)
]
pk_head_dtype = np.dtype(pkhrec)
def get_lrec(cols):
lrec = [
("gp_pk", pk_head_dtype),
("version", np.uint8),
("satid", np.uint16),
("time", (np.uint16, 5)),
("lineno", np.uint32),
("chan_id", np.uint8),
("acq_time", (np.uint16, 3)),
("line_validity", np.uint8),
("line_rquality", np.uint8),
("line_gquality", np.uint8),
("line_data", (np.uint8, cols))
]
return lrec
# each pixel is 10-bits -> one line of data has 25% more bytes
# than the number of columns suggest (10/8 = 1.25)
visir_rec = get_lrec(int(self.mda['number_of_columns'] * 1.25))
number_of_visir_channels = len(
[s for s in self.mda['channel_list'] if not s == 'HRV'])
drec = [('visir', (visir_rec, number_of_visir_channels))]
if self.mda['available_channels']['HRV']:
hrv_rec = get_lrec(int(self.mda['hrv_number_of_columns'] * 1.25))
drec.append(('hrv', (hrv_rec, 3)))
return np.dtype(drec)
def _get_memmap(self):
"""Get the memory map for the SEVIRI data."""
with open(self.filename) as fp:
data_dtype = self._get_data_dtype()
hdr_size = native_header.itemsize
return np.memmap(fp, dtype=data_dtype,
shape=(self.mda['number_of_lines'],),
offset=hdr_size, mode="r")
def _read_header(self):
"""Read the header info."""
data = np.fromfile(self.filename,
dtype=native_header, count=1)
self.header.update(recarray2dict(data))
data15hd = self.header['15_DATA_HEADER']
sec15hd = self.header['15_SECONDARY_PRODUCT_HEADER']
# Set the list of available channels:
self.mda['available_channels'] = get_available_channels(self.header)
self.mda['channel_list'] = [i for i in CHANNEL_NAMES.values()
if self.mda['available_channels'][i]]
self.platform_id = data15hd[
'SatelliteStatus']['SatelliteDefinition']['SatelliteId']
self.mda['platform_name'] = "Meteosat-" + SATNUM[self.platform_id]
equator_radius = data15hd['GeometricProcessing'][
'EarthModel']['EquatorialRadius'] * 1000.
north_polar_radius = data15hd[
'GeometricProcessing']['EarthModel']['NorthPolarRadius'] * 1000.
south_polar_radius = data15hd[
'GeometricProcessing']['EarthModel']['SouthPolarRadius'] * 1000.
polar_radius = (north_polar_radius + south_polar_radius) * 0.5
ssp_lon = data15hd['ImageDescription'][
'ProjectionDescription']['LongitudeOfSSP']
self.mda['projection_parameters'] = {'a': equator_radius,
'b': polar_radius,
'h': 35785831.00,
'ssp_longitude': ssp_lon}
north = int(sec15hd['NorthLineSelectedRectangle']['Value'])
east = int(sec15hd['EastColumnSelectedRectangle']['Value'])
south = int(sec15hd['SouthLineSelectedRectangle']['Value'])
west = int(sec15hd['WestColumnSelectedRectangle']['Value'])
ncolumns = west - east + 1
nrows = north - south + 1
# check if the file has less rows or columns than
# the maximum, if so it is an area of interest file
if (nrows < VISIR_NUM_LINES) or (ncolumns < VISIR_NUM_COLUMNS):
self.mda['is_full_disk'] = False
# If the number of columns in the file is not divisible by 4,
# UMARF will add extra columns to the file
modulo = ncolumns % 4
padding = 0
if modulo > 0:
padding = 4 - modulo
cols_visir = ncolumns + padding
# Check the VISIR calculated column dimension against
# the header information
cols_visir_hdr = int(sec15hd['NumberColumnsVISIR']['Value'])
if cols_visir_hdr != cols_visir:
logger.warning(
"Number of VISIR columns from the header is incorrect!")
logger.warning("Header: %d", cols_visir_hdr)
logger.warning("Calculated: = %d", cols_visir)
# HRV Channel - check if the area is reduced in east west
# direction as this affects the number of columns in the file
cols_hrv_hdr = int(sec15hd['NumberColumnsHRV']['Value'])
if ncolumns < VISIR_NUM_COLUMNS:
cols_hrv = cols_hrv_hdr
else:
cols_hrv = int(cols_hrv_hdr / 2)
# self.mda represents the 16bit dimensions not 10bit
self.mda['number_of_lines'] = int(sec15hd['NumberLinesVISIR']['Value'])
self.mda['number_of_columns'] = cols_visir
self.mda['hrv_number_of_lines'] = int(sec15hd["NumberLinesHRV"]['Value'])
self.mda['hrv_number_of_columns'] = cols_hrv
def _read_trailer(self):
hdr_size = native_header.itemsize
data_size = (self._get_data_dtype().itemsize *
self.mda['number_of_lines'])
with open(self.filename) as fp:
fp.seek(hdr_size + data_size)
data = np.fromfile(fp, dtype=native_trailer, count=1)
self.trailer.update(recarray2dict(data))
[docs] def get_area_def(self, dataset_id):
"""Get the area definition of the band."""
pdict = {}
pdict['a'] = self.mda['projection_parameters']['a']
pdict['b'] = self.mda['projection_parameters']['b']
pdict['h'] = self.mda['projection_parameters']['h']
pdict['ssp_lon'] = self.mda['projection_parameters']['ssp_longitude']
if dataset_id.name == 'HRV':
pdict['nlines'] = self.mda['hrv_number_of_lines']
pdict['ncols'] = self.mda['hrv_number_of_columns']
pdict['a_name'] = 'geos_seviri_hrv'
pdict['a_desc'] = 'SEVIRI high resolution channel area'
pdict['p_id'] = 'seviri_hrv'
if self.mda['is_full_disk']:
# handle full disk HRV data with two separated area definitions
[upper_area_extent, lower_area_extent,
upper_nlines, upper_ncols, lower_nlines, lower_ncols] = self.get_area_extent(dataset_id)
# upper area
pdict['a_desc'] = 'SEVIRI high resolution channel, upper window'
pdict['nlines'] = upper_nlines
pdict['ncols'] = upper_ncols
upper_area = get_area_definition(pdict, upper_area_extent)
# lower area
pdict['a_desc'] = 'SEVIRI high resolution channel, lower window'
pdict['nlines'] = lower_nlines
pdict['ncols'] = lower_ncols
lower_area = get_area_definition(pdict, lower_area_extent)
area = geometry.StackedAreaDefinition(lower_area, upper_area)
area = area.squeeze()
else:
# if the HRV data is in a ROI, the HRV channel is delivered in one area
area = get_area_definition(pdict, self.get_area_extent(dataset_id))
else:
pdict['nlines'] = self.mda['number_of_lines']
pdict['ncols'] = self.mda['number_of_columns']
pdict['a_name'] = 'geos_seviri_visir'
pdict['a_desc'] = 'SEVIRI low resolution channel area'
pdict['p_id'] = 'seviri_visir'
area = get_area_definition(pdict, self.get_area_extent(dataset_id))
return area
[docs] def get_area_extent(self, dataset_id):
"""Get the area extent of the file.
Until December 2017, the data is shifted by 1.5km SSP North and West against the nominal GEOS projection. Since
December 2017 this offset has been corrected. A flag in the data indicates if the correction has been applied.
If no correction was applied, adjust the area extent to match the shifted data.
For more information see Section 3.1.4.2 in the MSG Level 1.5 Image Data Format Description. The correction
of the area extent is documented in a `developer's memo <https://github.com/pytroll/satpy/wiki/
SEVIRI-georeferencing-offset-correction>`_.
"""
data15hd = self.header['15_DATA_HEADER']
sec15hd = self.header['15_SECONDARY_PRODUCT_HEADER']
# check for Earth model as this affects the north-south and
# west-east offsets
# section 3.1.4.2 of MSG Level 1.5 Image Data Format Description
earth_model = data15hd['GeometricProcessing']['EarthModel'][
'TypeOfEarthModel']
if earth_model == 2:
ns_offset = 0
we_offset = 0
elif earth_model == 1:
ns_offset = -0.5
we_offset = 0.5
if dataset_id.name == 'HRV':
ns_offset = -1.5
we_offset = 1.5
else:
raise NotImplementedError(
'Unrecognised Earth model: {}'.format(earth_model)
)
if dataset_id.name == 'HRV':
grid_origin = data15hd['ImageDescription']['ReferenceGridHRV']['GridOrigin']
center_point = (HRV_NUM_COLUMNS / 2) - 2
coeff = 3
column_step = data15hd['ImageDescription']['ReferenceGridHRV']['ColumnDirGridStep'] * 1000.0
line_step = data15hd['ImageDescription']['ReferenceGridHRV']['LineDirGridStep'] * 1000.0
else:
grid_origin = data15hd['ImageDescription']['ReferenceGridVIS_IR']['GridOrigin']
center_point = VISIR_NUM_COLUMNS / 2
coeff = 1
column_step = data15hd['ImageDescription']['ReferenceGridVIS_IR']['ColumnDirGridStep'] * 1000.0
line_step = data15hd['ImageDescription']['ReferenceGridVIS_IR']['LineDirGridStep'] * 1000.0
# Calculations assume grid origin is south-east corner
# section 7.2.4 of MSG Level 1.5 Image Data Format Description
origins = {0: 'NW', 1: 'SW', 2: 'SE', 3: 'NE'}
if grid_origin != 2:
msg = 'Grid origin not supported number: {}, {} corner'.format(
grid_origin, origins[grid_origin]
)
raise NotImplementedError(msg)
# When dealing with HRV channel and full disk, area extent is
# in two pieces
if (dataset_id.name == 'HRV') and self.mda['is_full_disk']:
# get actual navigation parameters from trailer data
data15tr = self.trailer['15TRAILER']
HRV_bounds = data15tr['ImageProductionStats']['ActualL15CoverageHRV']
# upper window
upper_north_line = HRV_bounds['UpperNorthLineActual']
upper_west_column = HRV_bounds['UpperWestColumnActual']
upper_south_line = HRV_bounds['UpperSouthLineActual']
upper_east_column = HRV_bounds['UpperEastColumnActual']
upper_area_extent = self._calculate_area_extent(
center_point, upper_north_line, upper_east_column,
upper_south_line, upper_west_column, we_offset,
ns_offset, column_step, line_step
)
upper_nlines = upper_north_line - upper_south_line + 1
upper_ncols = upper_west_column - upper_east_column + 1
# lower window
lower_north_line = HRV_bounds['LowerNorthLineActual']
lower_west_column = HRV_bounds['LowerWestColumnActual']
lower_south_line = HRV_bounds['LowerSouthLineActual']
lower_east_column = HRV_bounds['LowerEastColumnActual']
lower_area_extent = self._calculate_area_extent(
center_point, lower_north_line, lower_east_column,
lower_south_line, lower_west_column, we_offset,
ns_offset, column_step, line_step
)
lower_nlines = lower_north_line - lower_south_line + 1
lower_ncols = lower_west_column - lower_east_column + 1
return [upper_area_extent, lower_area_extent, upper_nlines, upper_ncols, lower_nlines, lower_ncols]
# Otherwise area extent is in one piece, corner points are
# the same as for VISIR channels, HRV channel is having
# three times the amount of columns and rows
else:
north = coeff * int(sec15hd['NorthLineSelectedRectangle']['Value'])
east = coeff * int(sec15hd['EastColumnSelectedRectangle']['Value'])
west = coeff * int(sec15hd['WestColumnSelectedRectangle']['Value'])
south = coeff * int(sec15hd['SouthLineSelectedRectangle']['Value'])
area_extent = self._calculate_area_extent(
center_point, north, east,
south, west, we_offset,
ns_offset, column_step, line_step
)
return area_extent
[docs] def get_dataset(self, dataset_id, dataset_info):
"""Get the dataset."""
if dataset_id.name not in self.mda['channel_list']:
raise KeyError('Channel % s not available in the file' % dataset_id.name)
elif dataset_id.name not in ['HRV']:
shape = (self.mda['number_of_lines'], self.mda['number_of_columns'])
# Check if there is only 1 channel in the list as a change
# is needed in the arrray assignment ie channl id is not present
if len(self.mda['channel_list']) == 1:
raw = self.dask_array['visir']['line_data']
else:
i = self.mda['channel_list'].index(dataset_id.name)
raw = self.dask_array['visir']['line_data'][:, i, :]
data = dec10216(raw.flatten())
data = data.reshape(shape)
else:
shape = (self.mda['hrv_number_of_lines'], self.mda['hrv_number_of_columns'])
raw2 = self.dask_array['hrv']['line_data'][:, 2, :]
raw1 = self.dask_array['hrv']['line_data'][:, 1, :]
raw0 = self.dask_array['hrv']['line_data'][:, 0, :]
shape_layer = (self.mda['number_of_lines'], self.mda['hrv_number_of_columns'])
data2 = dec10216(raw2.flatten())
data2 = data2.reshape(shape_layer)
data1 = dec10216(raw1.flatten())
data1 = data1.reshape(shape_layer)
data0 = dec10216(raw0.flatten())
data0 = data0.reshape(shape_layer)
data = np.zeros(shape)
idx = range(0, shape[0], 3)
data[idx, :] = data0
idx = range(1, shape[0], 3)
data[idx, :] = data1
idx = range(2, shape[0], 3)
data[idx, :] = data2
xarr = xr.DataArray(data, dims=['y', 'x']).where(data != 0).astype(np.float32)
if xarr is None:
dataset = None
else:
dataset = self.calibrate(xarr, dataset_id)
dataset.attrs['units'] = dataset_info['units']
dataset.attrs['wavelength'] = dataset_info['wavelength']
dataset.attrs['standard_name'] = dataset_info['standard_name']
dataset.attrs['platform_name'] = self.mda['platform_name']
dataset.attrs['sensor'] = 'seviri'
dataset.attrs['orbital_parameters'] = {
'projection_longitude': self.mda['projection_parameters']['ssp_longitude'],
'projection_latitude': 0.,
'projection_altitude': self.mda['projection_parameters']['h']}
return dataset
[docs] def calibrate(self, data, dataset_id):
"""Calibrate the data."""
tic = datetime.now()
data15hdr = self.header['15_DATA_HEADER']
calibration = dataset_id.calibration
channel = dataset_id.name
# even though all the channels may not be present in the file,
# the header does have calibration coefficients for all the channels
# hence, this channel index needs to refer to full channel list
i = list(CHANNEL_NAMES.values()).index(channel)
if calibration == 'counts':
return data
if calibration in ['radiance', 'reflectance', 'brightness_temperature']:
# determine the required calibration coefficients to use
# for the Level 1.5 Header
if (self.calib_mode.upper() != 'GSICS' and self.calib_mode.upper() != 'NOMINAL'):
raise NotImplementedError(
'Unknown Calibration mode : Please check')
# NB GSICS doesn't have calibration coeffs for VIS channels
if (self.calib_mode.upper() != 'GSICS' or channel in VIS_CHANNELS):
coeffs = data15hdr[
'RadiometricProcessing']['Level15ImageCalibration']
gain = coeffs['CalSlope'][i]
offset = coeffs['CalOffset'][i]
else:
coeffs = data15hdr[
'RadiometricProcessing']['MPEFCalFeedback']
gain = coeffs['GSICSCalCoeff'][i]
offset = coeffs['GSICSOffsetCount'][i]
offset = offset * gain
res = self._convert_to_radiance(data, gain, offset)
if calibration == 'reflectance':
solar_irradiance = CALIB[self.platform_id][channel]["F"]
res = self._vis_calibrate(res, solar_irradiance)
elif calibration == 'brightness_temperature':
cal_type = data15hdr['ImageDescription'][
'Level15ImageProduction']['PlannedChanProcessing'][i]
res = self._ir_calibrate(res, channel, cal_type)
logger.debug("Calibration time " + str(datetime.now() - tic))
return res
[docs]def get_available_channels(header):
"""Get the available channels from the header information."""
chlist_str = header['15_SECONDARY_PRODUCT_HEADER'][
'SelectedBandIDs']['Value']
retv = {}
for idx, char in zip(range(12), chlist_str):
retv[CHANNEL_NAMES[idx + 1]] = (char == 'X')
return retv