Source code for sicor.Tools.EnMAP.first_guess

#!/usr/bin/env python
# coding: utf-8

# SICOR is a freely available, platform-independent software designed to process hyperspectral remote sensing data,
# and particularly developed to handle data from the EnMAP sensor.

# This file contains some tools for the first guess calculation needed for optimal estimation.

# Copyright (C) 2018  Niklas Bohn (GFZ, <nbohn@gfz-potsdam.de>),
# German Research Centre for Geosciences (GFZ, <https://www.gfz-potsdam.de>)

# This software was developed within the context of the EnMAP project supported by the DLR Space Administration with
# funds of the German Federal Ministry of Economic Affairs and Energy (on the basis of a decision by the German
# Bundestag: 50 EE 1529) and contributions from DLR, GFZ and OHB System AG.

# This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.

# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

# You should have received a copy of the GNU General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>.


import numpy as np
import dill

from sicor.Tools.EnMAP.LUT import read_lut_enmap_formatted, interpol_lut, get_data_file
from sicor.Tools.EnMAP.conversion import generate_filter
from sicor.Tools.EnMAP.metadata import varsol


[docs] def wv_band_ratio(data, water_msk, fn_table, sol_model, vza, sza, dem, aot, raa, intp_wvl, intp_fwhm, jday, month, idx): """ Band ratio water vapor retrieval. :param data: image dataset :param water_msk: water mask :param fn_table: path to radiative transfer LUT :param sol_model: dictionary containing solar irradiance model ('wvl', 'sol_irr') :param vza: viewing zenith angle :param sza: sun zenith angle :param dem: digital elevation model, same shape as data :param aot: aerosol optical thickness :param raa: relative azimuth angle :param intp_wvl: instrument wavelengths :param intp_fwhm: instrument fwhm :param jday: acquisition day :param month: acquisition month :param idx: indices of instrument channels, which should be used for retrieval (should be approx. 870, 900 and 940 nm) :return: water vapor image """ cnt_land = len(data[:, :, idx[1]].flatten()) num_bd = 2 toa_sub = np.zeros((cnt_land, num_bd)) toa_sub[:, 0] = data[:, :, idx[1]].flatten() toa_sub[:, 1] = data[:, :, idx[2]].flatten() cnt_land = len(toa_sub[:, 0]) luts, axes_x, axes_y, wvl, lut1, lut2, xnodes, nm_nodes, ndim, x_cell = read_lut_enmap_formatted(file_lut=fn_table) wvl_lut = wvl s_norm = generate_filter(wvl_m=wvl_lut, wvl=intp_wvl, wl_resol=intp_fwhm) lut2_shape = np.array(lut2.shape) lut2_shape[6] = len(intp_wvl) lut2_res = np.zeros(lut2_shape) lut1_res = lut1[:, :, :, :, :, :, :, 0] @ s_norm for ii in range(lut2.shape[-1]): lut2_res[:, :, :, :, :, :, :, ii] = lut2[:, :, :, :, :, :, :, ii] @ s_norm dsol = varsol(jday, month) dn2rad = dsol * dsol * 0.1 fac = 1 / dn2rad hsfs = [np.min(dem), np.max(dem)] cwvs = list(axes_x[1][4]) rhos = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] l_toa_lut = np.zeros((len(hsfs), len(cwvs), len(rhos), len(intp_wvl))) for ii, hsf in enumerate(hsfs): for jj, cwv in enumerate(cwvs): vtest = np.asarray([vza, sza, hsf, aot, raa, cwv]) f_int = interpol_lut(lut1=lut1_res, lut2=lut2_res, xnodes=xnodes, nm_nodes=nm_nodes, ndim=ndim, x_cell=x_cell, vtest=vtest, intp_wvl=intp_wvl) f_int_l0 = f_int[0, :] * 1.e+3 f_int_edir = f_int[1, :] * 1.e+3 f_int_edif = f_int[2, :] * 1.e+3 f_int_ss = f_int[3, :] f_int_ee = f_int_edir * np.cos(np.deg2rad(sza)) + f_int_edif for kk, rho in enumerate(rhos): l_toa = (f_int_l0 + f_int_ee * rho / np.pi / (1 - f_int_ss * rho)) * fac l_toa_lut[ii, jj, kk, :] = l_toa s_norm_sol = generate_filter(wvl_m=sol_model["wvl"], wvl=intp_wvl, wl_resol=intp_fwhm) solar_res = sol_model["sol_irr"] @ s_norm_sol rfl_1_img = data[:, :, idx[0]] / solar_res[idx[0]] rfl_2_img = data[:, :, idx[1]] / solar_res[idx[1]] rfl_3_img = rfl_1_img + (rfl_2_img - rfl_1_img) * (intp_wvl[idx[2]] - intp_wvl[idx[0]]) / ( intp_wvl[idx[1]] - intp_wvl[idx[0]]) if np.min(rfl_3_img) <= 0: for ii in range(rfl_3_img.shape[0]): for jj in range(rfl_3_img.shape[1]): if rfl_3_img[ii, jj] <= 0: rfl_3_img[ii, jj] = 0.001 rfl_sl_img = (rfl_2_img / rfl_3_img).astype(float) rfl_sl_gr = [np.min(rfl_sl_img[rfl_sl_img > 0.0]) * 0.9, np.max(rfl_sl_img) * 1.1] dim_sl = len(rfl_sl_gr) alog_rat_wv_lut = np.zeros((len(hsfs), len(cwvs), len(rhos), dim_sl)) rat_wv_lut = l_toa_lut[:, :, :, idx[2]] / l_toa_lut[:, :, :, idx[1]] alog_rat_wv_lut[:, :, :, 0] = np.log(rat_wv_lut * rfl_sl_gr[0]) alog_rat_wv_lut[:, :, :, 1] = np.log(rat_wv_lut * rfl_sl_gr[1]) cf_arr = np.zeros((3, len(hsfs), len(rhos), dim_sl)) for ii in range(len(hsfs)): for jj in range(len(rhos)): for kk in range(dim_sl): cf_arr[:, ii, jj, kk] = np.polyfit(x=alog_rat_wv_lut[ii, :, jj, kk], y=cwvs, deg=2) alog_rat_wv_img = np.log(toa_sub[:, 1] / toa_sub[:, 0] * rfl_sl_img.flatten()) if hsfs[1] != hsfs[0]: dem_fac = (dem - hsfs[0]) / (hsfs[1] - hsfs[0]) dem_fac = dem_fac.flatten() else: dem_fac = np.zeros(cnt_land) s0 = solar_res[idx[1]] ll_cal = np.pi * data[:, :, idx[1]].flatten() / (s0 * np.cos(np.deg2rad(sza))) # COMPUTE WATER VAPOUR # # ====================== # handle cases where LUT range is exceeded by the spectrum (use lowest/highest rho) ll_cal[ll_cal < rhos[0]] = rhos[0] + 1e-5 # (1, 1024000) # + 1e-10 avoids zero-division at rfl_fac below ll_cal[ll_cal > rhos[-1]] = rhos[-1] - 1e-5 # (1, 1024000) # - 1e-10 avoids zero-division at rfl_fac below ll_cal_low = ll_cal[None, :] >= np.array(rhos)[:, None] # (11, 1024000) ll_cal_high = ll_cal[None, :] <= np.array(rhos)[:, None] # (11, 1024000) # get index of last "True" of ll_cal_low_full columns idx_low = len(rhos) - np.argmax(ll_cal_low[::-1], axis=0) - 1 # (1, 1024000) # get index of first "True" of ll_cal_high_full columns idx_high = np.argmax(ll_cal_high, axis=0) # (1, 1024000) # get corresponding rhos rhos_at_idx_low = np.array(rhos)[None, :][:, idx_low] # (1, 1024000) rhos_at_idx_high = np.array(rhos)[None, :][:, idx_high] # (1, 1024000) rfl_fac = ((ll_cal - rhos_at_idx_low) / (rhos_at_idx_high - rhos_at_idx_low)) # (1, 1024000) rfl_sl = ((rfl_sl_img.flatten() - rfl_sl_gr[0]) / (rfl_sl_gr[1] - rfl_sl_gr[0])) # (1, 1024000) cf_int = ( (1 - dem_fac) * (1 - rfl_fac) * (1 - rfl_sl) * cf_arr[:, 0, :, 0][:, idx_low] + (1 - dem_fac) * (1 - rfl_fac) * rfl_sl * cf_arr[:, 0, :, 1][:, idx_low] + (1 - dem_fac) * rfl_fac * (1 - rfl_sl) * cf_arr[:, 0, :, 0][:, idx_high] + (1 - dem_fac) * rfl_fac * rfl_sl * cf_arr[:, 0, :, 1][:, idx_high] + dem_fac * (1 - rfl_fac) * (1 - rfl_sl) * cf_arr[:, 1, :, 0][:, idx_low] + dem_fac * (1 - rfl_fac) * rfl_sl * cf_arr[:, 1, :, 1][:, idx_low] + dem_fac * rfl_fac * (1 - rfl_sl) * cf_arr[:, 1, :, 0][:, idx_high] + dem_fac * rfl_fac * rfl_sl * cf_arr[:, 1, :, 1][:, idx_high] ) # (3, 1024000) wv_flat = ( cf_int[2, :] + alog_rat_wv_img * cf_int[1, :] + alog_rat_wv_img * alog_rat_wv_img * cf_int[0, :] ) # (1, 1024000) wv_arr_img = wv_flat.reshape(data.shape[:2]) # (1024, 1000) # set water pixels to 0 wv_arr_img[water_msk != 1] = 0 return wv_arr_img
[docs] def wv_band_ratio_snow(data, fn_table, vza, sza, dem, aot, raa, intp_wvl, intp_fwhm, jday, month, idx): """ Band ratio water vapor retrieval, adapted to the estimation of snow and glacier ice surface properties. :param data: image dataset :param fn_table: path to radiative transfer LUT :param vza: viewing zenith angle :param sza: sun zenith angle :param dem: digital elevation model, same shape as data :param aot: aerosol optical thickness :param raa: relative azimuth angle :param intp_wvl: instrument wavelengths :param intp_fwhm: instrument fwhm :param jday: acquisition day :param month: acquisition month :param idx: indices of instrument channels, which should be used for retrieval (should be approx. 870, 900 and 940 nm) :return: water vapor image """ cnt_land = len(data[:, :, idx[1]].flatten()) num_bd = 2 toa_sub = np.zeros((cnt_land, num_bd)) toa_sub[:, 0] = data[:, :, idx[1]].flatten() toa_sub[:, 1] = data[:, :, idx[2]].flatten() cnt_land = len(toa_sub[:, 0]) luts, axes_x, axes_y, wvl, lut1, lut2, xnodes, nm_nodes, ndim, x_cell = read_lut_enmap_formatted(file_lut=fn_table) wvl_lut = wvl s_norm = generate_filter(wvl_m=wvl_lut, wvl=intp_wvl, wl_resol=intp_fwhm) lut2_shape = np.array(lut2.shape) lut2_shape[6] = len(intp_wvl) lut2_res = np.zeros(lut2_shape) lut1_res = lut1[:, :, :, :, :, :, :, 0] @ s_norm for ii in range(lut2.shape[-1]): lut2_res[:, :, :, :, :, :, :, ii] = lut2[:, :, :, :, :, :, :, ii] @ s_norm dsol = varsol(jday, month) dn2rad = dsol * dsol * 0.1 fac = 1 / dn2rad hsfs = [np.min(dem), np.max(dem)] cwvs = list(axes_x[1][4]) rhos = [0.02, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9] l_toa_lut = np.zeros((len(hsfs), len(cwvs), len(rhos), len(intp_wvl))) for ii, hsf in enumerate(hsfs): for jj, cwv in enumerate(cwvs): vtest = np.asarray([vza, sza, hsf, aot, raa, cwv]) f_int = interpol_lut(lut1_res, lut2_res, xnodes, nm_nodes, ndim, x_cell, vtest, intp_wvl) f_int_l0 = f_int[0, :] * 1.e+3 f_int_edir = f_int[1, :] * 1.e+3 f_int_edif = f_int[2, :] * 1.e+3 f_int_ss = f_int[3, :] f_int_ee = f_int_edir * np.cos(np.deg2rad(sza)) + f_int_edif for kk, rho in enumerate(rhos): l_toa = (f_int_l0 + f_int_ee * rho / np.pi / (1 - f_int_ss * rho)) * fac l_toa_lut[ii, jj, kk, :] = l_toa path_sol = get_data_file(module_name="sicor", file_basename="solar_irradiances_400_2500_1.dill") with open(path_sol, "rb") as fl: solar_lut = dill.load(fl) solar_res = solar_lut @ s_norm rfl_1_img = data[:, :, idx[0]] / solar_res[idx[0]] rfl_2_img = data[:, :, idx[1]] / solar_res[idx[1]] rfl_2_img[rfl_2_img == 0.0] = 0.001 rfl_3_img = rfl_1_img + (rfl_2_img - rfl_1_img) * (intp_wvl[idx[2]] - intp_wvl[idx[0]]) / ( intp_wvl[idx[1]] - intp_wvl[idx[0]]) if np.min(rfl_3_img) <= 0: for ii in range(rfl_3_img.shape[0]): for jj in range(rfl_3_img.shape[1]): if rfl_3_img[ii, jj] <= 0: rfl_3_img[ii, jj] = 0.001 rfl_sl_img = (rfl_2_img / rfl_3_img).astype(float) rfl_sl_gr = [np.min(rfl_sl_img) * 0.9, np.max(rfl_sl_img) * 1.1] dim_sl = len(rfl_sl_gr) alog_rat_wv_lut = np.zeros((len(hsfs), len(cwvs), len(rhos), dim_sl)) rat_wv_lut = l_toa_lut[:, :, :, idx[2]] / l_toa_lut[:, :, :, idx[1]] alog_rat_wv_lut[:, :, :, 0] = np.log(rat_wv_lut * rfl_sl_gr[0]) alog_rat_wv_lut[:, :, :, 1] = np.log(rat_wv_lut * rfl_sl_gr[1]) cf_arr = np.zeros((3, len(hsfs), len(rhos), dim_sl)) for ii in range(len(hsfs)): for jj in range(len(rhos)): for kk in range(dim_sl): cf_arr[:, ii, jj, kk] = np.polyfit(x=alog_rat_wv_lut[ii, :, jj, kk], y=cwvs, deg=2) alog_rat_wv_img = np.log(toa_sub[:, 1] / toa_sub[:, 0] * rfl_sl_img.flatten()) if hsfs[1] != hsfs[0]: dem_fac = (dem - hsfs[0]) / (hsfs[1] - hsfs[0]) dem_fac = dem_fac.flatten() else: dem_fac = np.zeros(cnt_land) s0 = solar_res[idx[1]] ll_cal = np.pi * data[:, :, idx[1]].flatten() / (s0 * np.cos(np.deg2rad(sza))) # COMPUTE WATER VAPOUR # # ====================== # handle cases where LUT range is exceeded by the spectrum (use lowest/highest rho) ll_cal[ll_cal < rhos[0]] = rhos[0] + 1e-5 # (1, 1024000) # + 1e-10 avoids zero-division at rfl_fac below ll_cal[ll_cal > rhos[-1]] = rhos[-1] - 1e-5 # (1, 1024000) # - 1e-10 avoids zero-division at rfl_fac below ll_cal_low = ll_cal[None, :] >= np.array(rhos)[:, None] # (11, 1024000) ll_cal_high = ll_cal[None, :] <= np.array(rhos)[:, None] # (11, 1024000) # get index of last "True" of ll_cal_low_full columns idx_low = len(rhos) - np.argmax(ll_cal_low[::-1], axis=0) - 1 # (1, 1024000) # get index of first "True" of ll_cal_high_full columns idx_high = np.argmax(ll_cal_high, axis=0) # (1, 1024000) # get corresponding rhos rhos_at_idx_low = np.array(rhos)[None, :][:, idx_low] # (1, 1024000) rhos_at_idx_high = np.array(rhos)[None, :][:, idx_high] # (1, 1024000) rfl_fac = ((ll_cal - rhos_at_idx_low) / (rhos_at_idx_high - rhos_at_idx_low)) # (1, 1024000) rfl_sl = ((rfl_sl_img.flatten() - rfl_sl_gr[0]) / (rfl_sl_gr[1] - rfl_sl_gr[0])) # (1, 1024000) cf_int = ( (1 - dem_fac) * (1 - rfl_fac) * (1 - rfl_sl) * cf_arr[:, 0, :, 0][:, idx_low] + (1 - dem_fac) * (1 - rfl_fac) * rfl_sl * cf_arr[:, 0, :, 1][:, idx_low] + (1 - dem_fac) * rfl_fac * (1 - rfl_sl) * cf_arr[:, 0, :, 0][:, idx_high] + (1 - dem_fac) * rfl_fac * rfl_sl * cf_arr[:, 0, :, 1][:, idx_high] + dem_fac + (1 - rfl_fac) * (1 - rfl_sl) * cf_arr[:, 1, :, 0][:, idx_low] + dem_fac * (1 - rfl_fac) * rfl_sl * cf_arr[:, 1, :, 1][:, idx_low] + dem_fac * rfl_fac * (1 - rfl_sl) * cf_arr[:, 1, :, 0][:, idx_high] + dem_fac * rfl_fac * rfl_sl * cf_arr[:, 1, :, 1][:, idx_high] ) # (3, 1024000) wv_flat = ( cf_int[2, :] + alog_rat_wv_img * cf_int[1, :] + alog_rat_wv_img * alog_rat_wv_img * cf_int[0, :] ) # (1, 1024000) wv_arr_img = wv_flat.reshape(data.shape[:2]) # (1024, 1000) return wv_arr_img