Source code for aggregation

Functions to aggregate region data for a reduced set 
of regions obtained as a result of spatial grouping of regions. 

import logging
import warnings
from copy import deepcopy

import numpy as np
import pandas as pd
import xarray as xr
from shapely.ops import unary_union

logger_representation = logging.getLogger("spatial_representation")

[docs] def aggregate_geometries(xr_data_array_in, sub_to_sup_region_id_dict): """ For each region group, aggregates their geometries to form one super geometry. :param xr_data_array_in: subset of the xarray dataset data that corresponds to geometry variable :type xr_data_array_in: xr.DataArray :param sub_to_sup_region_id_dict: Dictionary new regions' ids and their corresponding group of regions * Ex.: {'01_reg_02_reg': ['01_reg','02_reg'],\n '03_reg_04_reg': ['03_reg','04_reg']} :type sub_to_sup_region_id_dict: Dict[str, List[str]] :returns: xr_data_array_out * Contains new geometries as values * Coordinates correspond to new regions (In the above example, '01_reg_02_reg', '03_reg_04_reg' form new coordinates) :rtype: xr.DataArray """ space = list(sub_to_sup_region_id_dict.keys()) shape_list = [] for sub_region_id_list in sub_to_sup_region_id_dict.values(): temp_shape_list = list(xr_data_array_in.sel(space=sub_region_id_list).values) shape_union = unary_union(temp_shape_list) shape_list.append(shape_union) if len(shape_list) == 1: shape_list = ( pd.Series(shape_list, index=space).to_xarray().rename({"index": "space"}) ) shape_list = np.array(shape_list, dtype=object) xr_data_array_out = xr.DataArray(shape_list, coords=[space], dims=["space"]) return xr_data_array_out
[docs] def aggregate_time_series_spatially( xr_data_array_in, sub_to_sup_region_id_dict, mode="mean", xr_weight_array=None, ): """ For each region group, aggregates the given time series variable. :param xr_data_array_in: subset of the xarray dataset data that corresponds to a time series variable :type xr_data_array_in: xr.DataArray :param sub_to_sup_region_id_dict: Dictionary new regions' ids and their corresponding group of regions * Ex.: {'01_reg_02_reg': ['01_reg','02_reg'],\n '03_reg_04_reg': ['03_reg','04_reg']} :type sub_to_sup_region_id_dict: Dict[str, List[str]] **Default arguments:** :param mode: Specifies how the time series should be aggregated |br| * the default value is 'mean' :type mode: str, one of {"mean", "weighted mean", "sum"} :param xr_weight_array: Required if `mode` is "weighted mean". `xr_weight_array` in this case would provide weights. The dimensions and coordinates of it should be same as `xr_data_array_in` |br| * the default value is None :type xr_weight_array: xr.DataArray :returns: xr_data_array_out * Contains aggregated time series as values * Coordinates correspond to new regions (In the above example, '01_reg_02_reg', '03_reg_04_reg' form new coordinates) :rtype: xr.DataArray """ space_coords = list(sub_to_sup_region_id_dict.keys()) aggregated_coords = { key: value.values for key, value in xr_data_array_in.coords.items() } aggregated_coords["space"] = space_coords coord_list = [value for value in aggregated_coords.values()] dim_list = [key for key in aggregated_coords.keys()] data_out_dummy = np.empty(tuple(len(coord) for coord in aggregated_coords.values())) data_out_dummy[:] = np.nan xr_data_array_out = xr.DataArray(data_out_dummy, coords=coord_list, dims=dim_list) for sup_region_id, sub_region_id_list in sub_to_sup_region_id_dict.items(): sub_region_da = xr_data_array_in.sel(space=sub_region_id_list) # drop regions that contains only NAs. These correspond to locationally ineligible regions sub_region_da = sub_region_da.dropna(dim="space", how="all") if mode == "weighted mean": # xr_data_array_in dytpe is set as float, this to avoid the division by zero error when dytpe=object sub_region_da = sub_region_da.astype(dtype=float) # get weights sub_region_weight_da = xr_weight_array.sel(space=sub_region_id_list) ## drop regions that contains only NAs. These correspond to locationally ineligible regions sub_region_weight_da = sub_region_weight_da.dropna(dim="space", how="all") weighted_sub_region_da = sub_region_da * sub_region_weight_da xr_data_array_out.loc[dict(space=sup_region_id)] = ( weighted_sub_region_da.sum(dim="space") / sub_region_weight_da.sum(dim="space") ) elif mode == "mean": xr_data_array_out.loc[dict(space=sup_region_id)] = sub_region_da.mean( dim="space" ).values elif mode == "sum": xr_data_array_out.loc[dict(space=sup_region_id)] = sub_region_da.sum( dim="space" ).values else: logger_representation.error( 'Please select one of the modes "weighted mean", "mean", or "sum"' ) # NOTE: If theres a 0 in weight (ex: capacity being 0), # it results in na during weighted mean calculation. These are converted to 0s if mode == "weighted mean": xr_data_array_out = xr_data_array_out.fillna(0) return xr_data_array_out
[docs] def aggregate_values_spatially( xr_data_array_in, sub_to_sup_region_id_dict, mode="mean" ): """ For each region group, aggregates the given 1d variable. :param xr_data_array_in: subset of the xarray dataset data that corresponds to a 1d variable :type xr_data_array_in: xr.DataArray :param sub_to_sup_region_id_dict: Dictionary new regions' ids and their corresponding group of regions * Ex.: {'01_reg_02_reg': ['01_reg','02_reg'],\n '03_reg_04_reg': ['03_reg','04_reg']} :type sub_to_sup_region_id_dict: Dict[str, List[str]] **Default arguments:** :param mode: Specifies how the values should be aggregated |br| * the default value is 'mean' :type mode: str, one of {"mean", "sum", "bool"} :returns: xr_data_array_out * Contains aggregated 1d variable as values * Coordinates correspond to new regions (In the above example, '01_reg_02_reg', '03_reg_04_reg' form new coordinates) :rtype: xr.DataArray """ space_coords = list(sub_to_sup_region_id_dict.keys()) aggregated_coords = { key: value.values for key, value in xr_data_array_in.coords.items() } aggregated_coords["space"] = space_coords coord_list = [value for value in aggregated_coords.values()] dim_list = [key for key in aggregated_coords.keys()] data_out_dummy = np.zeros(tuple(len(coord) for coord in aggregated_coords.values())) xr_data_array_out = xr.DataArray(data_out_dummy, coords=coord_list, dims=dim_list) for sup_region_id, sub_region_id_list in sub_to_sup_region_id_dict.items(): sub_region_da = xr_data_array_in.sel(space=sub_region_id_list) if mode == "mean": xr_data_array_out.loc[dict(space=sup_region_id)] = sub_region_da.mean( dim="space" ).values else: _sum_xr = sub_region_da.sum(dim="space") if mode == "sum": xr_data_array_out.loc[dict(space=sup_region_id)] = _sum_xr elif mode == "bool": xr_data_array_out.loc[dict(space=sup_region_id)] = _sum_xr.where( np.logical_or(_sum_xr.isnull(), _sum_xr == 0), 1 ) # only replace positive non nas else: logger_representation.error( 'Please select one of the modes "mean", "bool" or "sum"' ) xr_data_array_out = xr_data_array_out.fillna(0) return xr_data_array_out
[docs] def aggregate_connections(xr_data_array_in, sub_to_sup_region_id_dict, mode="bool"): """ For each region group, aggregates the given 2d variable. :param xr_data_array_in: subset of the xarray dataset that corresponds to a 2d variable :type xr_data_array_in: xr.DataArray :param sub_to_sup_region_id_dict: Dictionary new regions' ids and their corresponding group of regions * Ex.: {'01_reg_02_reg': ['01_reg','02_reg'],\n '03_reg_04_reg': ['03_reg','04_reg']} :type sub_to_sup_region_id_dict: Dict[str, List[str]] **Default arguments:** :param mode: Specifies how the connections should be aggregated |br| * the default value is 'bool' :type mode: str, one of {"bool", "mean", "sum"} :returns: xr_data_array_out * Contains aggregated 2d variable as values * Coordinates correspond to new regions (In the above example, '01_reg_02_reg', '03_reg_04_reg' form new coordinates) :rtype: xr.DataArray """ space_coords = list(sub_to_sup_region_id_dict.keys()) aggregated_coords = { key: value.values for key, value in xr_data_array_in.coords.items() } aggregated_coords["space"] = space_coords aggregated_coords["space_2"] = space_coords coord_list = [value for value in aggregated_coords.values()] dim_list = [key for key in aggregated_coords.keys()] data_out_dummy = np.zeros(tuple(len(coord) for coord in aggregated_coords.values())) xr_data_array_out = xr.DataArray(data_out_dummy, coords=coord_list, dims=dim_list) for sup_region_id, sub_region_id_list in sub_to_sup_region_id_dict.items(): for sup_region_id_2, sub_region_id_list_2 in sub_to_sup_region_id_dict.items(): sub_region_da = xr_data_array_in.sel( space=sub_region_id_list, space_2=sub_region_id_list_2 ) if mode == "mean": xr_data_array_out.loc[ dict(space=sup_region_id, space_2=sup_region_id_2) ] = sub_region_da.mean(dim=["space", "space_2"]).values else: _sum_xr = sub_region_da.sum(dim=["space", "space_2"]) if mode == "sum": xr_data_array_out.loc[ dict(space=sup_region_id, space_2=sup_region_id_2) ] = _sum_xr elif mode == "bool": xr_data_array_out.loc[ dict(space=sup_region_id, space_2=sup_region_id_2) ] = _sum_xr.where( np.logical_or(_sum_xr.isnull(), _sum_xr == 0), 1 ) # only replace positive non nas else: logger_representation.error( 'Please select one of the modes "mean", "bool" or "sum"' ) # set diagonal values to 0 if sup_region_id == sup_region_id_2: xr_data_array_out.loc[ dict(space=sup_region_id, space_2=sup_region_id_2) ] = 0 xr_data_array_out = xr_data_array_out.fillna(0) return xr_data_array_out
[docs] def aggregate_esm_parameters_spatially( param_df_in, old_locations, sub_to_sup_region_id_dict, mode="mean" ): """ For each region group, aggregates the given esm init parameter data. :param param_df_in: the dataframe with parameter data :type param_df_in: pd.DataFrame :param old_locations: list of former unaggregated regions :type old_locations: list :param sub_to_sup_region_id_dict: Dictionary new regions' ids and their corresponding group of regions * Ex.: {'01_reg_02_reg': ['01_reg','02_reg'],\n '03_reg_04_reg': ['03_reg','04_reg']} :type sub_to_sup_region_id_dict: Dict[str, List[str]] **Default arguments:** :param mode: Specifies how the data should be aggregated |br| * the default value is 'mean' :type mode: str, one of {"mean", "sum"} :returns: param_df_out * Contains aggregated data :rtype: pd.DataFrame """ new_col_names = list(sub_to_sup_region_id_dict.keys()) new_col_names.extend([x for x in param_df_in.columns if x not in old_locations]) param_df_out = pd.DataFrame(data=0, index=param_df_in.index, columns=new_col_names) for sup_region_id, sub_region_id_list in sub_to_sup_region_id_dict.items(): if mode == "mean": param_df_out[sup_region_id] = param_df_in[sub_region_id_list].mean(axis=1) if mode == "sum": param_df_out[sup_region_id] = param_df_in[sub_region_id_list].sum(axis=1) return param_df_out
[docs] def aggregate_based_on_sub_to_sup_region_id_dict( xarray_datasets, sub_to_sup_region_id_dict, aggregation_function_dict ): """ After spatial grouping, for each region group, spatially aggregates the data. :param xarray_datasets: The dictionary of xarray datasets holding esM's info :type xarray_datasets: Dict[str, xr.Dataset] :param sub_to_sup_region_id_dict: Dictionary new regions' ids and their corresponding group of regions * Ex.: {'01_reg_02_reg': ['01_reg','02_reg'],\n '03_reg_04_reg': ['03_reg','04_reg']} :type sub_to_sup_region_id_dict: Dict[str, List[str]] :param aggregation_function_dict: Contains information regarding the mode of aggregation for each individual variable, component, and component class combination.\n * Aggregation possibilities: mean, weighted mean, sum, bool(boolean OR). * Format of the dictionary:\n {<component_class>: {<component_name>: {<variable_name>: (<mode_of_aggregation>, <weights>),\n <variable_name>: (<mode_of_aggregation>, None)}}}\n <weights> is required only if <mode_of_aggregation> is 'weighted mean'. The name of the variable that should act as weights should be provided. Can be None otherwise. :type aggregation_function_dict: Dict[str, Tuple(str, None/str)] :returns: aggregated_xr_dataset * New xarray dataset with aggregated information * Coordinates correspond to new regions (In the above example, '01_reg_02_reg', '03_reg_04_reg' form new coordinates) :rtype: xr.Dataset """ # private function to get aggregation mode for a particular variable name def _get_aggregation_mode(varname, comp=None, comp_ds=None): # If aggregation_function_dict is passed AND the current variable is in it... if (aggregation_function_dict is not None) and ( varname in aggregation_function_dict.keys() ): ## Get the mode and weight aggregation_mode = aggregation_function_dict[varname][0] aggregation_weight = aggregation_function_dict[varname][1] ## If the mode is "weighted mean"... if aggregation_mode == "weighted mean": ## raise error if weight is not specified if aggregation_weight is None: raise TypeError( "Weights must be passed in order to perform weighted mean" ) ## get corresponding weight data if another variable is supposed to be the weight elif isinstance(aggregation_weight, str): if varname[:3] == "2d_": try: aggregation_weight = comp_ds[f"2d_{aggregation_weight}"] except Exception: warnings.warn( f"Aggregation mode for {comp} component's {varname[3:]} set to mean instead of \ weighted mean because corresponding weight: {aggregation_weight} variable is not found" ) aggregation_mode = "mean" else: try: aggregation_weight = comp_ds[f"1d_{aggregation_weight}"] except Exception: warnings.warn( f"Aggregation mode for {comp} component's {varname[3:]} set to mean instead of \ weighted mean because corresponding weight: {aggregation_weight} variable is not found" ) aggregation_mode = "mean" else: raise TypeError( "Aggregation mode for {comp} component's {varname[3:]} is weighted mean, but the \ corresponding weight provided is not valid." ) # If aggregation_function_dict is not passed OR the current variable is not in it, set default else: aggregation_mode = "mean" aggregation_weight = None return aggregation_mode, aggregation_weight # Make a copy of xarray_dataset aggregated_xr_dataset = deepcopy(xarray_datasets) # update esM Parameters parameters_dict = aggregated_xr_dataset.get("Parameters").attrs for varname, vardata in parameters_dict.items(): if varname == "locations": parameters_dict[varname] = set(sub_to_sup_region_id_dict.keys()) elif isinstance(vardata, pd.DataFrame): old_locations = xarray_datasets.get("Parameters").attrs["locations"] if all([x in vardata.columns for x in old_locations]): aggregation_mode, aggregation_weight = _get_aggregation_mode(varname) aggregated_vardata = aggregate_esm_parameters_spatially( vardata, old_locations, sub_to_sup_region_id_dict, mode=aggregation_mode, ) parameters_dict[varname] = aggregated_vardata # Aggregate geometries aggregated_xr_dataset["Geometry"] = aggregate_geometries( xarray_datasets.get("Geometry")["geometries"], sub_to_sup_region_id_dict ) # Aggregate input data for comp_class, comp_dict in xarray_datasets.get("Input").items(): for comp, comp_ds in comp_dict.items(): aggregated_comp_ds = xr.Dataset() for varname, da in comp_ds.data_vars.items(): # Check and set aggregation mode and weights aggregation_mode, aggregation_weight = _get_aggregation_mode( varname, comp, comp_ds ) # only aggregate data corresponding to regions that are locationally eligible var_dim = varname[:3] if var_dim != "0d_": if var_dim == "2d_": locational_eligibility = comp_ds["2d_locationalEligibility"] else: locational_eligibility = comp_ds["1d_locationalEligibility"] da = da.where(locational_eligibility != 0) if aggregation_weight is not None: aggregation_weight = aggregation_weight.where( locational_eligibility != 0 ) # check if multiple investment periods exist if "Period" in da.coords: if not da.coords["Period"].values == np.array(0): raise NotImplementedError( "Spatial aggregation currently does not support multiple investment periods." ) else: ## drop the period coordinate da = da.reset_coords("Period", drop=True) ## Time series if var_dim == "ts_": da = aggregate_time_series_spatially( da, sub_to_sup_region_id_dict, mode=aggregation_mode, xr_weight_array=aggregation_weight, ) ## 1d variables elif var_dim == "1d_": da = aggregate_values_spatially( da, sub_to_sup_region_id_dict, mode=aggregation_mode, ) ## 2d variables elif var_dim == "2d_": da = aggregate_connections( da, sub_to_sup_region_id_dict, mode=aggregation_mode, ) aggregated_comp_ds[varname] = da aggregated_xr_dataset["Input"][comp_class][comp] = aggregated_comp_ds return aggregated_xr_dataset