import time
from pathlib import Path
import pandas as pd
import xarray as xr
from netCDF4 import Dataset
import fine.utils as utils
from fine.IOManagement import dictIO, utilsIO
def convertOptimizationOutputToDatasets(esM, optSumOutputLevel=0):
"""
Takes esM instance output and converts it into a nested dictionary of xarray datasets.
:param esM: EnergySystemModel instance in which the optimized model is held
:type esM: EnergySystemModel instance
:param optSumOutputLevel: Output level of the optimization summary (see
EnergySystemModel). Either an integer (0,1,2) which holds for all model
classes or a dictionary with model class names as keys and an integer
(0,1,2) for each key (e.g. {'StorageModel':1,'SourceSinkModel':1,...})
|br| * the default value is 0
:type optSumOutputLevel: int (0,1,2) or dict
:return: xr_dss - EnergySystemModel instance output data as a nested
dictionary of xarray datasets ({"Results": {ip: {model: {component: xr.Dataset}}}})
:rtype: Nested dict
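Example (a minimal usage sketch; assumes ``esM`` is an already optimized
EnergySystemModel instance)::

    xr_dss = convertOptimizationOutputToDatasets(esM)
    # nested structure: {"Results": {ip: {model: {component: xr.Dataset}}}}
    for ip, models in xr_dss["Results"].items():
        print(ip, list(models.keys()))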
"""
# Create a nested dict of empty xr.Datasets for all investment periods and components
xr_dss = dict.fromkeys(esM.investmentPeriodNames)
for ip in esM.investmentPeriodNames:
xr_dss[ip] = dict.fromkeys(esM.componentModelingDict.keys())
for model_dict in esM.componentModelingDict.keys():
xr_dss[ip][model_dict] = {
key: xr.Dataset()
for key in esM.componentModelingDict[model_dict].componentsDict.keys()
}
for ip in esM.investmentPeriodNames:
# Write output from esM.getOptimizationSummary to datasets
for name in esM.componentModelingDict.keys():
utils.output("\tProcessing " + name + " ...", esM.verbose, 0)
oL = optSumOutputLevel
oL_ = oL[name] if isinstance(oL, dict) else oL
optSum = esM.getOptimizationSummary(name, ip=ip, outputLevel=oL_)
if esM.componentModelingDict[name].dimension == "1dim":
for component in optSum.index.get_level_values(0).unique():
variables = optSum.loc[component].index.get_level_values(0)
units = optSum.loc[component].index.get_level_values(1)
variables_unit = dict(zip(variables, units))
for variable in (
optSum.loc[component].index.get_level_values(0).unique()
):
df = optSum.loc[(component, variable)]
df = df.iloc[-1]
df.name = variable
df.index.rename("space", inplace=True)
df = pd.to_numeric(df)
xr_da = df.to_xarray()
# add variable [e.g. 'TAC'] and units to attributes of xarray
unit = variables_unit[variable]
xr_da.attrs[variable] = unit
xr_dss[ip][name][component] = xr.merge(
[xr_dss[ip][name][component], xr_da],
combine_attrs="drop_conflicts",
)
elif esM.componentModelingDict[name].dimension == "2dim":
for component in optSum.index.get_level_values(0).unique():
variables = optSum.loc[component].index.get_level_values(0)
units = optSum.loc[component].index.get_level_values(1)
variables_unit = dict(zip(variables, units))
for variable in (
optSum.loc[component].index.get_level_values(0).unique()
):
df = optSum.loc[(component, variable)]
if len(df.index.get_level_values(0).unique()) > 1:
idx = df.index.get_level_values(0).unique()[-1]
df = df.xs(idx, level=0)
else:
df.index = df.index.droplevel(0)
df = df.stack()
df.name = variable
df.index.rename(["space", "space_2"], inplace=True)
df = pd.to_numeric(df)
xr_da = df.to_xarray()
# add variable [e.g. 'TAC'] and units to attributes of xarray
unit = variables_unit[variable]
xr_da.attrs[variable] = unit
xr_dss[ip][name][component] = xr.merge(
[xr_dss[ip][name][component], xr_da],
combine_attrs="drop_conflicts",
)
# Write output from esM.componentModelingDict[name].getOptimalValues() to datasets
data = esM.componentModelingDict[name].getOptimalValues(ip=ip)
dataTD1dim, indexTD1dim, dataTD2dim, indexTD2dim = [], [], [], []
dataTI, indexTI = [], []
for key, d in data.items():
if d["values"] is None:
continue
if d["timeDependent"]:
if d["dimension"] == "1dim":
dataTD1dim.append(d["values"]), indexTD1dim.append(key)
elif d["dimension"] == "2dim":
dataTD2dim.append(d["values"]), indexTD2dim.append(key)
else:
dataTI.append(d["values"]), indexTI.append(key)
# One dimensional time dependent data
if dataTD1dim:
names = ["Variable", "Component", "Location"]
dfTD1dim = pd.concat(dataTD1dim, keys=indexTD1dim, names=names)
for variable in dfTD1dim.index.get_level_values(0).unique():
for component in (
dfTD1dim.loc[variable].index.get_level_values(0).unique()
):
df = dfTD1dim.loc[(variable, component)].T.stack()
df.name = variable
df.index.rename(["time", "space"], inplace=True)
xr_da = df.to_xarray()
xr_dss[ip][name][component] = xr.merge(
[xr_dss[ip][name][component], xr_da]
)
# Two dimensional time dependent data
if dataTD2dim:
names = ["Variable", "Component", "LocationIn", "LocationOut"]
dfTD2dim = pd.concat(dataTD2dim, keys=indexTD2dim, names=names)
for variable in dfTD2dim.index.get_level_values(0).unique():
for component in (
dfTD2dim.loc[variable].index.get_level_values(0).unique()
):
df = dfTD2dim.loc[(variable, component)].stack()
df.name = variable
df.index.rename(["space", "space_2", "time"], inplace=True)
df.index = df.index.reorder_levels([2, 0, 1])
xr_da = df.to_xarray()
xr_dss[ip][name][component] = xr.merge(
[xr_dss[ip][name][component], xr_da]
)
# Time independent data
if dataTI:
# One dimensional
if esM.componentModelingDict[name].dimension == "1dim":
names = ["Variable type", "Component"]
dfTI = pd.concat(dataTI, keys=indexTI, names=names)
for variable in dfTI.index.get_level_values(0).unique():
for component in (
dfTI.loc[variable].index.get_level_values(0).unique()
):
df = dfTI.loc[(variable, component)].T
df.name = variable
df.index.rename("space", inplace=True)
xr_da = df.to_xarray()
xr_dss[ip][name][component] = xr.merge(
[xr_dss[ip][name][component], xr_da]
)
# Two dimensional
elif esM.componentModelingDict[name].dimension == "2dim":
names = ["Variable type", "Component", "Location"]
dfTI = pd.concat(dataTI, keys=indexTI, names=names)
for variable in dfTI.index.get_level_values(0).unique():
for component in (
dfTI.loc[variable].index.get_level_values(0).unique()
):
df = dfTI.loc[(variable, component)].T.stack()
df.name = variable
df.index.rename(["space", "space_2"], inplace=True)
xr_da = df.to_xarray()
xr_dss[ip][name][component] = xr.merge(
[xr_dss[ip][name][component], xr_da]
)
for name in esM.componentModelingDict.keys():
for component in esM.componentModelingDict[name].componentsDict.keys():
if not list(xr_dss[ip][name][component].data_vars):
# Delete components that have not been built.
del xr_dss[ip][name][component]
else:
# Cast space coordinates to str; otherwise their dtype will be object.
xr_dss[ip][name][component].coords["space"] = (
xr_dss[ip][name][component].coords["space"].astype(str)
)
if esM.componentModelingDict[name].dimension == "2dim":
xr_dss[ip][name][component].coords["space_2"] = (
xr_dss[ip][name][component].coords["space_2"].astype(str)
)
xr_dss = {"Results": xr_dss}
return xr_dss
def writeDatasetsToNetCDF(
datasets,
outputFilePath="my_esm.nc",
removeExisting=False,
mode="a",
groupPrefix=None,
):
"""
Saves a dictionary of xarray datasets (with esM instance data) to a netCDF
file.
**Required arguments:**
:param datasets: The xarray datasets holding all data required to set up an esM instance.
:type datasets: Dict[xr.Dataset]
**Default arguments:**
:param outputFilePath: output file name of the netCDF file (can include full path)
|br| * the default value is "my_esm.nc"
:type outputFilePath: string
:param removeExisting: indicates if an existing netCDF file should be removed
|br| * the default value is False
:type removeExisting: boolean
:param mode: Write ('w') or append ('a') mode.
* If mode='w', any existing file at this location will be overwritten.
* If mode='a', existing variables will be overwritten.
|br| * the default value is 'a'
:type mode: string
:param groupPrefix: if specified, multiple xarray datasets (with esM
instance data) are saved to the same netcdf file. The dictionary
structure is then {group_prefix}/{group}/{...} instead of {group}/{...}
|br| * the default value is None
:type groupPrefix: string
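Example (a minimal sketch; assumes ``esM`` is a set-up EnergySystemModel
instance; uses writeEnergySystemModelToDatasets from this module)::

    datasets = writeEnergySystemModelToDatasets(esM)
    writeDatasetsToNetCDF(datasets, "my_esm.nc", removeExisting=True)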
"""
# Create the netCDF file; remove an existing one first if requested
if removeExisting:
if Path(outputFilePath).is_file():
Path(outputFilePath).unlink()
if not Path(outputFilePath).is_file():
with Dataset(outputFilePath, "w", format="NETCDF4") as _:
pass
for group in datasets.keys():
if group == "Parameters":
xarray_dataset = datasets[group]
_xarray_dataset = (
xarray_dataset.copy()
) # Copying to avoid errors due to change of size during iteration
for attr_name, attr_value in _xarray_dataset.attrs.items():
# if the attribute is a set, convert it into a sorted list
if isinstance(attr_value, set):
xarray_dataset.attrs[attr_name] = sorted(
xarray_dataset.attrs[attr_name]
)
# if the attribute is a dict, convert it into a "flattened" list
elif isinstance(attr_value, dict):
xarray_dataset.attrs[attr_name] = list(
f"{k} : {v}"
for (k, v) in xarray_dataset.attrs[attr_name].items()
)
# if the attribute is a pandas Series, add a new attribute corresponding
# to each entry.
elif isinstance(attr_value, pd.Series):
for idx, value in attr_value.items():
xarray_dataset.attrs.update({f"{attr_name}.{idx}": value})
# Delete the original attribute
del xarray_dataset.attrs[attr_name]
# if the attribute is a pandas DataFrame, add a new attribute corresponding
# to each row by converting the row into a numpy array.
elif isinstance(attr_value, pd.DataFrame):
_df = attr_value
_df = _df.reindex(sorted(_df.columns), axis=1)
for idx, row in _df.iterrows():
xarray_dataset.attrs.update(
{f"{attr_name}.{idx}": row.to_numpy()}
)
# Delete the original attribute
del xarray_dataset.attrs[attr_name]
# if the attribute is a bool, add a corresponding string
elif isinstance(attr_value, bool):
xarray_dataset.attrs[attr_name] = (
"True" if attr_value is True else "False"
)
# if the attribute is None, add a corresponding string
elif attr_value is None:
xarray_dataset.attrs[attr_name] = "None"
if groupPrefix:
group_path = f"{groupPrefix}/{group}"
else:
group_path = f"{group}"
xarray_dataset.to_netcdf(
path=f"{outputFilePath}",
# Datasets per component will be reflected as groups in the NetCDF file.
group=group_path,
# Use mode='a' to append datasets to existing file. Variables will be overwritten.
mode=mode,
)
continue
elif group == "Results":
for ip in datasets[group].keys():
for model, comps in datasets[group][ip].items():
for component in comps.keys():
if component is not None:
if groupPrefix:
group_path = (
f"{groupPrefix}/{group}/{ip}/{model}/{component}"
)
else:
group_path = f"{group}/{ip}/{model}/{component}"
datasets[group][ip][model][component].to_netcdf(
path=f"{outputFilePath}",
# Datasets per component will be reflected as groups in the NetCDF file.
group=group_path,
# Use mode='a' to append datasets to existing file. Variables will be overwritten.
mode=mode,
# Use zlib variable compression to reduce filesize with little performance loss
# for our use-case. Complevel 9 for best compression.
encoding={
var: {"zlib": True, "complevel": 9}
for var in list(
datasets[group][ip][model][component].data_vars
)
},
)
else:
for model, comps in datasets[group].items():
for component in comps.keys():
if component is not None:
if groupPrefix:
group_path = f"{groupPrefix}/{group}/{model}/{component}"
else:
group_path = f"{group}/{model}/{component}"
datasets[group][model][component].to_netcdf(
path=f"{outputFilePath}",
# Datasets per component will be reflected as groups in the NetCDF file.
group=group_path,
# Use mode='a' to append datasets to existing file. Variables will be overwritten.
mode=mode,
# Use zlib variable compression to reduce filesize with little performance loss
# for our use-case. Complevel 9 for best compression.
encoding={
var: {"zlib": True, "complevel": 9}
for var in list(
datasets[group][model][component].data_vars
)
},
)
def convertDatasetsToEnergySystemModel(datasets):
"""
Takes a dictionary of xarray datasets (with esM instance data) and converts
it to an esM instance.
:param datasets: The xarray datasets holding all data required to set up an esM instance.
:type datasets: Dict[xr.Dataset]
:return: esM - EnergySystemModel instance
:rtype: EnergySystemModel instance
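Example (a minimal round-trip sketch; assumes "my_esm.nc" was written by
writeEnergySystemModelToNetCDF)::

    datasets = readNetCDFToDatasets(filePath="my_esm.nc")
    esM = convertDatasetsToEnergySystemModel(datasets)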
"""
# Read parameters
xarray_dataset = utilsIO.processXarrayAttributes(datasets["Parameters"])
esm_dict = xarray_dataset.attrs
# Read input
# Iterate through each component-variable pair; depending on the variable's
# prefix, restructure the data and add it to component_dict
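# Hypothetical illustration: a data variable named "ts_operationRateMax"
# would be handled as a regional time series (prefix "ts_"),
# "2d_distances" as region-to-region data, "1d_capacityMax" as
# per-region data and "0d_economicLifetime" as scalar data.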
component_dict = utilsIO.PowerDict()
for model, comps in datasets["Input"].items():
for component_name, comp_xr in comps.items():
for variable, comp_var_xr in comp_xr.data_vars.items():
if not pd.isnull(comp_var_xr.values).all(): # Skip if all are NAs
component = f"{model}; {component_name}"
# STEP 4 (i). Set regional time series (region, time)
if variable[:3] == "ts_":
component_dict = utilsIO.addTimeSeriesVariableToDict(
component_dict,
comp_var_xr,
component,
variable,
drop_component=False,
)
# STEP 4 (ii). Set 2d data (region, region)
elif variable[:3] == "2d_":
component_dict = utilsIO.add2dVariableToDict(
component_dict,
comp_var_xr,
component,
variable,
drop_component=False,
)
# STEP 4 (iii). Set 1d data (region)
elif variable[:3] == "1d_":
component_dict = utilsIO.add1dVariableToDict(
component_dict,
comp_var_xr,
component,
variable,
drop_component=False,
)
# STEP 4 (iv). Set 0d data
elif variable[:3] == "0d_":
component_dict = utilsIO.add0dVariableToDict(
component_dict, comp_var_xr, component, variable
)
# Create esm from esm_dict and component_dict
esM = dictIO.importFromDict(esm_dict, component_dict)
# Read output
if "Results" in datasets:
# get startyear to find model classes
startyear = list(datasets["Results"].keys())[0]
for model, comps in datasets["Results"][startyear].items():
optSum = {}
operationVariablesOptimum_dict = {}
capacityVariablesOptimum_dict = {}
isBuiltVariablesOptimum_dict = {}
chargeOperationVariablesOptimum_dict = {}
dischargeOperationVariablesOptimum_dict = {}
stateOfChargeOperationVariablesOptimum_dict = {}
for ip in datasets["Results"].keys():
# read the optimization summary
optSum_df = pd.DataFrame([])
for component in datasets["Results"][ip][model]:
optSum_df_comp = pd.DataFrame([])
for variable in datasets["Results"][ip][model][component]:
if "Optimum" in variable:
continue
if "space_2" in list(
datasets["Results"][ip][model][component].coords
):
_optSum_df = (
datasets["Results"][ip][model][component][variable]
.to_dataframe()
.unstack()
)
iterables = [
[component, variable, unit]
for variable, unit in datasets["Results"][ip][model][
component
][variable].attrs.items()
]
iterables2 = [
[iterables[0] + [location]][0]
for location in datasets["Results"][ip][model][
component
][variable]["space"].values
]
idx = pd.MultiIndex.from_tuples(tuple(iterables2))
_optSum_df.index = idx
_optSum_df.index.names = [
"Component",
"Property",
"Unit",
"LocationIn",
]
_optSum_df = _optSum_df.droplevel(0, axis=1)
if isinstance(_optSum_df, pd.Series):
_optSum_df = _optSum_df.to_frame().T
optSum_df_comp = pd.concat(
[optSum_df_comp, _optSum_df],
axis=0,
)
else:
_optSum_df = (
datasets["Results"][ip][model][component][variable]
.to_dataframe()
.T
)
iterables = [
[component, variable, unit]
for variable, unit in datasets["Results"][ip][model][
component
][variable].attrs.items()
]
_optSum_df.index = pd.MultiIndex.from_tuples(iterables)
_optSum_df.index.names = ["Component", "Property", "Unit"]
if isinstance(_optSum_df, pd.Series):
_optSum_df = _optSum_df.to_frame().T
optSum_df_comp = pd.concat(
[optSum_df_comp, _optSum_df],
axis=0,
)
if isinstance(optSum_df_comp, pd.Series):
optSum_df_comp = optSum_df_comp.to_frame().T
optSum_df = pd.concat(
[optSum_df, optSum_df_comp],
axis=0,
)
optSum[int(ip)] = optSum_df
setattr(esM.componentModelingDict[model], "_optSummary", optSum)
# read optimal Values (3 types exist)
operationVariablesOptimum_dict[int(ip)] = pd.DataFrame([])
capacityVariablesOptimum_dict[int(ip)] = pd.DataFrame([])
isBuiltVariablesOptimum_dict[int(ip)] = pd.DataFrame([])
chargeOperationVariablesOptimum_dict[int(ip)] = pd.DataFrame([])
dischargeOperationVariablesOptimum_dict[int(ip)] = pd.DataFrame([])
stateOfChargeOperationVariablesOptimum_dict[int(ip)] = pd.DataFrame([])
for component in datasets["Results"][ip][model]:
_operationVariablesOptimum_df = pd.DataFrame([])
_capacityVariablesOptimum_df = pd.DataFrame([])
_isBuiltVariablesOptimum_df = pd.DataFrame([])
_chargeOperationVariablesOptimum_df = pd.DataFrame([])
_dischargeOperationVariablesOptimum_df = pd.DataFrame([])
_stateOfChargeOperationVariablesOptimum_df = pd.DataFrame([])
for variable in datasets["Results"][ip][model][component]:
if "Optimum" not in variable:
continue
opt_variable = variable
xr_opt = None
if opt_variable in datasets["Results"][ip][model][component]:
xr_opt = datasets["Results"][ip][model][component][
opt_variable
]
else:
continue
if opt_variable == "_operationVariablesOptimum":
if "space_2" in list(xr_opt.coords):
df = (
xr_opt.to_dataframe()
.unstack(level=0)
.droplevel(0, axis=1)
)
_operationVariablesOptimum_df = pd.DataFrame([])
for item in df.index.get_level_values(0).unique():
_df = df.loc[item]
_df = _df.drop(item)
idx = pd.MultiIndex.from_product(
[[component], [item], list(_df.index)]
)
_df = _df.set_index(idx)
if isinstance(_df, pd.Series):
_df = _df.to_frame().T
_operationVariablesOptimum_df = pd.concat(
[_operationVariablesOptimum_df, _df],
axis=0,
)
else:
_operationVariablesOptimum_df = (
xr_opt.to_dataframe()
.unstack(level=0)
.droplevel(0, axis=1)
)
_operationVariablesOptimum_df = (
_operationVariablesOptimum_df.dropna(axis=0)
)
idx = pd.MultiIndex.from_product(
[[component], _operationVariablesOptimum_df.index]
)
_operationVariablesOptimum_df = (
_operationVariablesOptimum_df.set_index(idx)
)
if opt_variable == "_capacityVariablesOptimum":
if "space_2" in list(xr_opt.coords):
df = (
xr_opt.to_dataframe()
.unstack(level=0)
.droplevel(0, axis=1)
)
idx = pd.MultiIndex.from_product(
[[component], list(df.index)]
)
_df = df.set_index(idx)
_capacityVariablesOptimum_df = _df
else:
_capacityVariablesOptimum_df = xr_opt.to_dataframe().T
_capacityVariablesOptimum_df = (
_capacityVariablesOptimum_df.set_axis([component])
)
if opt_variable == "_isBuiltVariablesOptimum":
_isBuiltVariablesOptimum_df = (
xr_opt.to_dataframe()
.unstack(level=0)
.droplevel(0, axis=1)
)
idx = pd.MultiIndex.from_product(
[[component], _isBuiltVariablesOptimum_df.index]
)
_isBuiltVariablesOptimum_df = (
_isBuiltVariablesOptimum_df.set_index(idx)
)
if opt_variable == "chargeOperationVariablesOptimum":
_chargeOperationVariablesOptimum_df = (
xr_opt.to_dataframe()
.unstack(level=0)
.droplevel(0, axis=1)
)
idx = pd.MultiIndex.from_product(
[[component], _chargeOperationVariablesOptimum_df.index]
)
_chargeOperationVariablesOptimum_df = (
_chargeOperationVariablesOptimum_df.set_index(idx)
)
if opt_variable == "dischargeOperationVariablesOptimum":
_dischargeOperationVariablesOptimum_df = (
xr_opt.to_dataframe()
.unstack(level=0)
.droplevel(0, axis=1)
)
idx = pd.MultiIndex.from_product(
[
[component],
_dischargeOperationVariablesOptimum_df.index,
]
)
_dischargeOperationVariablesOptimum_df = (
_dischargeOperationVariablesOptimum_df.set_index(idx)
)
if opt_variable == "stateOfChargeOperationVariablesOptimum":
_stateOfChargeOperationVariablesOptimum_df = (
xr_opt.to_dataframe()
.unstack(level=0)
.droplevel(0, axis=1)
)
idx = pd.MultiIndex.from_product(
[
[component],
_stateOfChargeOperationVariablesOptimum_df.index,
]
)
_stateOfChargeOperationVariablesOptimum_df = (
_stateOfChargeOperationVariablesOptimum_df.set_index(
idx
)
)
if isinstance(_operationVariablesOptimum_df, pd.Series):
_operationVariablesOptimum_df = (
_operationVariablesOptimum_df.to_frame().T
)
operationVariablesOptimum_dict[int(ip)] = pd.concat(
[
operationVariablesOptimum_dict[int(ip)],
_operationVariablesOptimum_df,
],
axis=0,
)
if isinstance(_capacityVariablesOptimum_df, pd.Series):
_capacityVariablesOptimum_df = (
_capacityVariablesOptimum_df.to_frame().T
)
capacityVariablesOptimum_dict[int(ip)] = pd.concat(
[
capacityVariablesOptimum_dict[int(ip)],
_capacityVariablesOptimum_df,
],
axis=0,
)
if isinstance(_isBuiltVariablesOptimum_df, pd.Series):
_isBuiltVariablesOptimum_df = (
_isBuiltVariablesOptimum_df.to_frame().T
)
isBuiltVariablesOptimum_dict[int(ip)] = pd.concat(
[
isBuiltVariablesOptimum_dict[int(ip)],
_isBuiltVariablesOptimum_df,
],
axis=0,
)
if isinstance(_chargeOperationVariablesOptimum_df, pd.Series):
_chargeOperationVariablesOptimum_df = (
_chargeOperationVariablesOptimum_df.to_frame().T
)
chargeOperationVariablesOptimum_dict[int(ip)] = pd.concat(
[
chargeOperationVariablesOptimum_dict[int(ip)],
_chargeOperationVariablesOptimum_df,
],
axis=0,
)
if isinstance(_dischargeOperationVariablesOptimum_df, pd.Series):
_dischargeOperationVariablesOptimum_df = (
_dischargeOperationVariablesOptimum_df.to_frame().T
)
dischargeOperationVariablesOptimum_dict[int(ip)] = pd.concat(
[
dischargeOperationVariablesOptimum_dict[int(ip)],
_dischargeOperationVariablesOptimum_df,
],
axis=0,
)
if isinstance(
_stateOfChargeOperationVariablesOptimum_df, pd.Series
):
_stateOfChargeOperationVariablesOptimum_df = (
_stateOfChargeOperationVariablesOptimum_df.to_frame().T
)
stateOfChargeOperationVariablesOptimum_dict[int(ip)] = pd.concat(
[
stateOfChargeOperationVariablesOptimum_dict[int(ip)],
_stateOfChargeOperationVariablesOptimum_df,
],
axis=0,
)
# check if empty; if so, convert to None
if operationVariablesOptimum_dict[int(ip)].empty:
operationVariablesOptimum_dict[int(ip)] = None
if capacityVariablesOptimum_dict[int(ip)].empty:
capacityVariablesOptimum_dict[int(ip)] = None
if isBuiltVariablesOptimum_dict[int(ip)].empty:
isBuiltVariablesOptimum_dict[int(ip)] = None
if chargeOperationVariablesOptimum_dict[int(ip)].empty:
chargeOperationVariablesOptimum_dict[int(ip)] = None
if dischargeOperationVariablesOptimum_dict[int(ip)].empty:
dischargeOperationVariablesOptimum_dict[int(ip)] = None
if stateOfChargeOperationVariablesOptimum_dict[int(ip)].empty:
stateOfChargeOperationVariablesOptimum_dict[int(ip)] = None
setattr(
esM.componentModelingDict[model],
"_operationVariablesOptimum",
operationVariablesOptimum_dict,
)
setattr(
esM.componentModelingDict[model],
"_capacityVariablesOptimum",
capacityVariablesOptimum_dict,
)
setattr(
esM.componentModelingDict[model],
"_isBuiltVariablesOptimum",
isBuiltVariablesOptimum_dict,
)
setattr(
esM.componentModelingDict[model],
"_chargeOperationVariablesOptimum",
chargeOperationVariablesOptimum_dict,
)
setattr(
esM.componentModelingDict[model],
"_dischargeOperationVariablesOptimum",
dischargeOperationVariablesOptimum_dict,
)
setattr(
esM.componentModelingDict[model],
"_stateOfChargeOperationVariablesOptimum",
stateOfChargeOperationVariablesOptimum_dict,
)
# if only one investment period -> keep optimal values unchanged for end user
def setFinalOptimalValues(esM, name):
if len(esM.investmentPeriodNames) == 1:
data = getattr(esM.componentModelingDict[model], "_" + name)
setattr(
esM.componentModelingDict[model], name, data[int(startyear)]
)
else:
data = getattr(esM.componentModelingDict[model], "_" + name)
setattr(esM.componentModelingDict[model], name, data)
return esM
optimalParameters = [
"optSummary",
"operationVariablesOptimum",
"capacityVariablesOptimum",
"isBuiltVariablesOptimum",
"chargeOperationVariablesOptimum",
"dischargeOperationVariablesOptimum",
"stateOfChargeOperationVariablesOptimum",
]
for name in optimalParameters:
esM = setFinalOptimalValues(esM, name)
return esM
def writeEnergySystemModelToNetCDF(
esM,
outputFilePath="my_esm.nc",
overwriteExisting=False,
optSumOutputLevel=0,
groupPrefix=None,
):
"""
Write an EnergySystemModel instance (input and, if it exists, output) to a netCDF file.
:param esM: EnergySystemModel instance in which the model is held
:type esM: EnergySystemModel instance
:param outputFilePath: output file name of the netCDF file (can include full path)
|br| * the default value is "my_esm.nc"
:type outputFilePath: string
:param overwriteExisting: indicates if an existing netCDF file should be overwritten
|br| * the default value is False
:type overwriteExisting: boolean
:param optSumOutputLevel: Output level of the optimization summary (see
EnergySystemModel). Either an integer (0,1,2) which holds for all model
classes or a dictionary with model class names as keys and an integer
(0,1,2) for each key (e.g. {'StorageModel':1,'SourceSinkModel':1,...})
|br| * the default value is 0
:type optSumOutputLevel: int (0,1,2) or dict
:param groupPrefix: if specified, multiple xarray datasets (with esM
instance data) are saved to the same netcdf file. The dictionary
structure is then {group_prefix}/{group}/{...} instead of {group}/{...}
|br| * the default value is None
:type groupPrefix: string
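Example (a minimal sketch; assumes ``esM`` has been set up and, optionally,
optimized)::

    writeEnergySystemModelToNetCDF(esM, "my_esm.nc", overwriteExisting=True)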
"""
if overwriteExisting:
if Path(outputFilePath).is_file():
Path(outputFilePath).unlink()
utils.output("\nWriting output to netCDF... ", esM.verbose, 0)
_t = time.time()
xr_dss_input = convertOptimizationInputToDatasets(esM)
writeDatasetsToNetCDF(xr_dss_input, outputFilePath, groupPrefix=groupPrefix)
if esM.objectiveValue is not None: # model was optimized
xr_dss_output = convertOptimizationOutputToDatasets(esM, optSumOutputLevel)
writeDatasetsToNetCDF(xr_dss_output, outputFilePath, groupPrefix=groupPrefix)
utils.output("Done. (%.4f" % (time.time() - _t) + " sec)", esM.verbose, 0)
def writeEnergySystemModelToDatasets(esM):
"""Converts esM instance (input and output) into a xarray dataset.
:param esM: EnergySystemModel instance in which the optimized model is held
:type esM: EnergySystemModel instance
:return: xr_dss_results - esM instance (input and, if available, output)
data as a dictionary of xarray datasets
:rtype: Dict[str, xr.Dataset]
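Example (a minimal sketch)::

    xr_dss_results = writeEnergySystemModelToDatasets(esM)
    inputs = xr_dss_results["Input"]
    results = xr_dss_results.get("Results")  # present only if esM was optimized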
"""
if esM.objectiveValue is not None: # model was optimized
xr_dss_output = convertOptimizationOutputToDatasets(esM)
xr_dss_input = convertOptimizationInputToDatasets(esM)
xr_dss_results = {
"Results": xr_dss_output["Results"],
"Input": xr_dss_input["Input"],
"Parameters": xr_dss_input["Parameters"],
}
else:
xr_dss_input = convertOptimizationInputToDatasets(esM)
xr_dss_results = {
"Input": xr_dss_input["Input"],
"Parameters": xr_dss_input["Parameters"],
}
return xr_dss_results
def readNetCDFToDatasets(filePath="my_esm.nc", groupPrefix=None):
"""
Read a grouped netCDF file (input, parameters and, if present, results)
into a nested dictionary of xr.Datasets.
:param filePath: file name of the netCDF file to read (can include full path)
|br| * the default value is "my_esm.nc"
:type filePath: string
:param groupPrefix: if specified, the xarray datasets (with esM
instance data) are read from the group structure
{group_prefix}/{group}/{...} instead of {group}/{...}
|br| * the default value is None
:type groupPrefix: string
:return: Nested dictionary containing an xr.Dataset with all input,
parameter and result values for each component.
:rtype: Nested dict
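Example (a minimal sketch; assumes "my_esm.nc" was written by
writeEnergySystemModelToNetCDF)::

    xr_dss = readNetCDFToDatasets(filePath="my_esm.nc")
    parameters = xr_dss["Parameters"]  # a single xr.Dataset
    # xr_dss["Input"][model][component] holds one xr.Dataset per component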
"""
with Dataset(filePath, "r", format="NETCDF4") as rootgrp:
if groupPrefix:
group_keys = rootgrp[groupPrefix].groups
else:
group_keys = rootgrp.groups
if not groupPrefix:
xr_dss = {}
# read input from netcdf
xr_dss["Input"] = {
model_key: {
comp_key: xr.load_dataset(
filePath, group=f"Input/{model_key}/{comp_key}"
)
for comp_key in group_keys["Input"][model_key].groups
}
for model_key in group_keys["Input"].groups
}
# read results from netcdf
if "Results" in group_keys:
xr_dss["Results"] = {
ip_key: {
model_key: {
comp_key: xr.load_dataset(
filePath, group=f"Results/{ip_key}/{model_key}/{comp_key}"
)
for comp_key in group_keys["Results"][ip_key][model_key].groups
}
for model_key in group_keys["Results"][ip_key].groups
}
for ip_key in group_keys["Results"].groups
}
# read parameters from netcdf
xr_dss["Parameters"] = xr.load_dataset(filePath, group="Parameters")
else:
xr_dss = {}
# read input from netcdf
xr_dss["Input"] = {
model_key: {
comp_key: xr.load_dataset(
filePath,
group=f"{groupPrefix}/Input/{model_key}/{comp_key}",
)
for comp_key in group_keys["Input"][model_key].groups
}
for model_key in group_keys["Input"].groups
}
# read results from netcdf
if "Results" in group_keys:
xr_dss["Results"] = {
ip_key: {
model_key: {
comp_key: xr.load_dataset(
filePath,
group=f"{groupPrefix}/Results/{ip_key}/{model_key}/{comp_key}",
)
for comp_key in group_keys["Results"][ip_key][model_key].groups
}
for model_key in group_keys["Results"][ip_key].groups
}
for ip_key in group_keys["Results"].groups
}
# read parameters from netcdf
xr_dss["Parameters"] = xr.load_dataset(
filePath, group=f"{groupPrefix}/Parameters"
)
return xr_dss
def readNetCDFtoEnergySystemModel(filePath, groupPrefix=None):
"""
Converts a netCDF file into an EnergySystemModel instance.
:param filePath: file name of the netCDF file (can include full path) in which
the esM data is stored
:type filePath: string
:param groupPrefix: if specified, the esM data is read from the group
structure {group_prefix}/{group}/{...} instead of {group}/{...}
|br| * the default value is None
:type groupPrefix: string
:return: EnergySystemModel instance
:rtype: EnergySystemModel instance
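Example (a minimal round-trip sketch)::

    writeEnergySystemModelToNetCDF(esM, "my_esm.nc")
    esM_restored = readNetCDFtoEnergySystemModel("my_esm.nc")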
"""
# netcdf to xarray dataset
xr_dss = readNetCDFToDatasets(filePath, groupPrefix)
# xarray dataset to esm
esM = convertDatasetsToEnergySystemModel(xr_dss)
return esM