#!/usr/bin/env python
# coding: utf-8
# SICOR is a freely available, platform-independent software designed to process hyperspectral remote sensing data,
# and particularly developed to handle data from the EnMAP sensor.
# This file contains the Sensor Independent Atmospheric CORrection (SICOR) AC module - EnMAP specific parts.
# Copyright (C) 2018 Niklas Bohn (GFZ, <nbohn@gfz-potsdam.de>),
# German Research Centre for Geosciences (GFZ, <https://www.gfz-potsdam.de>)
# This software was developed within the context of the EnMAP project supported by the DLR Space Administration with
# funds of the German Federal Ministry of Economic Affairs and Energy (on the basis of a decision by the German
# Bundestag: 50 EE 1529) and contributions from DLR, GFZ and OHB System AG.
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>.
import logging
import numpy as np
from scipy.ndimage import gaussian_filter
from tqdm import tqdm
import warnings
from sicor.AC.RtFo_3_phases import Fo, FoFunc, __minimize__
[docs]
def make_ac_enmap(data, enmap_l1b, fo, cwv, pt, surf_res, logger=None):
"""
Perform atmospheric correction for enmap_l1b product, based on given and retrieved parameters. Instead of returning
an object, the function adds a 'data_l2a' attribute to each detector. This numpy array holds the retrieved surface
reflectance map.
:param data: array containing measured TOA radiance of VNIR and SWIR
:param enmap_l1b: EnMAP Level-1B object
:param fo: forward operator object
:param cwv: CWV retrieval maps for VNIR and SWIR
:param pt: forward model parameter vector for VNIR and SWIR
:param surf_res: dictionary of retrieved surface parameters ('intercept', 'slope', 'liquid water', 'ice')
:param logger: None or logging instance
:return: None
"""
logger = logger or logging.getLogger(__name__)
for detector_name in enmap_l1b.detector_attrNames:
logger.info("Calculating surface reflectance for %s detector..." % detector_name)
detector = getattr(enmap_l1b, detector_name)
detector.data_l2a = np.full(data[detector_name].shape, np.NaN, dtype=float)
for icol in tqdm(range(data[detector_name].shape[1]), disable=fo.disable_progressbars):
for irow in range(data[detector_name].shape[0]):
detector.data_l2a[irow, icol, :] = fo.surf_ref(dt=data[detector_name][irow, icol, :],
xx=cwv[detector_name][irow, icol],
pt=pt[detector_name][irow, icol, :],
mode=detector_name)
if detector_name == "swir" and not fo.segmentation:
xx = np.array([cwv[detector_name][irow, icol], surf_res["intercept"][irow, icol],
surf_res["slope"][irow, icol], surf_res["liquid"][irow, icol],
surf_res["ice"][irow, icol]])
swir_feature = fo.surface_model(xx=xx)
detector.data_l2a[irow, icol, fo.fit_wvl] = swir_feature
[docs]
def sicor_ac_enmap(enmap_l1b, options, unknowns=False, logger=None):
"""
Atmospheric correction for EnMAP Level-1B products, including a three phases of water retrieval.
:param enmap_l1b: EnMAP Level-1B object
:param options: dictionary with EnMAP specific options
:param unknowns: if True, uncertainties due to unknown forward model parameters are added to S_epsilon;
default: False
:param logger: None or logging instance
:return: surface reflectance for EnMAP VNIR and SWIR detectors as well as dictionary containing estimated
three phases of water maps and several retrieval uncertainty measures
"""
logger = logger or logging.getLogger(__name__)
logger.info("Setting up forward operator...")
fo_enmap = Fo(enmap_l1b=enmap_l1b, options=options, logger=logger)
fo_func_enmap = FoFunc(fo=fo_enmap)
logger.info("Performing 3 phases of water retrieval...")
res = __minimize__(fo=fo_enmap, opt_func=fo_func_enmap, logger=logger, unknowns=unknowns)
if fo_enmap.segmentation:
logger.info("Smoothing segmented CWV retrieval map...")
cwv_seg = np.zeros(fo_enmap.data_swir.shape[:2])
if fo_enmap.land_only:
for ii, lbl in enumerate(fo_enmap.lbl):
cwv_seg[fo_enmap.labels == lbl] = res["cwv_model"][:, ii]
else:
for i in range(fo_enmap.segs):
cwv_seg[fo_enmap.labels == i] = res["cwv_model"][:, i]
# smoothing segmented CWV map
cwv_smoothed = gaussian_filter(cwv_seg, sigma=fo_enmap.smoothing_sigma)
logger.info("Transforming smoothed CWV retrieval map to VNIR sensor geometry to enable AC of VNIR data...")
cwv_trans = enmap_l1b.transform_swir_to_vnir_raster(array_swirsensorgeo=cwv_smoothed,
resamp_alg=fo_enmap.resamp_alg,
respect_keystone=False)
data_ac = {"vnir": fo_enmap.data_vnir, "swir": fo_enmap.data_swir}
cwv_ac = {"vnir": cwv_trans, "swir": cwv_smoothed}
pt_ac = {"vnir": fo_enmap.pt_vnir, "swir": fo_enmap.pt}
logger.info("Starting surface reflectance retrieval...")
warnings.filterwarnings("ignore")
make_ac_enmap(data=data_ac, enmap_l1b=enmap_l1b, fo=fo_enmap, cwv=cwv_ac, pt=pt_ac, surf_res=None,
logger=logger)
enmap_l2a_vnir = enmap_l1b.vnir.data_l2a
enmap_l2a_swir = enmap_l1b.swir.data_l2a
logger.info("Smoothing segmented liquid water and ice maps...")
liq_seg = np.zeros(fo_enmap.data_swir.shape[:2])
ice_seg = np.zeros(fo_enmap.data_swir.shape[:2])
if fo_enmap.land_only:
for ii, lbl in enumerate(fo_enmap.lbl):
liq_seg[fo_enmap.labels == lbl] = res["liq_model"][:, ii]
ice_seg[fo_enmap.labels == lbl] = res["ice_model"][:, ii]
else:
for i in range(fo_enmap.segs):
liq_seg[fo_enmap.labels == i] = res["liq_model"][:, i]
ice_seg[fo_enmap.labels == i] = res["ice_model"][:, i]
liq_smoothed = gaussian_filter(liq_seg, sigma=fo_enmap.smoothing_sigma)
ice_smoothed = gaussian_filter(ice_seg, sigma=fo_enmap.smoothing_sigma)
if fo_enmap.land_only:
enmap_l2a_vnir[fo_enmap.water_mask_vnir != 1] = np.nan
enmap_l2a_swir[fo_enmap.water_mask_swir != 1] = np.nan
cwv_smoothed[fo_enmap.water_mask_swir != 1] = np.nan
liq_smoothed[fo_enmap.water_mask_swir != 1] = np.nan
ice_smoothed[fo_enmap.water_mask_swir != 1] = np.nan
res["cwv_model"] = cwv_smoothed
res["liq_model"] = liq_smoothed
res["ice_model"] = ice_smoothed
else:
logger.info("Transforming CWV retrieval map to VNIR sensor geometry to enable AC of VNIR data...")
cwv_trans = enmap_l1b.transform_swir_to_vnir_raster(array_swirsensorgeo=res["cwv_model"],
resamp_alg=fo_enmap.resamp_alg,
respect_keystone=False)
data_ac = {"vnir": fo_enmap.data_vnir, "swir": fo_enmap.data_swir}
cwv_ac = {"vnir": cwv_trans, "swir": res["cwv_model"]}
pt_ac = {"vnir": fo_enmap.pt_vnir, "swir": fo_enmap.pt}
surf_ac = {"intercept": res["intercept_model"], "slope": res["slope_model"], "liquid": res["liq_model"],
"ice": res["ice_model"]}
logger.info("Starting surface reflectance retrieval...")
warnings.filterwarnings("ignore")
make_ac_enmap(data=data_ac, enmap_l1b=enmap_l1b, fo=fo_enmap, cwv=cwv_ac, pt=pt_ac, surf_res=surf_ac,
logger=logger)
enmap_l2a_vnir = enmap_l1b.vnir.data_l2a
enmap_l2a_swir = enmap_l1b.swir.data_l2a
# simple validation of L2A data
if fo_enmap.land_only:
val_vnir = enmap_l2a_vnir[fo_enmap.water_mask_vnir == 1]
val_swir = enmap_l2a_swir[fo_enmap.water_mask_swir == 1]
if np.isnan(val_vnir).any() or np.isnan(val_swir).any():
logger.warning("The surface reflectance for land only generated by SICOR contains NaN values. Please check "
"for errors in the input data, the options file, or the processing code.")
else:
if np.isnan(enmap_l2a_vnir).any() or np.isnan(enmap_l2a_swir).any():
logger.warning("The surface reflectance for land + water generated by SICOR contains NaN values. Please "
"check for errors in the input data, the options file, or the processing code.")
for ii, dl2a in zip(range(2), [enmap_l2a_vnir, enmap_l2a_swir]):
if dl2a[np.isfinite(dl2a)].shape[0] > 0:
if ii == 0:
d_name = "VNIR L2A"
else:
d_name = "SWIR L2A"
if np.min(dl2a[np.isfinite(dl2a)]) < 0:
logger.warning("%s data contain negative values indicating an overcorrection. Please check for "
"errors in the input data, the options file, or the processing code." % d_name)
if np.max(dl2a[np.isfinite(dl2a)]) > 1:
logger.warning("%s data contain values exceeding 1 indicating a saturation. Please check for errors "
"in the input data, the options file, or the processing code." % d_name)
del enmap_l1b.vnir.data_l2a
del enmap_l1b.swir.data_l2a
if fo_enmap.land_only:
mode = "combined"
else:
mode = "land"
logger.info("SICOR atmospheric correction for %s in %s mode successfully finished!" % (options["sensor"]["name"],
mode))
return enmap_l2a_vnir, enmap_l2a_swir, res