Source code for techAggregation

"""
Aggregation of RE technologies in every region.
"""

# ruff: noqa

import logging
import numpy as np
import xarray as xr
from sklearn.cluster import AgglomerativeClustering
from fine.IOManagement.standardIO import timer
from fine.aggregations.technologyAggregation import techAggregationUtils

logger_tech_agg = logging.getLogger("technology_representation")


[docs] @timer def aggregate_RE_technology( gridded_RE_ds=None, CRS_attr=None, shp_file=None, non_gridded_RE_ds=None, n_timeSeries_perRegion=1, capacity_var_name="capacity", capfac_var_name="capacity factor", region_var_name="region", longitude_dim_name="x", latitude_dim_name="y", time_dim_name="time", location_dim_name="locations", shp_index_col="region_ids", shp_geometry_col="geometry", linkage="average", ): """ Reduces the number of a particular RE technology (e.g. onshore wind turbine) to a desired number, within each region. .. note:: The explanation below uses wind turbines as an example. It could, in reality, be any variable RE technology like PV, offshore wind turbine, etc. The number of simulated wind turbines could be huge. This function reduces them to a few turbine types, in each of the defined region. Each wind turbine is characterised by its capacity and capacity factor time series. The basic idea here is to group the turbines, within each region, such that the turbines with most similar capacity factor time series appear in the same group. Next, the turbines in each group are aggregated to obtain one turbine type, per group, thereby reducing the total number of turbines. Please go through the parameters list below for more information. **Default arguments:** :param gridded_RE_ds: Either the path to the dataset or the read-in xr.Dataset * Dimensions in this data - `latitude_dim_name`, `longitude_dim_name`, and `time_dim_name` * Variables: `capacity_var_name` and `capfac_var_name` :type gridded_RE_ds: str/xr.Dataset :param CRS_attr: The attribute in `gridded_RE_ds` that holds its Coordinate Reference System (CRS) information :type CRS_attr: str :param shp_file: Either the path to the shapefile or the read-in shapefile that should be overlapped with `gridded_RE_ds`, in order to obtain regions' information :type shp_file: str/GeoDataFrame :param non_gridded_RE_ds: Either the path to the dataset or the read-in xr.Dataset * Dimensions in this data - `location_dim_name` and `time_dim_name` * Variables - `capacity_var_name`, `capfac_var_name`, and `region_var_name` One can either pass `gridded_RE_ds` or `non_gridded_RE_ds` to work with. If both are passed, `gridded_RE_ds` is considered :type non_gridded_RE_ds: str/xr.Dataset :param n_timeSeries_perRegion: The number of time series to which the original set should be aggregated, within each region. * If set to 1, performs simple aggregation - Within every region, calculates the weighted mean of RE time series (capacities being weights), and sums the capacities. * If set to a value greater than 1, time series clustering is employed - Clustering method: Sklearn's agglomerative hierarchical clustering - Distance measure: Euclidean distance - Aggregation within each resulting cluster is the same as simple aggregation |br| * the default value is 1 :type n_timeSeries_perRegion: strictly positive int :param capacity_var_name: The name of the data variable in the provided dataset that corresponds to capacity |br| * the default value is 'capacity' :type capacity_var_name: str :param capfac_var_name: The name of the data variable in the provided dataset that corresponds to capacity factor time series |br| * the default value is 'capacity factor' :type capfac_var_name: str :param region_var_name: The name of the data variable in `non_gridded_RE_ds` that contains region IDs |br| * the default value is 'region' :type region_var_name: str :param longitude_dim_name: The dimension name in `gridded_RE_ds` that corresponds to longitude |br| * the default value is 'x' :type longitude_dim_name: str :param latitude_dim_name: The dimension name in `gridded_RE_ds` that corresponds to latitude |br| * the default value is 'y' :type latitude_dim_name: str :param time_dim_name: The dimension name in in the provided dataset that corresponds to time |br| * the default value is 'time' :type time_dim_name: str :param location_dim_name: The dimension name in `non_gridded_RE_ds` that corresponds to locations |br| * the default value is 'locations' :type location_dim_name: str :param shp_index_col: The column in `shp_file` that needs to be taken as location-index in `gridded_RE_ds` |br| * the default value is 'region_ids' :type shp_index_col: str :param shp_geometry_col: The column in `shp_file` that holds geometries |br| * the default value is 'geometry' :type shp_geometry_col: str :param linkage: * Relevant only if `n_timeSeries_perRegion` is greater than 1. * The linkage criterion to be used with agglomerative hierarchical clustering. Can be 'complete', 'single', etc. Refer to Sklearn's documentation for more info. |br| * the default value is 'average' :type linkage: str :returns: regional_aggregated_RE_ds * Dimensions in this data: `time_dim_name`, 'region_ids' * The dimension 'region_ids' has its coordinates corresponding to `shp_index_col` if `gridded_RE_ds` is passed. Otherwise, it corresponds to `region_var_name` values If `n_timeSeries_perRegion` is greater than 1, additional dimension - 'TS_ids' is present * Within each region, different time series are indicated by this 'TS_ids' * In addition, the dataset also contains attributes which indicate which time series were clustered. Calling represented_RE_ds.attrs would render a dictionary that contains <region_ids>.<TS_ids> as keys and a list of original locations as values. In case of `gridded_RE_ds`, these locations are a tuple - (x/longitude, y/latitude) :rtype: xr.Dataset """ def _preprocess_regional_xr_ds_gridded(region): """ Private function to preprocess regional gridded data """ # Get regional data regional_ds = rasterized_RE_ds.sel(region_ids=region) regional_capfac_da = regional_ds[capfac_var_name].where( regional_ds.rasters == 1 ) regional_capacity_da = regional_ds[capacity_var_name].where( regional_ds.rasters == 1 ) # Restructure data regional_capfac_da = regional_capfac_da.stack( x_y=[longitude_dim_name, latitude_dim_name] ) regional_capfac_da = regional_capfac_da.transpose("x_y", time_dim_name) regional_capacity_da = regional_capacity_da.stack( x_y=[longitude_dim_name, latitude_dim_name] ) # Remove all time series with 0 values regional_capfac_da = regional_capfac_da.where(regional_capacity_da > 0) regional_capacity_da = regional_capacity_da.where(regional_capacity_da > 0) # Drop NAs regional_capfac_da = regional_capfac_da.dropna(dim="x_y") regional_capacity_da = regional_capacity_da.dropna(dim="x_y") # Print out number of time series in the region n_ts = len(regional_capfac_da["x_y"].values) logger_tech_agg.info(f"Number of time series in {region}: {n_ts}") # Get power curves from capacity factor time series and capacities regional_power_da = regional_capacity_da * regional_capfac_da return regional_capfac_da, regional_capacity_da, regional_power_da def _preprocess_regional_xr_ds_non_gridded(region): """ Private function to preprocess regional non gridded data """ # Get regional data regional_ds = non_gridded_RE_ds.where( non_gridded_RE_ds[region_var_name] == region ) # Rename locations dim to keep results same as _preprocess_regional_xr_ds_gridded() regional_ds = regional_ds.rename({location_dim_name: "x_y"}) regional_capfac_da = regional_ds[capfac_var_name] ## make sure the dimensions are in the right order regional_capfac_da = regional_capfac_da.transpose("x_y", time_dim_name) regional_capacity_da = regional_ds[capacity_var_name] # Remove all time series with 0 values regional_capfac_da = regional_capfac_da.where(regional_capacity_da > 0) regional_capacity_da = regional_capacity_da.where(regional_capacity_da > 0) # Drop NAs regional_capfac_da = regional_capfac_da.dropna(dim="x_y") regional_capacity_da = regional_capacity_da.dropna(dim="x_y") # Print out number of time series in the region n_ts = len(regional_capfac_da["x_y"].values) logger_tech_agg.info(f"Number of time series in {region}: {n_ts}") # Get power curves from capacity factor time series and capacities regional_power_da = regional_capacity_da * regional_capfac_da return regional_capfac_da, regional_capacity_da, regional_power_da # STEP 0. Do all checks ## either provide gridded_RE_ds or non_gridded_RE_ds if gridded_RE_ds is None and non_gridded_RE_ds is None: raise ValueError("Either gridded_RE_ds or non_gridded_RE_ds must be passed") if gridded_RE_ds is not None and non_gridded_RE_ds is not None: logger_tech_agg.warn( "Both gridded_RE_ds and non_gridded_RE_ds are passed. \ gridded_RE_ds will be used" ) non_gridded_RE_ds = None ## gridded_RE_ds requires CRS_attr and shp_file to be set if gridded_RE_ds is not None: if CRS_attr is None: raise ValueError( "You have passed gridded_RE_ds. It requires setting CRS_attr" ) elif shp_file is None: raise ValueError( "You have passed gridded_RE_ds. It requires a shapefile (shp_file) too" ) # STEP 1. Rasterize the gridded dataset if gridded_RE_ds is not None: rasterized_RE_ds = techAggregationUtils.rasterize_xr_ds( gridded_RE_ds, CRS_attr, shp_file, shp_index_col, shp_geometry_col, longitude_dim_name, latitude_dim_name, ) region_ids = rasterized_RE_ds["region_ids"].values n_regions = len(region_ids) time_steps = rasterized_RE_ds[time_dim_name].values n_timeSteps = len(time_steps) else: # Read in the file if isinstance(non_gridded_RE_ds, str): try: non_gridded_RE_ds = xr.open_dataset(non_gridded_RE_ds) except: raise FileNotFoundError("The gridded_RE_ds path specified is not valid") elif not isinstance(non_gridded_RE_ds, xr.Dataset): raise TypeError( "gridded_RE_ds must either be a path to a netcdf file or xarray dataset" ) region_ids = np.unique(non_gridded_RE_ds[region_var_name].values) n_regions = len(region_ids) time_steps = non_gridded_RE_ds[time_dim_name].values n_timeSteps = len(time_steps) if n_timeSeries_perRegion == 1: # STEP 2. Create resultant xarray dataset ## time series data = np.zeros((n_timeSteps, n_regions)) represented_timeSeries = xr.DataArray( data, [(time_dim_name, time_steps), ("region_ids", region_ids)] ) # capacities data = np.zeros(n_regions) represented_capacities = xr.DataArray(data, [("region_ids", region_ids)]) # STEP 3. Aggregation in every region... for region in region_ids: # Preprocess regional data if gridded_RE_ds != None: ( regional_capfac_da, regional_capacity_da, regional_power_da, ) = _preprocess_regional_xr_ds_gridded(region) else: ( regional_capfac_da, regional_capacity_da, regional_power_da, ) = _preprocess_regional_xr_ds_non_gridded(region) # Aggregation ## capacity capacity_total = regional_capacity_da.sum(dim="x_y").values represented_capacities.loc[region] = capacity_total ## capacity factor power_total = regional_power_da.sum(dim="x_y").values capfac_total = power_total / capacity_total represented_timeSeries.loc[:, region] = capfac_total # STEP 4. Create resulting dataset regional_represented_RE_ds = xr.Dataset( { capacity_var_name: represented_capacities, capfac_var_name: represented_timeSeries, } ) return regional_represented_RE_ds else: # STEP 2. Create resultant xarray dataset TS_ids = [f"TS_{i}" for i in range(n_timeSeries_perRegion)] ## time series data = np.zeros((n_timeSteps, n_regions, n_timeSeries_perRegion)) aggregated_timeSeries = xr.DataArray( data, [ (time_dim_name, time_steps), ("region_ids", region_ids), ("TS_ids", TS_ids), ], ) ## capacities data = np.zeros((n_regions, n_timeSeries_perRegion)) aggregated_capacities = xr.DataArray( data, [("region_ids", region_ids), ("TS_ids", TS_ids)] ) # ## ts groups cluster_labels = {} # STEP 3. Clustering in every region... for region in region_ids: # Preprocess regional data if gridded_RE_ds != None: ( regional_capfac_da, regional_capacity_da, regional_power_da, ) = _preprocess_regional_xr_ds_gridded(region) else: ( regional_capfac_da, regional_capacity_da, regional_power_da, ) = _preprocess_regional_xr_ds_non_gridded(region) # Clustering agg_cluster = AgglomerativeClustering( n_clusters=n_timeSeries_perRegion, metric="euclidean", linkage=linkage ) agglomerative_model = agg_cluster.fit(regional_capfac_da) # Aggregation for i in range(np.unique(agglomerative_model.labels_).shape[0]): ## Aggregate capacities cluster_capacity = regional_capacity_da[ agglomerative_model.labels_ == i ] cluster_capacity_total = cluster_capacity.sum(dim="x_y").values aggregated_capacities.loc[region, TS_ids[i]] = cluster_capacity_total # aggregate capacity factor cluster_power = regional_power_da[agglomerative_model.labels_ == i] cluster_power_total = cluster_power.sum(dim="x_y").values cluster_capfac_total = cluster_power_total / cluster_capacity_total aggregated_timeSeries.loc[:, region, TS_ids[i]] = cluster_capfac_total ## lables cluster_labels[f"{region}.{TS_ids[i]}"] = list( regional_capacity_da["x_y"].values[agglomerative_model.labels_ == i] ) # STEP 4. Create resulting dataset regional_aggregated_RE_ds = xr.Dataset( { capacity_var_name: aggregated_capacities, capfac_var_name: aggregated_timeSeries, } ) regional_aggregated_RE_ds.attrs = cluster_labels return regional_aggregated_RE_ds