"""Read Sentinel-2 Level-1C and Level-2A data into RSImage object."""
import logging
import re
import warnings
from glob import glob
import os
from os import path
from os.path import abspath
from subprocess import call
from tempfile import TemporaryFile
from time import time
from uuid import uuid1
from xml.etree.ElementTree import QName
import xml.etree.ElementTree
from osgeo import gdal, osr
import glymur
import iso8601
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyproj
from PIL import Image
from osgeo.gdalconst import GA_ReadOnly
from geoarray import GeoArray
from psutil import virtual_memory
from ...RSImage import RSImage
from ..GranuleInfo import GranuleInfo
[docs]
def get_tck(x, y, z, kk_default=3, logger=None):
"""Estimate spline model parameters."""
from scipy.interpolate import bisplrep # import here to avoid static TLS ImportError
logger = logger or logging.getLogger(__name__)
# estimate max number of allowed spline order
mm = len(x)
kk_max = int(np.floor(np.sqrt(mm / 2.0) - 1))
kk = np.min([kk_default, kk_max])
result = bisplrep(x=x, y=y, z=z, kx=kk, ky=kk, full_output=1)
if result[2] > 0:
logger.info("Interpolation problem:%s" % result[-1])
logger.info("Now, try to adjust s")
result = bisplrep(x=x, y=y, z=z, kx=kk, ky=kk, s=result[1], full_output=1)
if result[2] > 0:
raise ValueError("Interpolation problem:%s" % result[-1])
return result[0]
# noinspection PyMissingConstructor,PyShadowingNames
[docs]
class S2Image(RSImage):
"""Load Sentinel-2 Level-1C or Level-2A product."""
# noinspection PyDefaultArgument,PyDictCreation
def __init__(self, granule_path, import_bands="all",
namespace_candidate=None, aux_fields=None, default_u=1.0,
target_resolution=None, dtype_float=np.float16, dtype_int=np.int16, unit="reflectance",
interpolation_order=1, data_mode="dense", driver="OpenJpeg2000",
call_cmd=None, logger=None, slice_x=slice(None), slice_y=slice(None),
default_solar_irradiance={'B01': 1913.57, 'B02': 1941.63, 'B03': 1822.61, 'B04': 1512.79,
'B05': 1425.56, 'B06': 1288.32, 'B07': 1163.19, 'B08': 1036.39,
'B09': 813.04, 'B10': 367.15, 'B11': 245.59, 'B12': 85.25, 'B8A': 955.19},
**kwarg):
"""
Reads Sentinel-2 MSI data into numpy array. Images for different channels are resampled to a common sampling
:param granule_path: path to granule folder, folder should contain IMG_DATA folder and S2A_[..].xml file
:param import_bands: list of bands to import, default: ['B01', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08',
'B8A', 'B09', 'B10', 'B11', 'B12',]
:param namespace_candidate: for XML file, or None, then several default namespaces are tested
:param target_resolution: spatial resolution in meter, or None for data not interpolated
:param interpolation_order: integer for interpolation 1,2,3
:param data_mode: either "dense" or "sparse"
:param aux_fields: either None or dict of to be loaded aux fields, e.g. {"cwv":[ss],"spr":[ss],"ozo":[ss]}
with ss being "mean" or iterable of spatial samplings, e.g. [20.0] or [20.0,60.0]
:param driver: should be "OpenJpeg2000" -> glymur, "kdu_expand"->shell command, "gdal_[driver] e.g. gdal_JP2ECW
gdal_JPEG2000, gdal_JP2OpenJPEG,gdal_JP2KAK
..todo:: Real nodata mask
"""
# import here to avoid static TLS ImportError
from scipy.interpolate import bisplev
from scipy.ndimage import zoom
t0_init = time()
self.full_band_list = ['B01', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B8A', 'B09', 'B10', 'B11',
'B12']
self.logger = logger or logging.getLogger(__name__)
self.metadata = {}
self.namespace = namespace_candidate
self.call_cmd = call_cmd
self.sliceX = slice(None, None, None)
if isinstance(slice_x, slice):
self.sliceX = slice_x
self.sliceY = slice(None, None, None)
if isinstance(slice_y, slice):
self.sliceY = slice_y
self.driver = str(driver)
if unit in ["reflectance", "dn"]:
self.unit = unit
else:
raise ValueError("Unit not implemented: %s" % str(unit))
self.dtype_float = dtype_float
self.dtype_int = dtype_int
if self.unit == "dn":
self.dtype_return = self.dtype_int
self.bad_data_value = -999
else:
self.dtype_return = self.dtype_float
self.bad_data_value = np.nan
self.target_resolution = target_resolution
self.S2_MSI_granule_path = path.abspath(granule_path)
self.tile_name = re.search('_T[0-9]{2,2}[A-Z]{3,3}_', self.S2_MSI_granule_path).group(0).replace("_", "")[1:]
# find XML files for granule (granule folder) and product (two folders above granule folder)
self.granule_xml_file_name = self.get_granule_metadata_xml()
if self.namespace is not None:
self.metadata.update(S2Image.parse_s2_granule_xml(self.granule_xml_file_name, self.namespace))
else:
self.logger.info("No namespace was given, try default ones:")
for namespace_candidate in ["https://psd-12.sentinel2.eo.esa.int/PSD/S2_PDI_Level-1C_Tile_Metadata.xsd",
"https://psd-12.sentinel2.eo.esa.int/PSD/S2_PDI_Level-2A_Tile_Metadata.xsd",
"https://psd-14.sentinel2.eo.esa.int/PSD/S2_PDI_Level-1C_Tile_Metadata.xsd"]:
self.logger.info("Namespace:%s" % namespace_candidate)
try:
self.metadata.update(S2Image.parse_s2_granule_xml(self.granule_xml_file_name, namespace_candidate))
self.namespace = namespace_candidate
except Exception: # go over any exception, raise later if self.namespace is still None
pass
if self.namespace is None:
raise ValueError("None of the default xml namespaces worked, please supply as parameter.")
else:
self.logger.info("Final namespace: %s" % self.namespace)
try:
self.product_xml_file_name = glob(path.join(path.dirname(path.dirname(self.S2_MSI_granule_path)),
"*MTD*.xml"))[0]
self.metadata.update(S2Image.parse_s2_product_xml(self.product_xml_file_name))
self.metadata['solar_irradiance'] = {
self.metadata['bandId2bandName'][bi]: self.metadata['solar_irradiance_bid'][bi] for bi in
self.metadata['solar_irradiance_bid'].keys()}
except IndexError:
self.product_xml_file_name = None
self.logger.info("Product xml file is not available -> fallback to default values.")
self.metadata.update({"dn2rfl": 10000,
"solar_irradiance": default_solar_irradiance,
"physical_gains": None,
"U": default_u})
except AttributeError:
# apparantly the xml is not L1C
self.metadata.update({"dn2rfl": 10000})
self.logger.info("Load S2 MSI data from:%s" % self.S2_MSI_granule_path)
if os.name == "nt":
warnings.warn("Reading Sentinel-2 aux data in grib file format is not supported on Windows."
"Alternatively, please download aux data from ECMWF in netCDF file format.")
else:
self.metadata["aux_data"] = S2Image.read_aux_data(self.S2_MSI_granule_path)
# select bands
if import_bands == "all":
self.band_list = self.metadata["bandNames"]
else:
self.band_list = list(import_bands)
# search for jp2 files which contain the data
self.band_fns = {self._band_name_l1c(fn): fn for fn in
glob(path.join(self.S2_MSI_granule_path, "IMG_DATA", "*_B*.jp2"))}
suffix = ""
self.band_fns.update({self._band_name_l2a(fn): fn for fn in glob(path.join(
self.S2_MSI_granule_path, "IMG_DATA", "R[1,2,6]0m", "*_[1,2,6]0m%s.jp2" % suffix))})
self.band_list = list(self.band_fns)
# import masks
self.msk_fns = glob(path.join(self.S2_MSI_granule_path, "*_MSK_*[1,2,6]0m%s.jp2" % suffix))
self.msk = {self._spatial_sampling_msk(fn): GeoArray(fn) for fn in self.msk_fns}
# set final sizes
if self.target_resolution is None:
self.final_shape = None
self.data = {}
elif self.target_resolution in self.metadata["spatial_samplings"].keys():
ss = self.metadata["spatial_samplings"]
self.full_shape = [ss[self.target_resolution][ii] for ii in ("NROWS", "NCOLS")]
self.final_shape = list(np.empty(self.full_shape)[self.sliceX, self.sliceY].shape)
self.logger.info("Final shape for each channel: %s" % str(self.final_shape))
if data_mode == "dense":
self.data = self.__zeros__(shape=(self.final_shape + [len(self.band_list)]),
dtype=self.dtype_return, logger=self.logger)
elif data_mode == "sparse":
self.data = self.__zeros__(shape=list(self.final_shape) + [len(self.metadata["bandNames"])],
dtype=self.dtype_return, logger=self.logger)
else:
raise ValueError("data_mode=%s not implemented" % data_mode)
else:
raise ValueError("target_resolution should be None or in: %s" %
str(list(self.metadata["spatial_samplings"].keys())))
# simple nodata mask -> should be improved
try:
self.nodata = {10.0: self.__read_img(self.band_fns["B02"]) == 0}
except KeyError:
self.nodata = {10.0: self.__read_img(self.band_fns["B02_10m"]) == 0}
self.nodata[20.0] = zoom(self.nodata[10.0], 1 / 2, order=0)
self.nodata[60.0] = zoom(self.nodata[10.0], 1 / 6, order=0)
self.yesdata = {tr: np.logical_not(msk) for tr, msk in self.nodata.items()}
# get projection for all bands
self.metadata["projection"] = {band: S2Image.get_projection(self.band_fns[band]) for band in self.band_list}
self.cnv = {}
_t0 = time()
for iband, band in enumerate(self.band_list):
t0 = time()
image_data_raw = self.__read_img(self.band_fns[band])
if self.unit == "reflectance":
# make image conversion on high precision, then convert to final type
image_data = np.array(np.array(image_data_raw[:, :], dtype=np.float64) / self.metadata["dn2rfl"],
dtype=self.dtype_return)
elif self.unit == "dn":
image_data = np.array(image_data_raw[:, :], dtype=self.dtype_return)
image_data[self.bad_data_mask(image_data)] = self.bad_data_value
if self.target_resolution is None:
self.data[band] = np.array(image_data, dtype=self.dtype_float)
t2 = time()
self.logger.info("Read band %s in %.2fs." % (band, t2 - t0))
else:
zoom_fac = self.metadata["shape_to_resolution"][image_data.shape] / self.target_resolution
if data_mode == "dense":
ii = iband
elif data_mode == "sparse":
ii = self.full_band_list.index(band)
else:
raise ValueError("data_mode=%s not implemented" % data_mode)
t1 = time()
# noinspection PyUnresolvedReferences
self.data[:, :, ii] = zoom(input=np.array(image_data, dtype=np.float32),
zoom=zoom_fac,
order=interpolation_order)[self.sliceX, self.sliceY]
self.cnv[ii] = band
self.cnv[band] = ii
t2 = time()
self.logger.info("""Read band %s in %.2fs, pure load time:%.2fs, resample time: %.2fs, zoom: %.3f,
final shape: %s, index: %i""" % (band, t2 - t0, t1 - t0, t2 - t1, zoom_fac, str(self.final_shape), ii))
self.logger.info("Total time for reading and converting bands: %.2fs" % (time() - _t0))
if self.target_resolution is None:
self.band_spatial_sampling = {band: self.metadata['shape_to_resolution'][data.shape]
for band, data in self.data.items()}
self.spatial_sampling_shapes = {self.band_spatial_sampling[band]: data.shape
for band, data in self.data.items()}
if aux_fields is not None and self.metadata["aux_data"]:
self.logger.info("Read aux-data from level 1C Product.")
ti = GranuleInfo(version="lite")[self.tile_name]
zm = 2 * np.ceil(np.mean(self.metadata["aux_data"]["lons"].shape))
lats_s2 = zoom(np.array([[ti["tl"]["lat"], ti["tr"]["lat"]], [ti["ll"]["lat"], ti["lr"]["lat"]]]), zoom=zm)
lons_s2 = zoom(np.array([[ti["tl"]["lon"], ti["tr"]["lon"]], [ti["ll"]["lon"], ti["lr"]["lon"]]]), zoom=zm)
fields_func = {field: get_tck(x=self.metadata["aux_data"]["lons"].flatten(),
y=self.metadata["aux_data"]["lats"].flatten(),
z=self.metadata["aux_data"][field].flatten()
) for field in aux_fields.keys()}
self.fields_func = fields_func
self.aux_fields = {}
self.aux_fields["lats_s2"] = lats_s2
self.aux_fields["lons_s2"] = lons_s2
for field_name in aux_fields.keys():
self.logger.info("Interpolate: %s" % field_name)
bf = np.zeros(lats_s2.shape)
for ii0 in range(lats_s2.shape[0]):
for ii1 in range(lats_s2.shape[1]):
# noinspection PyUnresolvedReferences
bf[ii0, ii1] = bisplev(x=lons_s2[ii0, ii1],
y=lats_s2[ii0, ii1],
tck=fields_func[field_name])
if self.final_shape is not None:
zoom_fac = (self.full_shape[0] / bf.shape[0],
self.full_shape[1] / bf.shape[1])
bf_zoom = zoom(bf, zoom_fac, order=3)
self.aux_fields[field_name] = np.array(bf_zoom[self.sliceX, self.sliceY], dtype=np.float32)
else:
if aux_fields[field_name] == "mean":
self.aux_fields[field_name] = np.mean(bf)
else:
self.aux_fields[field_name] = {}
for spatial_sampling, res in self.metadata["spatial_samplings"].items():
if spatial_sampling in aux_fields[field_name]:
zoom_fac = (res["NCOLS"] / bf.shape[0], res["NROWS"] / bf.shape[0])
bf_zoom = zoom(bf, zoom_fac, order=3)
self.aux_fields[field_name][spatial_sampling] = np.array(bf_zoom, dtype=np.float32)
self.logger.info("Interpolate: %s, zoom: %s, sampling: %f" %
(field_name, "(%.1f,%.1f)" % zoom_fac, spatial_sampling))
if ("lats" not in self.metadata['aux_data']) or ("lats" not in self.metadata['aux_data']):
# if lons or lats are missing from metadata -> add from tile info
self.logger.warning("Missing lon/lats from aux data -> get it from granule info.")
ti = GranuleInfo(version="lite")[self.tile_name]
lats = np.zeros((2, 2), dtype=float)
lons = np.zeros((2, 2), dtype=float)
try:
for i1, tb in enumerate(["t", "l"]):
for i2, lr in enumerate(["l", "r"]):
lons[i1, i2] = ti["%s%s" % (tb, lr)]["lon"]
lats[i1, i2] = ti["%s%s" % (tb, lr)]["lat"]
self.metadata['aux_data']["lats"] = zoom(lats, 5)
self.metadata['aux_data']["lons"] = zoom(lons, 5)
except KeyError:
logger.warning("Granule info incomplete:" + str(ti))
self.logger.info("Total runtime: %.2fs" % (time() - t0_init))
@staticmethod
def __zeros__(shape, dtype, max_mem_frac=0.3, logger=None):
logger = logger or logging.getLogger(__name__)
def in_memory_array(shape, dtype):
"""Use in-memory numpy array."""
return np.zeros(shape, dtype)
def out_memory_array(shape, dtype):
"""Fall back to memory mapped file if memory is not sufficient."""
logger.warning("Not enough memory to keep full image -> fall back to memorymap.")
dat = np.memmap(filename=TemporaryFile(mode="w+b"), dtype=dtype, shape=tuple(shape))
dat[:] = 0.0
return dat
to_gb = 1.0 / 1024.0 ** 3
mem = virtual_memory().total * to_gb
arr = int(np.prod(np.array(shape, dtype=np.int64)) * np.zeros(1, dtype=dtype).nbytes * to_gb)
if arr < max_mem_frac * mem:
try:
return in_memory_array(shape, dtype)
except MemoryError:
return out_memory_array(shape, dtype)
else:
logger.info(
"Try to create array of size %.2fGB on a box with %.2fGB memory -> fall back to memorymap." % (
arr, mem))
return out_memory_array(shape, dtype)
[docs]
@staticmethod
def parse_s2_product_xml(fn):
"""
S2 XML helper function to parse product xml file
:param fn: file name of xml file
:return: metadata dictionary
"""
metadata = {}
xml_root = xml.etree.ElementTree.parse(fn).getroot()
metadata["dn2rfl"] = (int(xml_root.find(".//QUANTIFICATION_VALUE").text))
metadata["solar_irradiance_bid"] = {int(ele.get('bandId')): float(ele.text) for ele in
xml_root.find(".//Solar_Irradiance_List").findall("SOLAR_IRRADIANCE")}
metadata["physical_gains"] = {int(ele.get('bandId')): float(ele.text) for ele in
xml_root.findall(".//PHYSICAL_GAINS")}
metadata["U"] = float(xml_root.find(".//U").text)
return metadata
[docs]
@staticmethod
def read_aux_data(fn):
"""
Read grib file with aux data, return dictionary
:param fn: path to granule
:return: dict with data
"""
import pygrib
metadata = {}
try:
fn_aux_data = glob(path.join(fn, "AUX_DATA", "S2A_*"))[0]
aux_data = pygrib.open(fn_aux_data)
for var_name, index in zip(["cwv", "spr", "ozo"], [1, 2, 3]):
metadata[var_name] = aux_data.message(index).data()[0]
metadata["%s_unit" % var_name] = aux_data.message(index)["parameterUnits"]
metadata["lats"], metadata["lons"] = aux_data.message(1).latlons()
return metadata
except Exception:
return metadata
[docs]
@staticmethod
def parse_s2_granule_xml(fn, namespace):
"""
parse XML file in granule folder and return metadata
:param fn: full filename tile xml file
:param namespace: xml namespace
:return: dictionary with metadata
"""
def stack_detectors(inp):
"""Some data is given per detector with NaN values otherwise -> stack those to a common image."""
warnings.filterwarnings(action='ignore', message=r'Mean of empty slice')
res = {bandId: np.nanmean(np.dstack(tuple(inp[bandId].values())), axis=2) for bandId, dat in inp.items()}
warnings.filterwarnings(action='default', message=r'Mean of empty slice')
return res
xml_root = xml.etree.ElementTree.parse(fn).getroot()
metadata = {}
general_info = S2Image.find_in_xml_root(namespace, xml_root, "General_Info")
for key in ["TILE_ID", "DATASTRIP_ID", "TILE_ID_2A"]:
try:
metadata[key] = general_info.find(key).text
except AttributeError:
pass
metadata["SENSING_TIME"] = iso8601.parse_date(general_info.find("SENSING_TIME").text)
geo_codings = S2Image.find_in_xml_root(namespace, xml_root, 'Geometric_Info', "Tile_Geocoding")
metadata["HORIZONTAL_CS_NAME"] = geo_codings.find("HORIZONTAL_CS_NAME").text
metadata["HORIZONTAL_CS_CODE"] = geo_codings.find("HORIZONTAL_CS_CODE").text
metadata["bandId2bandName"] = {
int(ele.get("bandId")): re.search("_B[0-18][0-9A]", ele.text).group().split("_")[-1] for ele in
xml_root.findall(".//MASK_FILENAME") if ele.get("bandId") is not None}
metadata["bandName2bandId"] = {bandName: bandId for bandId, bandName in metadata["bandId2bandName"].items()}
metadata["bandIds"] = sorted(list(metadata["bandId2bandName"].keys()))
metadata["bandNames"] = sorted(list(metadata["bandName2bandId"].keys()))
metadata["sun_zenith"] = S2Image.get_values_from_xml(S2Image.find_in_xml_root(namespace, xml_root, *(
"Geometric_Info", "Tile_Angles", "Sun_Angles_Grid", "Zenith", "Values_List")))
metadata["sun_azimuth"] = S2Image.get_values_from_xml(S2Image.find_in_xml_root(namespace, xml_root, *(
"Geometric_Info", "Tile_Angles", "Sun_Angles_Grid", "Azimuth", "Values_List")))
metadata["sun_mean_zenith"] = float(S2Image.find_in_xml_root(namespace, xml_root, *(
"Geometric_Info", "Tile_Angles", "Mean_Sun_Angle", "ZENITH_ANGLE")).text)
metadata["sun_mean_azimuth"] = float(S2Image.find_in_xml_root(namespace, xml_root, *(
"Geometric_Info", "Tile_Angles", "Mean_Sun_Angle", "AZIMUTH_ANGLE")).text)
branch = S2Image.find_in_xml_root(namespace, xml_root, *("Geometric_Info", "Tile_Angles"))
metadata["viewing_zenith_detectors"] = {
bandId: {bf.get("detectorId"): S2Image.get_values_from_xml(
S2Image.find_in_xml(bf, *("Zenith", "Values_List"))) for bf in
branch.findall("Viewing_Incidence_Angles_Grids[@bandId='%i']" % bandId)} for bandId in
metadata["bandIds"]}
try:
metadata["viewing_zenith"] = stack_detectors(metadata["viewing_zenith_detectors"])
except ValueError:
metadata["viewing_zenith"] = 0.0
metadata["viewing_azimuth_detectors"] = {bandId: {bf.get("detectorId"): S2Image.get_values_from_xml(
S2Image.find_in_xml(bf, *("Azimuth", "Values_List"))) for bf in branch.findall(
"Viewing_Incidence_Angles_Grids[@bandId='%i']" % bandId)} for bandId in metadata["bandIds"]}
try:
metadata["viewing_azimuth"] = stack_detectors(metadata["viewing_azimuth_detectors"])
except ValueError:
metadata["viewing_azimuth"] = 0.0
metadata["spatial_samplings"] = {
float(size.get("resolution")): {key: int(size.find(key).text) for key in ["NROWS", "NCOLS"]} for size in
geo_codings.findall("Size")}
for geo in geo_codings.findall("Geoposition"):
metadata["spatial_samplings"][float(geo.get("resolution"))].update(
{key: int(geo.find(key).text) for key in ["ULX", "ULY", "XDIM", "YDIM"]})
metadata["shape_to_resolution"] = {(values["NCOLS"], values["NROWS"]): spatial_sampling for
spatial_sampling, values in metadata["spatial_samplings"].items()}
return metadata
[docs]
@staticmethod
def find_in_xml_root(namespace, xml_root, branch, *branches, findall=None):
"""
S2 xml helper function, search from root
:param namespace:
:param xml_root:
:param branch: first branch, is combined with namespace
:param branches: repeated find's along these parameters
:param findall: if given, at final a findall
:return: found xml object, None if nothing was found
"""
buf = xml_root.find(str(QName(namespace, branch)))
for br in branches:
buf = buf.find(br)
if findall is not None:
buf = buf.findall(findall)
return buf
[docs]
@staticmethod
def find_in_xml(xml, *branch):
"""
S2 xml helper function
:param xml: xml object
:param branch: iterate to branches using find
:return: xml object, None if nothing was found
"""
buf = xml
for br in branch:
buf = buf.find(br)
return buf
[docs]
@staticmethod
def get_values_from_xml(leaf, dtype=float):
"""
S2 xml helper function
:param leaf: xml object which is searched for VALUES tag which are then composed into a numpy array
:param dtype: dtype of returned numpy array
:return: numpy array
"""
return np.array([ele.text.split(" ") for ele in leaf.findall("VALUES")], dtype=dtype)
[docs]
@staticmethod
def _band_name_l1c(fn):
"""
S2 helper function, parse band name from jp2 file name
:param fn: filename
:return: string with band name
"""
return fn.split(".jp2")[0].split("_")[-1]
[docs]
@staticmethod
def _spatial_sampling_msk(fn):
return path.basename(fn).split(".jp2")[0].split("_")[10]
[docs]
@staticmethod
def _band_name_l2a(fn):
for pr in ["AOT", "WVP", "VIS", "TCI", "SCL"]:
if pr in fn:
return "%s_%s" % (pr, fn.split("_")[-1].split(".")[0])
band_name = re.search('_B[0-9][0-9A]_', path.basename(fn).split(".jp2")[0]).group().replace("_", "")
spatial_sampling = re.search('_[126]0m[_.]', path.basename(fn).split("jp2")[0]).group().replace("_",
"").replace(".",
"")
return "%s_%s" % (band_name, spatial_sampling)
[docs]
def bad_data_mask(self, image_data):
""" Return mask of bad data
:param image_data: numpy array
:return: 2D boolean array
"""
self.logger.warning("Masks are computed on value basis, provider masks are not yet implemented")
nodata = self.nodata[self.metadata['shape_to_resolution'][image_data.shape]]
assert nodata.shape == image_data.shape
if self.unit == "reflectance":
return nodata
elif self.unit == "dn":
return nodata
else:
raise ValueError("Unit not implemented: %s" % self.unit)
def __read_img(self, fn):
self.logger.info("Reading: %s" % fn)
if self.driver == "OpenJpeg2000":
img = np.array(glymur.Jp2k(fn)[:, :], dtype=np.int16)
elif self.driver == "kdu_expand":
img = self.__read_jp2_kdu_app(fn, call_cmd=self.call_cmd)
elif re.search("gdal[_]", self.driver) is not None:
img = S2Image.gdal_read(fn, self.driver.split("_")[-1])
else:
raise ValueError("Driver not supported: %s" % self.driver)
return img
@staticmethod
def __rd(fn):
return fn
@staticmethod
def __read_jp2_kdu_app(fn, call_cmd=None):
fn_tmp = "/dev/shm/%s.tif" % uuid1()
cmd_kdu = "kdu_expand -i %s -o %s -fprec 16" % (abspath(fn), fn_tmp)
cmd_rm = "rm %s" % fn_tmp
if call_cmd is None:
call(cmd_kdu, shell=True)
else:
call_cmd(cmd_kdu)
dat = np.array(Image.open(fn_tmp), dtype=np.int16)
if call_cmd is None:
call(cmd_rm, shell=True)
else:
call_cmd(cmd_rm)
return dat
[docs]
@staticmethod
def gdal_read(fnjp, driver_name):
"""Read image file using gdal."""
gdal_drivers = [gdal.GetDriver(ii).GetDescription() for ii in range(gdal.GetDriverCount())]
if driver_name not in gdal_drivers:
raise ValueError("Selected driver seems to be missing in gdal, available are: %s" % str(gdal_drivers))
else:
while gdal.GetDriverCount() > 1:
for ii in range(gdal.GetDriverCount()):
drv = gdal.GetDriver(ii)
if drv is not None:
if drv.GetDescription() != driver_name:
try:
drv.Deregister()
except AttributeError:
pass
ds = gdal.Open(fnjp, GA_ReadOnly)
img = ds.GetRasterBand(1).ReadAsArray()
# noinspection PyUnusedLocal
ds = None
return img
[docs]
@staticmethod
def save_rgb_image(rgb_img, fn, dpi=100.0):
"""Save image as RGB to file system."""
fig = plt.figure(figsize=np.array(rgb_img.shape[:2]) / dpi)
plt.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0)
ax = plt.subplot()
ax.imshow(rgb_img, interpolation="none")
ax.set_axis_off()
plt.savefig(fn, dpi=dpi)
fig.clear()
plt.close(fig)
[docs]
@staticmethod
def get_projection(file_name):
"""
:param file_name string, points to filename
:return: return {"gt":geotransform,"prj":coordinate projections string} for filename
:rtype dict
"""
ds = gdal.Open(file_name)
gt = ds.GetGeoTransform()
prj = ds.GetProjection()
ds = None # release memory
# noinspection PyBroadException
try:
mapinfo = S2Image.geotransform2mapinfo(gt, prj)
except Exception:
mapinfo = None
return {"gt": gt, "prj": prj, "mapinfo": mapinfo}
[docs]
class s2_snr_model(object):
"""Sentinel-2 SNR model from CNES instrument characterization.
:param: s2_snr_csv_file: path to valid s2_snr_csv_file, e.g. line: B1,129.0,588,"0,04840000","0,00003130",
"4,07374605","129,00","525,5132410906","0,1370807224","941,05","1614,99","983,3"
:param: rfl_to_rad: dict with band names as keys and conversion factor from reflectance to radiance, e.g.:
rfl_to_rad={'B8a': 0.0, 'B12': 0.0, 'B10': 0.0, 'B03': 0.0, 'B07': 0.0, 'B04': 0.0, 'B11': 0.0,
'B02': 0.0, 'B08': 0.0, 'B05': 0.0, 'B09': 0.0, 'B01': 0.0, 'B06': 0.0}
"""
def __init__(self, s2_snr_csv_file, rfl_to_rad, logger=None):
# get logger
self.logger = logging.getLogger() if logger is None else logger
# read data file
self.s2_snr_csv_file = s2_snr_csv_file
df = pd.read_csv(self.s2_snr_csv_file, decimal=",")
to_band_name = (
lambda x: x.upper() if len(x) == 3 else "B0" + x[1]) # normalize band names: Bx -> B0x and Byx -> Byx
self.snr_parameters = {to_band_name(ll[0]): np.array([l.replace(",", ".") for l in ll[3:6]], dtype=float)
for ll in df.values[2:2 + 13, :]}
self.rfl_to_rad = rfl_to_rad
[docs]
def noise(self, reflectance, band_name):
"""Sentinel-2 radiance to noise model"""
# get noise model parameters: sqrt(x0^2+x1*x2*radiance)
x = self.snr_parameters[band_name]
rfl2rad = self.rfl_to_rad[band_name]
# (noise in radiance ( -> to radiance )) -> convert back to reflectance
return np.sqrt(x[0] ** 2.0 + x[1] * (x[2] * reflectance * rfl2rad)) / rfl2rad # noise reflectance