Source code for climada.hazard.io

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define Hazard IO Methods.
"""

import datetime as dt
import itertools
import logging
import pathlib
import warnings
from collections.abc import Collection
from typing import Any, Dict, Optional, Union

import h5py
import numpy as np
import pandas as pd
import rasterio
import xarray as xr
from deprecation import deprecated
from scipy import sparse

import climada.util.constants as u_const
import climada.util.coordinates as u_coord
import climada.util.hdf5_handler as u_hdf5
from climada.hazard.centroids.centr import Centroids

from .xarray import HazardXarrayReader

LOGGER = logging.getLogger(__name__)

DEF_VAR_EXCEL = {
    "sheet_name": {"inten": "hazard_intensity", "freq": "hazard_frequency"},
    "col_name": {
        "cen_id": "centroid_id/event_id",
        "even_id": "event_id",
        "even_dt": "event_date",
        "even_name": "event_name",
        "freq": "frequency",
        "orig": "orig_event_flag",
    },
    "col_centroids": {
        "sheet_name": "centroids",
        "col_name": {
            "cen_id": "centroid_id",
            "latitude": "lat",
            "longitude": "lon",
        },
    },
}
"""Excel variable names"""

DEF_VAR_MAT = {
    "field_name": "hazard",
    "var_name": {
        "per_id": "peril_ID",
        "even_id": "event_ID",
        "ev_name": "name",
        "freq": "frequency",
        "inten": "intensity",
        "unit": "units",
        "frac": "fraction",
        "comment": "comment",
        "datenum": "datenum",
        "orig": "orig_event_flag",
    },
    "var_cent": {
        "field_names": ["centroids", "hazard"],
        "var_name": {"cen_id": "centroid_ID", "lat": "lat", "lon": "lon"},
    },
}
"""MATLAB variable names"""


# pylint: disable=no-member


class HazardIO:
    """Contains all read/write methods of the Hazard class"""
    def set_raster(self, *args, **kwargs):
        """This function is deprecated, use Hazard.from_raster."""
        LOGGER.warning(
            "The use of Hazard.set_raster is deprecated. "
            "Use Hazard.from_raster instead."
        )
        self.__dict__ = self.__class__.from_raster(*args, **kwargs).__dict__
    @classmethod
    def from_raster(
        cls,
        files_intensity,
        files_fraction=None,
        attrs=None,
        band=None,
        haz_type=None,
        pool=None,
        src_crs=None,
        window=None,
        geometry=None,
        dst_crs=None,
        transform=None,
        width=None,
        height=None,
        resampling=rasterio.warp.Resampling.nearest,
    ):
        """Create Hazard with intensity and fraction values from raster files

        If raster files are masked, the masked values are set to 0.

        Files can be partially read using either window or geometry. Additionally,
        the data is reprojected when custom dst_crs and/or transform, width and
        height are specified.

        Parameters
        ----------
        files_intensity : list(str)
            file names containing intensity
        files_fraction : list(str)
            file names containing fraction
        attrs : dict, optional
            name of Hazard attributes and their values
        band : list(int), optional
            bands to read (starting at 1), default [1]
        haz_type : str, optional
            acronym of the hazard type (e.g. 'TC'). Default: None, which will use the
            class default ('' for vanilla `Hazard` objects, and hard coded in some
            subclasses)
        pool : pathos.pool, optional
            Pool that will be used for parallel computation when applicable.
            Default: None
        src_crs : crs, optional
            source CRS. Provide it if error without it.
        window : rasterio.windows.Window, optional
            window where data is extracted
        geometry : list of shapely.geometry, optional
            consider pixels only within these shapes
        dst_crs : crs, optional
            reproject to given crs
        transform : rasterio.Affine
            affine transformation to apply
        width : float, optional
            number of lons for transform
        height : float, optional
            number of lats for transform
        resampling : rasterio.warp.Resampling, optional
            resampling function used for reprojection to dst_crs

        Returns
        -------
        Hazard
        """
        if isinstance(files_intensity, (str, pathlib.Path)):
            files_intensity = [files_intensity]
        if isinstance(files_fraction, (str, pathlib.Path)):
            files_fraction = [files_fraction]
        if not attrs:
            attrs = {}
        else:
            attrs = cls._check_and_cast_attrs(attrs)
        if not band:
            band = [1]
        if files_fraction is not None and len(files_intensity) != len(files_fraction):
            raise ValueError(
                "Number of intensity files differs from fraction files: "
                f"{len(files_intensity)} != {len(files_fraction)}"
            )

        # List all parameters for initialization here (missing ones will be default)
        hazard_kwargs = dict()
        if haz_type is not None:
            hazard_kwargs["haz_type"] = haz_type

        centroids, meta = Centroids.from_raster_file(
            files_intensity[0],
            src_crs=src_crs,
            window=window,
            geometry=geometry,
            dst_crs=dst_crs,
            transform=transform,
            width=width,
            height=height,
            resampling=resampling,
            return_meta=True,
        )
        if pool:
            chunksize = max(min(len(files_intensity) // pool.ncpus, 1000), 1)
            inten_list = pool.map(
                _values_from_raster_files,
                [[f] for f in files_intensity],
                itertools.repeat(meta),
                itertools.repeat(band),
                itertools.repeat(src_crs),
                itertools.repeat(window),
                itertools.repeat(geometry),
                itertools.repeat(dst_crs),
                itertools.repeat(transform),
                itertools.repeat(width),
                itertools.repeat(height),
                itertools.repeat(resampling),
                chunksize=chunksize,
            )
            intensity = sparse.vstack(inten_list, format="csr")
            if files_fraction is not None:
                fract_list = pool.map(
                    _values_from_raster_files,
                    [[f] for f in files_fraction],
                    itertools.repeat(meta),
                    itertools.repeat(band),
                    itertools.repeat(src_crs),
                    itertools.repeat(window),
                    itertools.repeat(geometry),
                    itertools.repeat(dst_crs),
                    itertools.repeat(transform),
                    itertools.repeat(width),
                    itertools.repeat(height),
                    itertools.repeat(resampling),
                    chunksize=chunksize,
                )
                fraction = sparse.vstack(fract_list, format="csr")
        else:
            intensity = _values_from_raster_files(
                files_intensity,
                meta=meta,
                band=band,
                src_crs=src_crs,
                window=window,
                geometry=geometry,
                dst_crs=dst_crs,
                transform=transform,
                width=width,
                height=height,
                resampling=resampling,
            )
            if files_fraction is not None:
                fraction = _values_from_raster_files(
                    files_fraction,
                    meta=meta,
                    band=band,
                    src_crs=src_crs,
                    window=window,
                    geometry=geometry,
                    dst_crs=dst_crs,
                    transform=transform,
                    width=width,
                    height=height,
                    resampling=resampling,
                )

        if files_fraction is None:
            fraction = intensity.copy()
            fraction.data.fill(1)

        hazard_kwargs.update(
            cls._attrs_to_kwargs(attrs, num_events=intensity.shape[0])
        )
        return cls(
            centroids=centroids, intensity=intensity, fraction=fraction, **hazard_kwargs
        )
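    # Illustrative usage sketch (editor's note, not part of the module): a minimal
    # call to ``from_raster`` with two single-band GeoTIFF files, one event per file.
    # The file names, hazard type 'FL', and attribute values are hypothetical
    # placeholders.
    #
    # >>> from climada.hazard import Hazard
    # >>> haz = Hazard.from_raster(
    # ...     files_intensity=["flood_2020.tif", "flood_2021.tif"],
    # ...     haz_type="FL",
    # ...     attrs={"unit": "m", "event_name": ["flood_2020", "flood_2021"]},
    # ... )
    # >>> haz.check()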
    @classmethod
    @deprecated(
        6.0,
        details="Hazard.from_xarray_raster now supports a filepath as 'data' parameter",
    )
    def from_xarray_raster_file(
        cls, filepath: Union[pathlib.Path, str], *args, **kwargs
    ):
        """Read raster-like data from a file that can be loaded with xarray

        .. deprecated:: 6.0
           Pass ``filepath`` as ``data`` argument to :py:meth:`from_xarray_raster`
           instead.

        This wraps :py:meth:`from_xarray_raster` by first opening the target file as
        xarray dataset and then passing it to that classmethod. Use this wrapper as a
        simple alternative to opening the file yourself. The signature is exactly the
        same, except for the first argument, which is replaced by a file path here.

        Additional (keyword) arguments are passed to :py:meth:`from_xarray_raster`.

        Parameters
        ----------
        filepath : Path or str
            Path of the file to read with xarray. May be any file type supported by
            xarray. See https://docs.xarray.dev/en/stable/user-guide/io.html

        Returns
        -------
        hazard : climada.Hazard
            A hazard object created from the input data
        """
        args = (filepath,) + args
        return cls.from_xarray_raster(*args, **kwargs)
    @classmethod
    def from_xarray_raster(
        cls,
        data: xr.Dataset | pathlib.Path | str,
        hazard_type: str,
        intensity_unit: str,
        *,
        intensity: str = "intensity",
        coordinate_vars: Optional[Dict[str, str]] = None,
        data_vars: Optional[Dict[str, str]] = None,
        crs: str = u_const.DEF_CRS,
        rechunk: bool = False,
        open_dataset_kws: dict[str, Any] | None = None,
    ):
        """Read raster-like data from an xarray Dataset

        This method reads data that can be interpreted using three coordinates: event,
        latitude, and longitude. The names of the coordinates to be read from the
        dataset can be specified via the ``coordinate_vars`` parameter. The data and
        the coordinates themselves may be organized in arbitrary dimensions (e.g. two
        dimensions 'year' and 'altitude' for the coordinate 'event'). See Notes and
        Examples if you want to load single-event data that does not contain an event
        dimension.

        The only required data is the intensity. For all other data, this method can
        supply sensible default values. By default, this method will try to find these
        "optional" data in the Dataset and read it, or use the default values
        otherwise. Users may specify the variables in the Dataset to be read for
        certain Hazard object entries, or may indicate that the default values should
        be used although the Dataset contains appropriate data. This behavior is
        controlled via the ``data_vars`` parameter.

        If this method succeeds, it will always return a "consistent" Hazard object,
        meaning that the object can be used in all CLIMADA operations without throwing
        an error due to missing data or faulty data types.

        Parameters
        ----------
        data : xarray.Dataset or Path or str
            The filepath to read the data from or the already opened dataset
        hazard_type : str
            The type identifier of the hazard. Will be stored directly in the hazard
            object.
        intensity_unit : str
            The physical units of the intensity.
        intensity : str, optional
            Identifier of the ``xarray.DataArray`` containing the hazard intensity
            data.
        coordinate_vars : dict(str, str), optional
            Mapping from default coordinate names to coordinate names used in the data
            to read. The default is
            ``{"event": "time", "longitude": "longitude", "latitude": "latitude"}``,
            as most of the commonly used hazard data happens to have a "time"
            attribute but no "event" attribute.
        data_vars : dict(str, str), optional
            Mapping from default variable names to variable names used in the data to
            read. The default names are ``fraction``, ``hazard_type``, ``frequency``,
            ``event_name``, ``event_id``, and ``date``. If these values are not set,
            the method tries to load data from the default names. If this fails, the
            method uses default values for each entry. If the values are set to empty
            strings (``""``), no data is loaded and the default values are used
            exclusively. See examples for details.

            Default values are:

            * ``date``: The ``event`` coordinate interpreted as date or ordinal, or
              ones if that fails (which will issue a warning).
            * ``fraction``: ``None``, which results in a value of 1.0 everywhere, see
              :py:meth:`~climada.hazard.base.Hazard.__init__` for details.
            * ``hazard_type``: Empty string
            * ``frequency``: 1.0 for every event
            * ``event_name``: String representation of the event date or empty strings
              if that fails (which will issue a warning).
            * ``event_id``: Consecutive integers starting at 1 and increasing with
              time
        crs : str, optional
            Identifier for the coordinate reference system of the coordinates.
            Defaults to ``"EPSG:4326"`` (WGS 84), defined by
            :py:const:`climada.util.coordinates.DEF_CRS`. See
            https://pyproj4.github.io/pyproj/dev/api/crs/crs.html#pyproj.crs.CRS.from_user_input
            for further information on how to specify the coordinate system.
        rechunk : bool, optional
            Rechunk the dataset before flattening. This might have serious performance
            implications. Rechunking in general is expensive, but it might be less
            expensive than stacking a poorly-chunked array. One event being stored in
            one chunk would be the optimal configuration. If ``rechunk=True``, this
            will be forced by rechunking the data. Ideally, you would select the
            chunks in that manner when opening the dataset before passing it to this
            function. Defaults to ``False``.
        open_dataset_kws : dict(str, any), optional
            Keyword arguments passed to ``xarray.open_dataset`` if ``data`` is a file
            path. Ignored otherwise.

        Returns
        -------
        Hazard
            A hazard object created from the input data

        See Also
        --------
        :py:class:`climada.hazard.xarray.HazardXarrayReader`
            The helper class used to read the data.

        Notes
        -----
        * Single-valued coordinates given by ``coordinate_vars``, that are not proper
          dimensions of the data, are promoted to dimensions automatically. If one of
          the three coordinates does not exist, use ``Dataset.expand_dims`` (see
          https://docs.xarray.dev/en/stable/generated/xarray.Dataset.expand_dims.html
          and Examples) before loading the Dataset as Hazard.
        * Single-valued data for variables ``frequency``, ``event_name``, and
          ``event_date`` will be broadcast to every event.
        * The ``event`` coordinate may take arbitrary values. In case these values
          cannot be interpreted as dates or date ordinals, the default values for
          ``Hazard.date`` and ``Hazard.event_name`` are used, see the ``data_vars``
          parameter documentation above.
        * To avoid confusion in the call signature, several parameters are
          keyword-only arguments.
        * The attributes :py:attr:`~climada.hazard.base.Hazard.haz_type` and
          :py:attr:`~climada.hazard.base.Hazard.units` currently cannot be read from
          the Dataset. Use the method parameters to set these attributes.
        * This method does not read coordinate system metadata. Use the ``crs``
          parameter to set a custom coordinate system identifier.

        Examples
        --------
        The use of this method is straightforward if the Dataset contains the data
        with expected names.

        >>> dset = xr.Dataset(
        ...     dict(
        ...         intensity=(
        ...             ["time", "latitude", "longitude"],
        ...             [[[0, 1, 2], [3, 4, 5]]],
        ...         )
        ...     ),
        ...     dict(
        ...         time=[datetime.datetime(2000, 1, 1)],
        ...         latitude=[0, 1],
        ...         longitude=[0, 1, 2],
        ...     ),
        ... )
        >>> hazard = Hazard.from_xarray_raster(dset, "", "")

        Data can also be read from a file.

        >>> dset.to_netcdf("path/to/file.nc")
        >>> hazard = Hazard.from_xarray_raster("path/to/file.nc", "", "")

        If you have specific requirements for opening a data file, you can pass
        ``open_dataset_kws``.

        >>> hazard = Hazard.from_xarray_raster(
        ...     dset,
        ...     "",
        ...     "",
        ...     open_dataset_kws={"chunks": {"x": -1, "y": "auto"}, "engine": "netcdf4"}
        ... )

        For non-default coordinate names, use the ``coordinate_vars`` argument.

        >>> dset = xr.Dataset(
        ...     dict(
        ...         intensity=(
        ...             ["day", "lat", "longitude"],
        ...             [[[0, 1, 2], [3, 4, 5]]],
        ...         )
        ...     ),
        ...     dict(
        ...         day=[datetime.datetime(2000, 1, 1)],
        ...         lat=[0, 1],
        ...         longitude=[0, 1, 2],
        ...     ),
        ... )
        >>> hazard = Hazard.from_xarray_raster(
        ...     dset, "", "", coordinate_vars=dict(event="day", latitude="lat")
        ... )

        Coordinates can be different from the actual dataset dimensions. The following
        loads the data with coordinates ``longitude`` and ``latitude`` (default names):

        >>> dset = xr.Dataset(
        ...     dict(intensity=(["time", "y", "x"], [[[0, 1, 2], [3, 4, 5]]])),
        ...     dict(
        ...         time=[datetime.datetime(2000, 1, 1)],
        ...         y=[0, 1],
        ...         x=[0, 1, 2],
        ...         longitude=(["y", "x"], [[0.0, 0.1, 0.2], [0.0, 0.1, 0.2]]),
        ...         latitude=(["y", "x"], [[0.0, 0.0, 0.0], [0.1, 0.1, 0.1]]),
        ...     ),
        ... )
        >>> hazard = Hazard.from_xarray_raster(dset, "", "")

        Optional data is read from the dataset if the default keys are found. Users
        can specify custom variables in the data, or that the default keys should be
        ignored, with the ``data_vars`` argument.

        >>> dset = xr.Dataset(
        ...     dict(
        ...         intensity=(
        ...             ["time", "latitude", "longitude"],
        ...             [[[0, 1, 2], [3, 4, 5]]],
        ...         ),
        ...         fraction=(
        ...             ["time", "latitude", "longitude"],
        ...             [[[0.0, 0.1, 0.2], [0.3, 0.4, 0.5]]],
        ...         ),
        ...         freq=(["time"], [0.4]),
        ...         event_id=(["time"], [4]),
        ...     ),
        ...     dict(
        ...         time=[datetime.datetime(2000, 1, 1)],
        ...         latitude=[0, 1],
        ...         longitude=[0, 1, 2],
        ...     ),
        ... )
        >>> hazard = Hazard.from_xarray_raster(
        ...     dset,
        ...     "",
        ...     "",
        ...     data_vars=dict(
        ...         # Load frequency from 'freq' array
        ...         frequency="freq",
        ...         # Ignore 'event_id' array and use default instead
        ...         event_id="",
        ...         # 'fraction' array is loaded because it has the default name
        ...     ),
        ... )
        >>> np.array_equal(hazard.frequency, [0.4]) and np.array_equal(
        ...     hazard.event_id, [1]
        ... )
        True

        If you read single-event data, your dataset probably will not have a time
        dimension. As long as a time *coordinate* exists, however, this method will
        automatically promote it to a dataset dimension and load the data:

        >>> dset = xr.Dataset(
        ...     dict(
        ...         intensity=(
        ...             ["latitude", "longitude"],
        ...             [[0, 1, 2], [3, 4, 5]],
        ...         )
        ...     ),
        ...     dict(
        ...         time=[datetime.datetime(2000, 1, 1)],
        ...         latitude=[0, 1],
        ...         longitude=[0, 1, 2],
        ...     ),
        ... )
        >>> hazard = Hazard.from_xarray_raster(dset, "", "")  # Same as first example

        If one coordinate is missing altogether, you must add it or expand the
        dimensions before loading the dataset:

        >>> dset = xr.Dataset(
        ...     dict(
        ...         intensity=(
        ...             ["latitude", "longitude"],
        ...             [[0, 1, 2], [3, 4, 5]],
        ...         )
        ...     ),
        ...     dict(
        ...         latitude=[0, 1],
        ...         longitude=[0, 1, 2],
        ...     ),
        ... )
        >>> dset = dset.expand_dims(time=[numpy.datetime64("2000-01-01")])
        >>> hazard = Hazard.from_xarray_raster(dset, "", "")
        """
        reader_kwargs = {
            "intensity": intensity,
            "coordinate_vars": coordinate_vars,
            "data_vars": data_vars,
            "crs": crs,
            "rechunk": rechunk,
        }
        if isinstance(data, xr.Dataset):
            reader = HazardXarrayReader(data=data, **reader_kwargs)
        elif isinstance(data, xr.DataArray):
            raise TypeError(
                "This method only supports passing xr.Dataset. Consider promoting "
                "your xr.DataArray to a Dataset."
            )
        else:  # data is pathlike
            reader = HazardXarrayReader.from_file(
                filename=data, open_dataset_kws=open_dataset_kws, **reader_kwargs
            )
        kwargs = reader.get_hazard_kwargs() | {
            "haz_type": hazard_type,
            "units": intensity_unit,
        }
        return cls(**cls._check_and_cast_attrs(kwargs))
    @staticmethod
    def _check_and_cast_attrs(attrs: Dict[str, Any]) -> Dict[str, Any]:
        """Check the validity of the given hazard attributes and cast them to the
        correct type if required and possible.

        The current purpose is to check that event_name is a list of strings (and
        convert its elements to strings otherwise), although other checks and casts
        could be added here in the future.

        Parameters
        ----------
        attrs : dict
            Attributes for a new Hazard object

        Returns
        -------
        attrs : dict
            Attributes checked for type validity and cast otherwise (only event_name
            at the moment).

        Warns
        -----
        UserWarning
            Warns the user if any value casting happens.
        """

        def _check_and_cast_container(
            attr_value: Any, expected_container: Collection
        ) -> Any:
            """Check if the attribute is of the expected container type and cast if
            necessary.

            Parameters
            ----------
            attr_value : any
                The current value of the attribute.
            expected_container : type
                The expected type of the container (e.g., list, np.ndarray).

            Returns
            -------
            attr_value : any
                The value cast to the expected container type, if needed.
            """
            if not isinstance(attr_value, expected_container):
                warnings.warn(
                    f"Value should be of type {expected_container}. Casting it.",
                    UserWarning,
                )
                # Attempt to cast to the expected container type
                if expected_container is list:
                    return list(attr_value)
                elif expected_container is np.ndarray:
                    return np.array(attr_value)
                else:
                    raise TypeError(
                        f"Unsupported container type: {expected_container}"
                    )
            return attr_value

        def _check_and_cast_elements(
            attr_value: Any, expected_dtype: Union[Any, None]
        ) -> Any:
            """Check if the elements of the container are of the expected dtype and
            cast if necessary, while preserving the original container type.

            Parameters
            ----------
            attr_value : any
                The current value of the attribute (a container).
            expected_dtype : type or None
                The expected type of the elements within the container. If None, no
                casting is done.

            Returns
            -------
            attr_value : any
                The value with elements cast to the expected type, preserving the
                original container type.
            """
            if expected_dtype is None:
                # No dtype enforcement required
                return attr_value

            container_type = type(attr_value)  # Preserve the original container type

            # Perform type checking and casting of elements
            if isinstance(attr_value, (list, np.ndarray)):
                if not all(isinstance(val, expected_dtype) for val in attr_value):
                    warnings.warn(
                        f"Not all values are of type {expected_dtype}. Casting values.",
                        UserWarning,
                    )
                    casted_values = [expected_dtype(val) for val in attr_value]
                    # Return the casted values in the same container type
                    if container_type is list:
                        return casted_values
                    elif container_type is np.ndarray:
                        return np.array(casted_values)
                    else:
                        raise TypeError(
                            f"Unsupported container type: {container_type}"
                        )
            else:
                raise TypeError(
                    "Expected a container (e.g., list or ndarray), "
                    f"got {type(attr_value)} instead."
                )
            return attr_value

        ## This should probably be defined as a CONSTANT?
        attrs_to_check = {"event_name": (list, str), "event_id": (np.ndarray, None)}

        for attr_name, (expected_container, expected_dtype) in attrs_to_check.items():
            attr_value = attrs.get(attr_name)
            if attr_value is not None:
                # Check and cast the container type
                attr_value = _check_and_cast_container(attr_value, expected_container)
                # Check and cast the element types (if applicable)
                attr_value = _check_and_cast_elements(attr_value, expected_dtype)
                # Update the attrs dictionary with the modified value
                attrs[attr_name] = attr_value

        return attrs

    @staticmethod
    def _attrs_to_kwargs(attrs: Dict[str, Any], num_events: int) -> Dict[str, Any]:
        """Transform attributes to init kwargs or use default values

        If attributes are missing from ``attrs``, this method will use a sensible
        default value.

        Parameters
        ----------
        attrs : dict
            Attributes for a new Hazard object
        num_events : int
            Number of events stored in a new Hazard object. Used for determining
            default values if Hazard object attributes are missing from ``attrs``.

        Returns
        -------
        kwargs : dict
            Keyword arguments to be passed to a Hazard constructor
        """
        kwargs = dict()

        if "event_id" in attrs:
            kwargs["event_id"] = attrs["event_id"]
        else:
            kwargs["event_id"] = np.arange(1, num_events + 1)
        if "frequency" in attrs:
            kwargs["frequency"] = attrs["frequency"]
        else:
            kwargs["frequency"] = np.ones(kwargs["event_id"].size)
        if "frequency_unit" in attrs:
            kwargs["frequency_unit"] = attrs["frequency_unit"]
        if "event_name" in attrs:
            kwargs["event_name"] = attrs["event_name"]
        else:
            kwargs["event_name"] = list(map(str, kwargs["event_id"]))
        if "date" in attrs:
            kwargs["date"] = np.array([attrs["date"]])
        else:
            kwargs["date"] = np.ones(kwargs["event_id"].size)
        if "orig" in attrs:
            kwargs["orig"] = np.array([attrs["orig"]])
        else:
            kwargs["orig"] = np.ones(kwargs["event_id"].size, bool)
        if "unit" in attrs:
            kwargs["units"] = attrs["unit"]

        return kwargs
    def read_excel(self, *args, **kwargs):
        """This function is deprecated, use Hazard.from_excel."""
        LOGGER.warning(
            "The use of Hazard.read_excel is deprecated. "
            "Use Hazard.from_excel instead."
        )
        self.__dict__ = self.__class__.from_excel(*args, **kwargs).__dict__
    @classmethod
    def from_excel(cls, file_name, var_names=None, haz_type=None):
        """Read a CLIMADA hazard generated with the MATLAB code, in Excel format.

        Parameters
        ----------
        file_name : str
            absolute file name
        var_names : dict, optional
            name of the variables in the file,
            default: DEF_VAR_EXCEL constant
        haz_type : str, optional
            acronym of the hazard type (e.g. 'TC'). Default: None, which will use the
            class default ('' for vanilla `Hazard` objects, and hard coded in some
            subclasses)

        Returns
        -------
        haz : climada.hazard.Hazard
            Hazard object from the provided Excel file

        Raises
        ------
        KeyError
        """
        # pylint: disable=protected-access
        if not var_names:
            var_names = DEF_VAR_EXCEL
        LOGGER.info("Reading %s", file_name)
        hazard_kwargs = {}
        if haz_type is not None:
            hazard_kwargs["haz_type"] = haz_type
        try:
            centroids = Centroids._legacy_from_excel(
                file_name, var_names=var_names["col_centroids"]
            )
            attrs = cls._read_att_excel(file_name, var_names, centroids)
            attrs = cls._check_and_cast_attrs(attrs)
            hazard_kwargs.update(attrs)
        except KeyError as var_err:
            raise KeyError("Variable not in Excel file: " + str(var_err)) from var_err

        return cls(centroids=centroids, **hazard_kwargs)
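    # Illustrative usage sketch (editor's note): reading a legacy Excel hazard whose
    # sheet names deviate from the defaults by overriding a copy of ``DEF_VAR_EXCEL``.
    # The file name, sheet name, and hazard type are hypothetical placeholders.
    #
    # >>> import copy
    # >>> from climada.hazard import Hazard
    # >>> from climada.hazard.io import DEF_VAR_EXCEL
    # >>> var_names = copy.deepcopy(DEF_VAR_EXCEL)
    # >>> var_names["sheet_name"]["inten"] = "intensity_sheet"
    # >>> haz = Hazard.from_excel(
    # ...     "legacy_hazard.xlsx", var_names=var_names, haz_type="TC"
    # ... )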
    def write_raster(self, file_name, variable="intensity", output_resolution=None):
        """Write intensity or fraction as GeoTIFF file. Each band is an event.
        Output raster is always a regular grid (same resolution for lat/lon).

        Note that if output_resolution is not None, the data is rasterized to that
        resolution. This is an expensive operation. For hazards that are already a
        raster, output_resolution=None skips this rasterization step, which is more
        efficient.

        If you want to save both fraction and intensity, create two separate files.
        These can then be read together with the from_raster method.

        Parameters
        ----------
        file_name : str
            file name to write in tif format
        variable : str
            if 'intensity', write intensity, if 'fraction' write fraction.
            Default is 'intensity'
        output_resolution : int, optional
            If not None, the data is rasterized to this resolution.
            Default is None (resolution is estimated from the data).

        See also
        --------
        from_raster : method to read intensity and fraction raster files.
        """
        if variable == "intensity":
            var_to_write = self.intensity
        elif variable == "fraction":
            var_to_write = self.fraction
        else:
            raise ValueError(
                f"The variable {variable} is not valid. Please use 'intensity' or 'fraction'."
            )

        meta = self.centroids.get_meta(resolution=output_resolution)
        meta.update(driver="GTiff", dtype=rasterio.float32, count=self.size)
        res = meta["transform"][0]  # resolution from lon coordinates

        if meta["height"] * meta["width"] == self.centroids.size:
            # centroids already in raster format
            u_coord.write_raster(file_name, var_to_write.toarray(), meta)
        else:
            geometry = self.centroids.get_pixel_shapes(res=res)
            with rasterio.open(file_name, "w", **meta) as dst:
                LOGGER.info("Writing %s", file_name)
                for i_ev in range(self.size):
                    raster = rasterio.features.rasterize(
                        (
                            (geom, value)
                            for geom, value in zip(
                                geometry, var_to_write[i_ev].toarray().flatten()
                            )
                        ),
                        out_shape=(meta["height"], meta["width"]),
                        transform=meta["transform"],
                        fill=0,
                        all_touched=True,
                        dtype=meta["dtype"],
                    )
                    dst.write(raster.astype(meta["dtype"]), i_ev + 1)
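    # Illustrative usage sketch (editor's note): writing intensity and fraction to two
    # separate GeoTIFFs, as recommended above, assuming ``haz`` is an existing Hazard
    # instance. The output file names are hypothetical placeholders.
    #
    # >>> haz.write_raster("haz_intensity.tif", variable="intensity")
    # >>> haz.write_raster("haz_fraction.tif", variable="fraction")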
    def write_hdf5(self, file_name, todense=False):
        """Write hazard in hdf5 format.

        Parameters
        ----------
        file_name : str
            file name to write, with h5 format
        todense : bool
            if True, write the sparse matrices as hdf5.dataset by converting them to
            dense format first. This increases readability of the file for other
            programs. default: False
        """
        LOGGER.info("Writing %s", file_name)
        with h5py.File(file_name, "w") as hf_data:
            str_dt = h5py.special_dtype(vlen=str)
            for var_name, var_val in self.__dict__.items():
                if var_name == "event_name":
                    if not all((isinstance(val, str) for val in var_val)):
                        raise TypeError("'event_name' must be a list of strings")
                if var_name == "centroids":
                    # Centroids have their own write_hdf5 method,
                    # which is invoked at the end of this method (s.b.)
                    continue
                elif isinstance(var_val, sparse.csr_matrix):
                    if todense:
                        hf_data.create_dataset(var_name, data=var_val.toarray())
                    else:
                        hf_csr = hf_data.create_group(var_name)
                        hf_csr.create_dataset("data", data=var_val.data)
                        hf_csr.create_dataset("indices", data=var_val.indices)
                        hf_csr.create_dataset("indptr", data=var_val.indptr)
                        hf_csr.attrs["shape"] = var_val.shape
                elif isinstance(var_val, str):
                    hf_str = hf_data.create_dataset(var_name, (1,), dtype=str_dt)
                    hf_str[0] = var_val
                elif (
                    isinstance(var_val, list)
                    and var_val
                    and isinstance(var_val[0], str)
                ):
                    hf_str = hf_data.create_dataset(
                        var_name, (len(var_val),), dtype=str_dt
                    )
                    for i_ev, var_ev in enumerate(var_val):
                        hf_str[i_ev] = var_ev
                elif var_val is not None and var_name != "pool":
                    try:
                        hf_data.create_dataset(var_name, data=var_val)
                    except TypeError:
                        LOGGER.warning(
                            "write_hdf5: the class member %s is skipped, due to its "
                            "type, %s, for which writing to hdf5 "
                            "is not implemented. Reading this H5 file will probably lead to "
                            "%s being set to its default value.",
                            var_name,
                            var_val.__class__.__name__,
                            var_name,
                        )
        self.centroids.write_hdf5(file_name, mode="a")
    def read_hdf5(self, *args, **kwargs):
        """This function is deprecated, use Hazard.from_hdf5."""
        LOGGER.warning(
            "The use of Hazard.read_hdf5 is deprecated. "
            "Use Hazard.from_hdf5 instead."
        )
        self.__dict__ = self.__class__.from_hdf5(*args, **kwargs).__dict__
    @classmethod
    def from_hdf5(cls, file_name):
        """Read hazard in hdf5 format.

        Parameters
        ----------
        file_name : str
            file name to read, with h5 format

        Returns
        -------
        haz : climada.hazard.Hazard
            Hazard object from the provided HDF5 file
        """
        LOGGER.info("Reading %s", file_name)
        # NOTE: This is a stretch. We instantiate one empty object to iterate over its
        #       attributes. But then we create a new one with the attributes filled!
        haz = cls()
        hazard_kwargs = dict()
        with h5py.File(file_name, "r") as hf_data:
            for var_name, var_val in haz.__dict__.items():
                if var_name not in hf_data.keys():
                    continue
                if var_name == "centroids":
                    continue
                if isinstance(var_val, np.ndarray) and var_val.ndim == 1:
                    hazard_kwargs[var_name] = np.array(hf_data.get(var_name))
                elif isinstance(var_val, sparse.csr_matrix):
                    hf_csr = hf_data.get(var_name)
                    if isinstance(hf_csr, h5py.Dataset):
                        hazard_kwargs[var_name] = sparse.csr_matrix(hf_csr)
                    else:
                        hazard_kwargs[var_name] = sparse.csr_matrix(
                            (
                                hf_csr["data"][:],
                                hf_csr["indices"][:],
                                hf_csr["indptr"][:],
                            ),
                            hf_csr.attrs["shape"],
                        )
                elif isinstance(var_val, str):
                    hazard_kwargs[var_name] = u_hdf5.to_string(
                        hf_data.get(var_name)[0]
                    )
                elif isinstance(var_val, list):
                    hazard_kwargs[var_name] = [
                        x
                        for x in map(
                            u_hdf5.to_string, np.array(hf_data.get(var_name)).tolist()
                        )
                    ]
                else:
                    hazard_kwargs[var_name] = hf_data.get(var_name)
        hazard_kwargs["centroids"] = Centroids.from_hdf5(file_name)
        hazard_kwargs = cls._check_and_cast_attrs(hazard_kwargs)

        # Now create the actual object we want to return!
        return cls(**hazard_kwargs)
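    # Illustrative usage sketch (editor's note): a write/read round trip in HDF5
    # format, assuming ``haz`` is an existing Hazard instance. The file name is a
    # hypothetical placeholder.
    #
    # >>> from climada.hazard import Hazard
    # >>> haz.write_hdf5("hazard.h5")
    # >>> haz_restored = Hazard.from_hdf5("hazard.h5")
    # >>> np.array_equal(haz.event_id, haz_restored.event_id)
    # True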
    @staticmethod
    def _read_att_mat(data, file_name, var_names, centroids):
        """Read MATLAB hazard's attributes."""
        attrs = dict()
        attrs["frequency"] = np.squeeze(data[var_names["var_name"]["freq"]])
        try:
            attrs["frequency_unit"] = u_hdf5.get_string(
                data[var_names["var_name"]["freq_unit"]]
            )
        except KeyError:
            pass
        attrs["orig"] = np.squeeze(data[var_names["var_name"]["orig"]]).astype(bool)
        attrs["event_id"] = np.squeeze(
            data[var_names["var_name"]["even_id"]].astype(int, copy=False)
        )
        try:
            attrs["units"] = u_hdf5.get_string(data[var_names["var_name"]["unit"]])
        except KeyError:
            pass

        n_cen = centroids.size
        n_event = len(attrs["event_id"])
        try:
            attrs["intensity"] = u_hdf5.get_sparse_csr_mat(
                data[var_names["var_name"]["inten"]], (n_event, n_cen)
            )
        except ValueError as err:
            raise ValueError("Size mismatch in intensity matrix.") from err
        try:
            attrs["fraction"] = u_hdf5.get_sparse_csr_mat(
                data[var_names["var_name"]["frac"]], (n_event, n_cen)
            )
        except ValueError as err:
            raise ValueError("Size mismatch in fraction matrix.") from err
        except KeyError:
            attrs["fraction"] = sparse.csr_matrix(
                np.ones(attrs["intensity"].shape, dtype=float)
            )
        # Event names: set as event_id if none provided
        try:
            attrs["event_name"] = u_hdf5.get_list_str_from_ref(
                file_name, data[var_names["var_name"]["ev_name"]]
            )
        except KeyError:
            attrs["event_name"] = list(attrs["event_id"])

        try:
            datenum = data[var_names["var_name"]["datenum"]].squeeze()
            attrs["date"] = np.array(
                [
                    (
                        dt.datetime.fromordinal(int(date))
                        + dt.timedelta(days=date % 1)
                        - dt.timedelta(days=366)
                    ).toordinal()
                    for date in datenum
                ]
            )
        except KeyError:
            pass

        return attrs

    @staticmethod
    def _read_att_excel(file_name, var_names, centroids):
        """Read Excel hazard's attributes."""
        dfr = pd.read_excel(file_name, var_names["sheet_name"]["freq"])

        num_events = dfr.shape[0]
        attrs = dict()
        attrs["frequency"] = dfr[var_names["col_name"]["freq"]].values
        attrs["orig"] = dfr[var_names["col_name"]["orig"]].values.astype(bool)
        attrs["event_id"] = dfr[var_names["col_name"]["even_id"]].values.astype(
            int, copy=False
        )
        attrs["date"] = dfr[var_names["col_name"]["even_dt"]].values.astype(
            int, copy=False
        )
        attrs["event_name"] = dfr[var_names["col_name"]["even_name"]].values.tolist()

        dfr = pd.read_excel(file_name, var_names["sheet_name"]["inten"])
        # number of events (ignore centroid_ID column)
        # check the number of events is the same as the one in the frequency sheet
        if dfr.shape[1] - 1 != num_events:
            raise ValueError(
                "Hazard intensity is given for a number of events "
                "different from the number defined in its frequency: "
                f"{dfr.shape[1] - 1} != {num_events}"
            )
        # check number of centroids is the same as retrieved before
        if dfr.shape[0] != centroids.size:
            raise ValueError(
                "Hazard intensity is given for a number of centroids "
                "different from the number of centroids defined: "
                f"{dfr.shape[0]} != {centroids.size}"
            )

        attrs["intensity"] = sparse.csr_matrix(
            dfr.values[:, 1 : num_events + 1].transpose()
        )
        attrs["fraction"] = sparse.csr_matrix(
            np.ones(attrs["intensity"].shape, dtype=float)
        )

        return attrs
def _values_from_raster_files(
    file_names,
    meta,
    band=None,
    src_crs=None,
    window=None,
    geometry=None,
    dst_crs=None,
    transform=None,
    width=None,
    height=None,
    resampling=rasterio.warp.Resampling.nearest,
):
    """Read raster of bands and set 0 values to the masked ones.

    Each band is an event. Select a region using window or geometry. Reproject the
    input by providing dst_crs and/or (transform, width, height).

    The main purpose of this function is to read intensity/fraction values from
    raster files for use in Hazard.from_raster. It is implemented as a separate
    helper function (instead of a class method) to allow for parallel computing.

    Parameters
    ----------
    file_names : list(str)
        paths of the files to read
    meta : dict
        description of the centroids raster
    band : list(int), optional
        band number to read. Default: [1]
    src_crs : crs, optional
        source CRS. Provide it if error without it.
    window : rasterio.windows.Window, optional
        window to read
    geometry : list of shapely.geometry, optional
        consider pixels only within these shapes
    dst_crs : crs, optional
        reproject to given crs
    transform : rasterio.Affine
        affine transformation to apply
    width : float
        number of lons for transform
    height : float
        number of lats for transform
    resampling : rasterio.warp.Resampling, optional
        resampling function used for reprojection to dst_crs

    Raises
    ------
    ValueError

    Returns
    -------
    inten : scipy.sparse.csr_matrix
        Each row is an event.
    """
    if band is None:
        band = [1]

    values = []
    for file_name in file_names:
        tmp_meta, data = u_coord.read_raster(
            file_name,
            band,
            src_crs,
            window,
            geometry,
            dst_crs,
            transform,
            width,
            height,
            resampling,
        )
        if (
            tmp_meta["crs"] != meta["crs"]
            or tmp_meta["transform"] != meta["transform"]
            or tmp_meta["height"] != meta["height"]
            or tmp_meta["width"] != meta["width"]
        ):
            raise ValueError("Raster data is inconsistent with the centroids raster.")
        values.append(sparse.csr_matrix(data))

    return sparse.vstack(values, format="csr")