# Copyright 2020-2021 Ecole Nationale des Ponts et Chaussées
#
# This file is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# Original author Lucas Vivier <vivier@centre-cired.fr>
import os
import sys
import matplotlib.pyplot as plt
import pandas as pd
from pandas import Series, DataFrame, MultiIndex, Index, IndexSlice, concat, to_numeric, unique
from numpy import exp, log, append, array, allclose
from numpy.testing import assert_almost_equal
from scipy.optimize import fsolve
import logging
from copy import deepcopy
from itertools import product
from project.utils import make_plot, reindex_mi, make_plots, calculate_annuities, deciles2quintiles_dict, size_dict, \
get_size, compare_bar_plot, make_sensitivity_tables, cumulated_plot, find_discount_rate, conditional_expectation, get_json
from project.utils import make_hist, reverse_dict, select, make_grouped_scatterplots, calculate_average, make_scatter_plot, add_no_renovation
from project.utils import factor_annuities
import project.thermal as thermal
ACCURACY = 10 ** -5
EPC2INT = {'A': 0, 'B': 1, 'C': 2, 'D': 3, 'E': 4, 'F': 5, 'G': 6}
VAT = 0.2 # VAT
NB_MEASURES = {1: [(False, False, False, True), (False, False, True, False), (False, True, False, False),
(True, False, False, False)],
2: [(False, False, True, True), (False, True, False, True), (True, False, False, True),
(True, False, True, False),
(True, True, False, False), (False, True, True, False)],
3: [(False, True, True, True), (True, False, True, True), (True, True, False, True),
(True, True, True, False)],
4: [(True, True, True, True)]}
INSULATION = {'Wall': (True, False, False, False), 'Floor': (False, True, False, False),
'Roof': (False, False, True, False), 'Windows': (False, False, False, True)}
CONSUMPTION_LEVELS = ['Housing type', 'Wall', 'Floor', 'Roof', 'Windows', 'Heating system']
[docs]class ThermalBuildings:
"""ThermalBuildings classes.
Parameters
----------
stock: Series
Building stock.
surface: Series
Surface by dwelling type.
ratio_surface: dict
Losses area of each envelop component
efficiency: Series
Heating system efficiency.
income: Series
Average income value by income class.
consumption_ini: Series
Initial energy consumption.
path: str, optional
year: int, default: 2018
Attributes
----------
"""
def __init__(self, stock, surface, ratio_surface, efficiency, income, path=None, year=2018,
resources_data=None, detailed_output=None, figures=None, residual_rate=0, temp_sink=None):
# default values
self.heating_intensity_max = None
self.hi_threshold = None
if figures is None:
figures = True
if detailed_output is None:
detailed_output = True
if isinstance(stock, MultiIndex):
stock = Series(index=stock, dtype=float)
self._resources_data = resources_data
self._efficiency_yrs = efficiency
if isinstance(efficiency, DataFrame):
self._efficiency = self._efficiency_yrs.loc[:, year]
else:
self._efficiency = efficiency
self._temp_sink_yrs = temp_sink
if temp_sink is not None:
self._temp_sink = self._temp_sink_yrs.loc[year]
else:
self._temp_sink = None
self._ratio_surface = ratio_surface
self.path, self.path_ini, self.path_calibration = path, None, None
if path is not None and detailed_output:
self.path_calibration = os.path.join(path, 'calibration')
if not os.path.isdir(self.path_calibration):
os.mkdir(self.path_calibration)
if figures:
self.path_ini = os.path.join(path, 'ini')
if not os.path.isdir(self.path_ini):
os.mkdir(self.path_ini)
self.coefficient_global, self.coefficient_backup = None, None
self._surface_yrs = surface
self._surface = surface.loc[:, year]
self._income_yrs = income.copy()
# self._income_ini = income.loc[:, year].copy()
self._income, self._income_owner, self._income_tenant = None, None, None
self.income = self._income_yrs.loc[:, year]
self._residual_rate = residual_rate
self._stock_residual = self._residual_rate * stock
self.stock_mobile = stock - self._stock_residual
self.first_year = year
self._year = year
self.taxes_list = []
self.consumption_before_retrofit = None
self.intensity_before_retrofit = None
# store values to not recalculate standard energy consumption
self._consumption_store = {
'consumption': Series(dtype='float'),
'consumption_3uses': Series(dtype='float'),
'certificate': Series(dtype='float'),
'consumption_renovation': Series(dtype='float'),
'consumption_3uses_renovation': Series(dtype='float'),
'certificate_renovation': Series(dtype='float'),
}
self.stock = stock
if self.path_ini is not None:
stock = self.add_certificate(stock).groupby('Performance').sum() / 10 ** 6
compare_performance = concat((stock, self._resources_data['performance_stock']), axis=1,
keys=['Model', 'SDES-2018'])
compare_bar_plot(compare_performance, 'Stock by performance (Million dwelling)',
save=os.path.join(self.path_ini, 'stock_performance.png'))
@property
def year(self):
return self._year
@year.setter
def year(self, year):
self._year = year
self._surface = self._surface_yrs.loc[:, year]
self.consumption_before_retrofit = None
self.intensity_before_retrofit = None
self.income = self._income_yrs.loc[:, year]
if isinstance(self._efficiency_yrs, DataFrame):
self._efficiency = self._efficiency_yrs.loc[:, year]
if self._temp_sink_yrs is not None:
self._temp_sink = self._temp_sink_yrs.loc[year]
@property
def stock(self):
return self._stock
@stock.setter
def stock(self, stock):
"""Update stock property.
Automatically calculate consumption standard and certificate.
Parameters
----------
stock: Series
Returns
-------
"""
self._stock = stock
stock_mobile = stock - self._stock_residual.reindex(stock.index, fill_value=0)
self.stock_mobile = stock_mobile[stock_mobile > ACCURACY]
self.surface = reindex_mi(self._surface, stock.index)
self.energy = self.to_energy(stock).astype('category')
self._resources_data['index']['Energy'] = [i for i in self._resources_data['index']['Energy'] if
i in self.energy.unique()]
consumption_sd, _, certificate = self.consumption_heating_store(stock.index)
self.certificate = reindex_mi(certificate, stock.index).astype('category')
@property
def income(self):
return self._income
@income.setter
def income(self, income):
self._income = income
self._income_owner = self._income.copy()
self._income_owner.index.rename('Income owner', inplace=True)
self._income_tenant = self._income.copy()
self._income_tenant.index.rename('Income tenant', inplace=True)
[docs] def simplified_stock(self, energy_level=False):
"""Return simplified stock.
Parameters
----------
energy_level
Returns
-------
Series
Simplified stock.
"""
stock = self.stock.fillna(0)
certificate = self.certificate.rename('Performance')
energy = self.to_energy(stock).rename('Energy')
stock = concat((stock, certificate, energy), axis=1).set_index(['Performance', 'Energy'], append=True).squeeze()
if energy_level:
stock = stock.groupby(
['Occupancy status', 'Income owner', 'Income tenant', 'Housing type', 'Heating system',
'Energy', 'Performance']).sum()
else:
stock = stock.groupby(
['Occupancy status', 'Income owner', 'Income tenant', 'Housing type', 'Heating system',
'Performance']).sum()
stock = stock[stock > 0]
return stock
[docs] @staticmethod
def to_energy(df, level_heater='Heating system'):
return Series(df.index.get_level_values(level_heater), index=df.index).str.split('-').str[0].rename(
'Energy')
[docs] @staticmethod
def to_emission(df, emission):
if 'Heating system' in df.index.names:
return emission.reindex(ThermalBuildings.to_energy(df)).set_axis(df.index) * df
elif 'Energy' in df.index.names:
return emission.reindex(df.index.get_level_values('Energy')).set_axis(df.index) * df
[docs] def add_certificate(self, df):
"""Add energy performance certificate to df index.
Parameters
----------
df
Returns
-------
"""
columns = None
if isinstance(df, DataFrame):
columns = df.columns
certificate = self.certificate.rename('Performance')
lvl = [i for i in certificate.index.names if i in df.index.names]
certificate = certificate.groupby(lvl).first()
certificate = reindex_mi(certificate, df.index)
df = concat((df, certificate), axis=1).set_index('Performance', append=True).squeeze()
if isinstance(df, DataFrame):
df.columns = columns
return df
[docs] def add_energy(self, df):
columns = None
if isinstance(df, DataFrame):
columns = df.columns
energy = self.energy.rename('Energy')
lvl = [i for i in energy.index.names if i in df.index.names]
energy = energy.groupby(lvl).first()
energy = reindex_mi(energy, df.index)
df = concat((df, energy), axis=1).set_index('Energy', append=True).squeeze()
if isinstance(df, DataFrame):
df.columns = columns
return df
[docs] def need_heating(self, climate=2006, smooth=False, freq='year', hourly_profile=None, unit='kWh/y'):
"""Calculate heating need of the current building stock.
Returns
-------
DataFrame
"""
idx = self.stock.index
wall = Series(idx.get_level_values('Wall'), index=idx)
floor = Series(idx.get_level_values('Floor'), index=idx)
roof = Series(idx.get_level_values('Roof'), index=idx)
windows = Series(idx.get_level_values('Windows'), index=idx)
heating_need = thermal.conventional_heating_need(wall, floor, roof, windows, self._ratio_surface.copy(),
th_bridging='Medium', vent_types='Ventilation naturelle',
infiltration='Medium', climate=climate,
smooth=smooth, freq=freq, hourly_profile=hourly_profile)
if unit == 'kWh/y':
if isinstance(heating_need, (Series, float, int)):
heating_need = heating_need * self.stock * self.surface
elif isinstance(heating_need, DataFrame):
heating_need = (heating_need.T * self.stock * self.surface).T
return heating_need
[docs] def size_heater(self, index=None):
if index is None:
levels = ['Housing type', 'Heating system', 'Wall', 'Floor', 'Roof', 'Windows']
index = self.stock.groupby(levels).sum().index
levels_consumption = ['Wall', 'Floor', 'Roof', 'Windows', 'Housing type']
_index = index.to_frame().loc[:, levels_consumption].set_index(levels_consumption).index
_index = _index[~_index.duplicated()]
wall = Series(_index.get_level_values('Wall'), index=_index)
floor = Series(_index.get_level_values('Floor'), index=_index)
roof = Series(_index.get_level_values('Roof'), index=_index)
windows = Series(_index.get_level_values('Windows'), index=_index)
size_heating_system = thermal.size_heating_system(wall, floor, roof, windows, self._ratio_surface.copy())
size_heating_system = reindex_mi(size_heating_system, index)
size_heating_system *= reindex_mi(self._surface, index)
return size_heating_system / 1e3
[docs] def consumption_heating(self, index=None, freq='year', climate=None, smooth=False,
full_output=False, efficiency_hour=False, level_heater='Heating system',
method='5uses', hourly_profile=None, temp_sink=None):
"""Calculation consumption standard of the current building stock [kWh/m2.a].
Parameters
----------
index
freq
climate
smooth
full_output: bool, default False
efficiency_hour
level_heater: {'Heating system', 'Heating system final'}
Returns
-------
"""
# TODO: add when index is not None grouped by levels
if index is None:
levels = ['Housing type', 'Heating system', 'Wall', 'Floor', 'Roof', 'Windows']
index = self.stock.groupby(levels).sum().index
levels_consumption = ['Wall', 'Floor', 'Roof', 'Windows', level_heater, 'Housing type']
_index = index.to_frame().loc[:, levels_consumption].set_index(levels_consumption).index
_index = _index[~_index.duplicated()]
wall = Series(_index.get_level_values('Wall'), index=_index)
floor = Series(_index.get_level_values('Floor'), index=_index)
roof = Series(_index.get_level_values('Roof'), index=_index)
windows = Series(_index.get_level_values('Windows'), index=_index)
heating_system = Series(_index.get_level_values(level_heater), index=_index).astype('object')
efficiency = to_numeric(heating_system.replace(self._efficiency))
consumption = thermal.conventional_heating_final(wall, floor, roof, windows, self._ratio_surface.copy(),
efficiency, climate=climate, freq=freq, smooth=smooth,
efficiency_hour=efficiency_hour, hourly_profile=hourly_profile,
temp_sink=temp_sink)
consumption = reindex_mi(consumption, index)
if full_output is True:
certificate, consumption_3uses = thermal.conventional_energy_3uses(wall, floor, roof, windows,
self._ratio_surface.copy(),
efficiency, _index,
method=method)
certificate = reindex_mi(certificate, index)
consumption_3uses = reindex_mi(consumption_3uses, index)
return consumption, certificate, consumption_3uses
else:
return consumption
[docs] def consumption_heating_store(self, index, level_heater='Heating system', full_output=True):
"""Pre-calculate space energy consumption based only on relevant levels.
Climate should be None. Consumption is stored for later use, so climate cannot change calculation.
Parameters
----------
index: MultiIndex, Index
Used to estimate consumption standard.
level_heater: {'Heating system', 'Heating system final'}, default 'Heating system'
unit
full_output: bool, default True
Returns
-------
"""
levels_consumption = ['Wall', 'Floor', 'Roof', 'Windows', level_heater, 'Housing type']
_index = index.to_frame().loc[:, levels_consumption].set_index(levels_consumption).index
_index = _index[~_index.duplicated()]
_index.rename({level_heater: 'Heating system'}, inplace=True)
# remove index already calculated
if not self._consumption_store['consumption'].empty:
temp = self._consumption_store['consumption'].index.intersection(_index)
idx = _index.drop(temp)
else:
idx = _index
if not idx.empty:
consumption, certificate, consumption_3uses = self.consumption_heating(index=idx, freq='year', climate=None,
full_output=True)
self._consumption_store['consumption'] = concat((self._consumption_store['consumption'], consumption))
self._consumption_store['consumption'].index = MultiIndex.from_tuples(
self._consumption_store['consumption'].index).set_names(consumption.index.names)
self._consumption_store['consumption_3uses'] = concat(
(self._consumption_store['consumption_3uses'], consumption_3uses))
self._consumption_store['consumption_3uses'].index = MultiIndex.from_tuples(
self._consumption_store['consumption_3uses'].index).set_names(consumption.index.names)
self._consumption_store['certificate'] = concat((self._consumption_store['certificate'], certificate))
self._consumption_store['certificate'].index = MultiIndex.from_tuples(
self._consumption_store['certificate'].index).set_names(consumption.index.names)
levels_consumption = [i for i in index.names if i in levels_consumption]
consumption_sd = self._consumption_store['consumption'].loc[_index]
consumption_sd.index.rename({'Heating system': level_heater}, inplace=True)
consumption_sd = consumption_sd.reorder_levels(levels_consumption)
if full_output:
consumption_3uses = self._consumption_store['consumption_3uses'].loc[_index]
consumption_3uses.index.rename({'Heating system': level_heater}, inplace=True)
consumption_3uses = consumption_3uses.reorder_levels(levels_consumption)
certificate = self._consumption_store['certificate'].loc[_index]
certificate.index.rename({'Heating system': level_heater}, inplace=True)
certificate = certificate.reorder_levels(levels_consumption)
return consumption_sd, consumption_3uses, certificate
else:
return consumption_sd
def _to_heating_intensity(self, index, prices, consumption=None, level_heater='Heating system', bill_rebate=0,
full_output=False):
"""Calculate heating intensity of index based on energy prices.
Parameters
----------
index: MultiIndex or Index
prices: Series
consumption: Series, default None
level_heater: {'Heating system', 'Heating system final'}
bill_rebate: float or Series, default 0
Returns
-------
Series
Heating intensity
"""
if consumption is None:
consumption = reindex_mi(self.consumption_heating_store(index, full_output=False), index) * reindex_mi(
self._surface, index)
energy_bill = AgentBuildings.energy_bill(prices, consumption, level_heater=level_heater,
bill_rebate=bill_rebate)
if isinstance(energy_bill, Series):
budget_share = energy_bill / reindex_mi(self._income_tenant, index)
heating_intensity = thermal.heat_intensity(budget_share)
elif isinstance(energy_bill, DataFrame):
budget_share = (energy_bill.T / reindex_mi(self._income_tenant, energy_bill.index)).T
heating_intensity = thermal.heat_intensity(budget_share)
else:
raise NotImplemented
if self.coefficient_global is not None:
heating_intensity *= self.coefficient_global
if not full_output:
return heating_intensity
else:
return heating_intensity, budget_share
[docs] def to_heating_intensity(self, index, prices, consumption=None, level_heater='Heating system', bill_rebate=0,
full_output=False, rho=5):
"""Calculate heating intensity of index based on energy prices.
Parameters
----------
index: MultiIndex or Index
prices: Series
consumption: Series, default None
level_heater: {'Heating system', 'Heating system final'}
bill_rebate: float or Series, default 0
Returns
-------
Series
Heating intensity
"""
if consumption is None:
consumption = reindex_mi(self.consumption_heating_store(index, full_output=False), index) * reindex_mi(
self._surface, index)
energy_bill = self.energy_bill(prices, consumption, level_heater=level_heater,
bill_rebate=bill_rebate)
if isinstance(energy_bill, Series):
budget_share = energy_bill / reindex_mi(self._income_tenant, index)
elif isinstance(energy_bill, DataFrame):
budget_share = (energy_bill.T / reindex_mi(self._income_tenant, energy_bill.index)).T
energy_bill[energy_bill < 0] = 0.1
heating_intensity = energy_bill ** (-1/rho)
#heating_intensity[heating_intensity > self.heating_intensity_max] = self.heating_intensity_max
if self.coefficient_global is not None:
heating_intensity *= self.coefficient_global
if not full_output:
return heating_intensity
else:
return heating_intensity, budget_share
[docs] def to_thermal_comfort(self, heating_intensity, rho=5):
A = self.coefficient_global**rho * abs(self.preferences_insulation['cost']) / 1e3
return A * heating_intensity**(1 - rho) / (1 - rho)
[docs] def consumption_actual(self, prices, consumption=None, full_output=False, bill_rebate=0):
"""Space heating consumption based on standard space heating consumption and heating intensity (kWh/building.a).
Space heating consumption is in kWh/building.y
Equation is based on Allibe (2012).
Parameters
----------
prices: Series
consumption: Series or None, optional
kWh/building.a
full_output: bool, default False
bill_rebate: float, default 0
Returns
-------
Series
"""
if consumption is None:
index = self.stock.index
consumption = self.consumption_heating_store(index, full_output=False)
consumption = reindex_mi(consumption, index) * reindex_mi(self._surface, index)
else:
consumption = consumption.copy()
index = consumption.index
heating_intensity, budget_share = self.to_heating_intensity(index, prices, consumption=consumption,
full_output=True, bill_rebate=bill_rebate)
consumption = consumption * heating_intensity
if full_output is False:
return consumption
else:
return consumption, heating_intensity, budget_share
[docs] def consumption_agg(self, prices=None, freq='year', climate=None, smooth=False,
standard=False, efficiency_hour=False, existing=False, agg='all', bill_rebate=0,
hourly_profile=None):
"""Aggregated final energy consumption (TWh final energy).
Parameters
----------
prices: Series
Energy prices.
freq
climate
smooth
standard: bool, default False
If yes, consumption standard (conventional). Otherwise, consumption actual.
efficiency_hour: bool, default False
existing: bool, default False
If yes, calculate consumption only for existing buildings.
agg: {'all', 'energy', 'heater'}, default 'all'
Aggregation level.
bill_rebate: float, default 0
hourly_profile: DataFrame, optional
Returns
-------
float or Series
"""
if standard is True:
if freq == 'year':
consumption = self.consumption_heating(freq=freq, climate=None)
consumption = reindex_mi(consumption, self.stock.index) * self.surface * self.stock
if existing is True:
consumption = consumption[consumption.index.get_level_values('Existing')]
if agg == 'all':
return consumption.sum() / 10 ** 9
elif agg == 'energy':
energy = self.energy.reindex(consumption.index)
return consumption.groupby(energy).sum() / 10 ** 9
elif agg == 'heater':
return consumption.groupby('Heating system').sum() / 10 ** 9
if standard is False:
if freq == 'year':
# TODO: if climate is none consumption_heating_store ?
consumption = self.consumption_heating(freq=freq, climate=climate, temp_sink=self._temp_sink)
consumption = reindex_mi(consumption, self.stock.index) * self.surface
if existing is True:
consumption = consumption[consumption.index.get_level_values('Existing')]
consumption = self.consumption_actual(prices, consumption=consumption, bill_rebate=bill_rebate) * self.stock
if agg == 'all':
consumption = self.apply_calibration(consumption, agg='energy') / 10 ** 9
consumption = consumption.sum()
else:
consumption = self.apply_calibration(consumption, agg=agg) / 10 ** 9
return consumption
if freq == 'hour':
consumption = self.consumption_heating(freq=freq, climate=climate, smooth=smooth,
efficiency_hour=efficiency_hour, hourly_profile=hourly_profile,
temp_sink=self._temp_sink)
consumption = (reindex_mi(consumption, self.stock.index).T * self.surface).T
heating_intensity = self.to_heating_intensity(consumption.index, prices,
consumption=consumption.sum(axis=1),
bill_rebate=bill_rebate)
consumption = (consumption.T * heating_intensity * self.stock).T
consumption = self.apply_calibration(consumption)
return consumption
[docs] def apply_calibration(self, consumption, level_heater='Heating system', agg='energy'):
if self.coefficient_global is None:
raise AttributeError
if level_heater == 'Heating system final':
if 'Heating system' in consumption.index.names:
consumption = consumption.rename_axis(index={'Heating system': 'Heating system before'})
consumption = consumption.rename_axis(index={'Heating system final': 'Heating system'})
consumption_heater = consumption.groupby('Heating system').sum()
# consumption_heater *= self.coefficient_global, included in heating intensity
if isinstance(consumption, Series):
_consumption_heater = self.coefficient_backup * consumption_heater
consumption_backup = (1 - self.coefficient_backup) * consumption_heater
_consumption_energy = _consumption_heater.groupby(
_consumption_heater.index.str.split('-').str[0].rename('Energy')).sum()
_consumption_energy['Wood fuel'] += consumption_backup.sum()
_consumption_heater['Wood fuel-Performance boiler'] += consumption_backup.sum()
if agg == 'energy':
return _consumption_energy
elif agg == 'heater':
return _consumption_heater
else:
raise ValueError
elif isinstance(consumption, DataFrame):
_consumption_heater = (consumption_heater.T * self.coefficient_backup).T
consumption_backup = (consumption_heater.T * (1 - self.coefficient_backup)).T.sum()
_consumption_energy = _consumption_heater.groupby(
_consumption_heater.index.str.split('-').str[0].rename('Energy')).sum()
_consumption_energy.loc['Wood fuel', :] += consumption_backup
return _consumption_energy
[docs] def calibration_consumption(self, prices, consumption_ini, health_cost_income, health_cost_dpe, climate=None):
"""Calculate energy indicators.
Parameters
----------
prices: Series
consumption_ini: Series
climate
Returns
-------
"""
if self.coefficient_global is None:
consumption, certificate, consumption_3uses = self.consumption_heating(climate=climate, full_output=True)
s = self.stock.groupby(consumption.index.names).sum()
if self.path_ini is not None:
df = concat((consumption_3uses, s), axis=1, keys=['Consumption', 'Stock'])
df = self.add_certificate(df).reset_index('Performance')
make_hist(df, 'Consumption', 'Performance', 'Housing (Million)',
format_y=lambda y, _: '{:.0f}'.format(y / 10 ** 6),
save=os.path.join(self.path_ini, 'consumption_primary_ini.png'),
palette=self._resources_data['colors'],
kde=True)
df = concat((consumption, s), axis=1, keys=['Consumption', 'Stock'])
df = self.add_certificate(df).reset_index('Performance')
make_hist(df, 'Consumption', 'Performance', 'Housing (Million)',
format_y=lambda y, _: '{:.0f}'.format(y / 10 ** 6),
save=os.path.join(self.path_ini, 'consumption_final_ini.png'),
palette=self._resources_data['colors'], kde=True)
consumption = reindex_mi(consumption, self.stock.index) * self.surface
_consumption_actual, heating_intensity, budget_share = self.consumption_actual(prices,
consumption=consumption,
full_output=True)
# Test to calibrate health cost based on heating intensity
if False:
# calibration health_cost on heating intensity
total_health_cost = self.health_cost(health_cost_dpe, health_cost_income, prices,
method_health_cost='epc')
def find_threshold(x, df, total_health_cost):
d = df.loc[df['Heating intensity'] <= float(x), 'Stock']
cost = (d * health_cost_income).sum() / 1e9
return cost - total_health_cost
df = concat((heating_intensity, self.stock), axis=1, keys=['Heating intensity', 'Stock'])
root = fsolve(find_threshold, 0.5, args=(df, total_health_cost), xtol=1e-3)
self.hi_threshold = root[0]
# heating intensity threshold by income, performance group
try:
df = pd.concat((heating_intensity, self.stock), axis=1, keys=['Heating intensity', 'Stock'])
df = df.reset_index('Income tenant')
make_hist(df, 'Heating intensity', 'Income tenant', 'Housing (Million)',
format_y=lambda y, _: '{:.1f}'.format(y / 10 ** 6),
save=os.path.join(self.path_ini, 'heating_intensity_ini.png'),
palette=self._resources_data['colors'], kde=True)
df = pd.concat((budget_share, self.stock), axis=1, keys=['Budget share', 'Stock'])
df = df.reset_index('Income tenant')
make_hist(df, 'Budget share', 'Income tenant', 'Housing (Million)',
format_y=lambda y, _: '{:.0f}'.format(y / 10 ** 6),
save=os.path.join(self.path_ini, 'budget_share_ini.png'),
palette=self._resources_data['colors'], kde=True)
except:
pass
_consumption_actual *= self.stock
consumption_before_calibration = _consumption_actual.copy()
consumption_ini = consumption_ini.groupby('Energy').sum()
# 0. Test before any calibration
consumption_energy = self.add_energy(_consumption_actual)
consumption_energy = consumption_energy.groupby(consumption_ini.index.names).sum()
consumption_energy = consumption_energy[consumption_energy > 0]
test = concat((consumption_ini, consumption_energy / 1e9), axis=1, keys=['Observed', 'Simulated'])
# 1. consumption total
consumption_ini = consumption_ini[consumption_ini > 0]
coefficient_global = consumption_ini.sum() * 10 ** 9 / _consumption_actual.sum()
self.coefficient_global = coefficient_global
_consumption_actual *= self.coefficient_global
heating_intensity = self.to_heating_intensity(_consumption_actual.index, prices)
self.heating_intensity_max = heating_intensity.max()
# 2. coefficient among energies
consumption_energy = self.add_energy(_consumption_actual)
consumption_energy = consumption_energy.groupby(consumption_ini.index.names).sum()
consumption_energy = consumption_energy[consumption_energy > 0]
test = concat((consumption_ini, consumption_energy / 1e9), axis=1, keys=['Observed', 'Simulated'])
coefficient = consumption_ini * 10 ** 9 / consumption_energy
# coefficient[abs(1 - coefficient) < 0.05] = 1
coefficient[coefficient.index.get_level_values('Energy') == 'Wood fuel'] = 1
coefficient[coefficient.index.get_level_values('Energy') == 'Heating'] = 1
# test
_consumption_energy = coefficient * consumption_energy
_consumption_energy['Wood fuel'] += ((1 - coefficient) * consumption_energy).sum()
test = concat((consumption_ini, _consumption_energy / 10 ** 9), axis=1, keys=['Observed', 'Simulated'])
# 3. coefficient energy is the share of the consumption used for other uses
idx_heater = _consumption_actual.index.get_level_values('Heating system').unique()
energy = idx_heater.str.split('-').str[0].rename('Energy')
self.coefficient_backup = coefficient.reindex(energy).set_axis(idx_heater)
self.coefficient_backup['Electricity-Heat pump air'] = 1
self.coefficient_backup['Electricity-Heat pump water'] = 1
self.coefficient_backup['Heating-District heating'] = 1
temp = self.apply_calibration(_consumption_actual)
validation = dict()
# stock initial
temp = concat((self.stock, self.energy), axis=1).groupby(
['Housing type', 'Energy']).sum().iloc[:, 0] / 10 ** 3
temp.index = temp.index.map(lambda x: 'Stock {} {} (Thousands)'.format(x[0], x[1]))
validation.update(temp)
temp = self.stock.groupby('Housing type').sum() / 10 ** 3
temp.index = temp.index.map(lambda x: 'Stock {} (Thousands)'.format(x))
validation.update(temp)
validation.update({'Stock (Thousands)': self.stock.sum() / 10 ** 3})
# surface initial
temp = concat((self.stock * self.surface, self.energy), axis=1).groupby(
['Housing type', 'Energy']).sum().iloc[:, 0] / 10 ** 6
temp.index = temp.index.map(lambda x: 'Surface {} {} (Million m2)'.format(x[0], x[1]))
validation.update(temp)
temp = (self.stock * self.surface).groupby('Housing type').sum() / 10 ** 6
temp.index = temp.index.map(lambda x: 'Surface {} (Million m2)'.format(x))
validation.update(temp)
validation.update({'Surface (Million m2)': (self.stock * self.surface).sum() / 10 ** 6})
# heating consumption initial
temp = concat((_consumption_actual, self.energy), axis=1).groupby(
['Housing type', 'Energy']).sum().iloc[:, 0] / 10 ** 9
temp.index = temp.index.map(lambda x: 'Consumption {} {} (TWh)'.format(x[0], x[1]))
validation.update(temp)
temp = _consumption_actual.groupby('Housing type').sum() / 10 ** 9
temp.index = temp.index.map(lambda x: 'Consumption {} (TWh)'.format(x))
validation.update(temp)
validation.update({'Consumption (TWh)': _consumption_actual.sum() / 10 ** 9})
validation.update({'Coefficient calibration global (%)': self.coefficient_global})
temp = self.coefficient_backup.copy()
temp.index = temp.index.map(lambda x: 'Coefficient calibration {} (%)'.format(x))
validation.update(temp)
temp = consumption_energy / 10 ** 9
temp.index = temp.index.map(lambda x: 'Consumption {} (TWh)'.format(x))
validation.update(temp)
validation = Series(validation)
if self._resources_data['data_calibration'] is not None:
validation = concat((validation, self._resources_data['data_calibration']), keys=['Calcul', 'Data'],
axis=1)
validation['Error'] = (validation['Calcul'] - validation['Data']) / validation['Data']
if self.path_calibration is not None:
validation.round(2).to_csv(os.path.join(self.path_calibration, 'validation_stock.csv'))
[docs] @staticmethod
def energy_bill(prices, consumption, level_heater='Heating system', bill_rebate=0):
"""Calculate energy bill by dwelling for each stock segment (€/dwelling.a).
Parameters
----------
prices: Series or DataFrame or float
Energy prices for year (€/kWh)
consumption: Series
Energy consumption by dwelling (kWh/dwelling.a)
level_heater
Heating system level to calculate the bill. Enable to calculate energy bill before or after change of
heating system.
bill_rebate: float or Series, default 0
Bill rebate (€/dwelling.a)
Returns
-------
Series
Energy bill by dwelling for each stock segment (€/dwelling)
"""
index = consumption.index
heating_system = Series(index.get_level_values(level_heater), index=index)
energy = heating_system.str.split('-').str[0].rename('Energy')
prices = prices.rename('Energy').reindex(energy)
prices.index = index
bill_rebate = reindex_mi(bill_rebate, index).fillna(0)
if isinstance(consumption, Series):
# * reindex_mi(self._surface, index)
return reindex_mi(consumption, index) * prices - bill_rebate
else:
# * reindex_mi(self._surface, index)
return (reindex_mi(consumption, index).T * prices - bill_rebate).T
[docs] def optimal_temperature(self, prices):
"""Find indoor temperature based on energy prices, housing performance and income level.
Parameters
----------
prices: Series
Energy prices.
Returns
-------
"""
def func(temp, consumption, index):
consumption_temp = self.consumption_heating(temp_indoor=temp)
consumption_temp = reindex_mi(consumption_temp, index) * self.surface
return consumption - consumption_temp
consumption_actual = self.consumption_actual(prices)
consumption_sd = self.consumption_heating(temp_indoor=None)
consumption_sd = reindex_mi(consumption_sd, self.stock.index) * self.surface
temp_optimal = {}
for i, v in consumption_actual.iteritems():
temp_optimal.update({i: fsolve(func, 19, args=(consumption_actual.loc[i], i))[0]})
temp_optimal = Series(temp_optimal)
temp = concat((consumption_actual, consumption_sd, temp_optimal), axis=1,
keys=['actual', 'conventional', 'temp optimal'])
return temp_optimal
[docs] def store_consumption(self, prices, carbon_content, bill_rebate=0):
"""Store energy consumption.
Useful to calculate energy saving and rebound effect.
Parameters
----------
prices
carbon_content
bill_rebate
"""
output = dict()
temp = self.consumption_agg(freq='year', standard=True, existing=True, agg='energy')
temp = temp.reindex(prices.index).fillna(0)
output.update({'Consumption standard (TWh)': temp.sum()})
temp.index = temp.index.map(lambda x: 'Consumption standard {} (TWh)'.format(x))
output.update(temp)
temp = self.consumption_agg(prices=prices, freq='year', standard=False, climate=None, smooth=False,
existing=True, agg='energy', bill_rebate=bill_rebate)
temp = temp.reindex(prices.index).fillna(0)
output.update({'Consumption (TWh)': temp.sum()})
emission = (temp * carbon_content).sum() / 10 ** 3
temp.index = temp.index.map(lambda x: 'Consumption {} (TWh)'.format(x))
output.update(temp)
output.update({'Emission (MtCO2)': emission})
return output
[docs]class AgentBuildings(ThermalBuildings):
"""Class AgentBuildings represents thermal dynamic building stock.
Parameters
----------
stock: Series
Building stock.
surface: Series
Surface by dwelling type.
ratio_surface: dict
Losses area of each envelop component
efficiency: Series
Heating system efficiency.
income: Series
Average income value by income class.
consumption_ini: Series
Initial energy consumption by dwelling type.
preferences: dict
Preferences parameters.
performance_insulation: dict
Performance insulation parameters.
path: str, optional
Path to save results.
year: int, default: 2018
Year of the stock.
demolition_rate: float, default 0.0
Demolition rate.
endogenous: bool, default True
Enable to calculate endogenous renovation.
number_exogenous: float or int, default 300000
Number of exogenous renovation.
expected_utility: {'market_share', 'max}
logger: default
calib_scale: bool, default True
full_output: None
quintiles: bool or None, default None
financing_cost: bool, default True
rational_behavior_insulation: bool, default None
rational_behavior_heater: bool, default None
resources_data: dict, default None
detailed_output: bool, default True
figures: dict, default None
Attributes
----------
"""
def __init__(self, stock, surface, ratio_surface, efficiency, income, preferences,
performance_insulation_renovation, lifetime_heater=None, path=None, year=2018,
endogenous=True, exogenous=None, expected_utility=None,
logger=None, calib_scale=True, quintiles=None,
rational_behavior_insulation=None, rational_behavior_heater=None,
resources_data=None, detailed_output=True, figures=None,
method_health_cost=None, residual_rate=0, constraint_heat_pumps=True,
variable_size_heater=True, temp_sink=None, social_discount_rate=0.032,
lifetime_insulation=30, vat_heater=VAT, no_friction=None, belief_engineering_calculation=None
):
super().__init__(stock, surface, ratio_surface, efficiency, income, path=path, year=year,
resources_data=resources_data, detailed_output=detailed_output, figures=figures,
residual_rate=residual_rate, temp_sink=temp_sink)
self._distortion_store = {}
if logger is None:
logger = logging.getLogger()
self.logger = logger
self.memory = {}
self.quintiles = quintiles
if self.quintiles:
self._resources_data = deciles2quintiles_dict(self._resources_data)
self._flow_obligation = {}
self.policies = []
self._target_demolition = ['E', 'F', 'G']
self.vat_heater = vat_heater
choice_insulation = {'Wall': [False, True], 'Floor': [False, True], 'Roof': [False, True],
'Windows': [False, True]}
names = list(choice_insulation.keys())
choice_insulation = list(product(*[i for i in choice_insulation.values()]))
choice_insulation.remove((False, False, False, False))
choice_insulation = MultiIndex.from_tuples(choice_insulation, names=names)
self._choice_insulation = choice_insulation
self._performance_insulation_renovation = performance_insulation_renovation
self.surface_insulation = self._ratio_surface.copy()
self._endogenous, self.param_exogenous = endogenous, exogenous
# method to calculate expected utility
if expected_utility is not None:
self._expected_utility = expected_utility
else:
self._expected_utility = 'market_share'
self.social_discount_rate = social_discount_rate
self.lifetime_insulation = lifetime_insulation
self.preferences_heater = deepcopy(preferences['heater'])
discount_factor = (1 - (1 + self.preferences_heater['present_discount_rate']) ** -self.preferences_heater[
'lifetime']) / self.preferences_heater['present_discount_rate']
self.preferences_heater['discount_factor'] = discount_factor.copy()
self.preferences_heater['bill_saved'] = abs(self.preferences_heater['cost']) * discount_factor
discount_factor_social = (1 - (1 + self.social_discount_rate) ** -self.preferences_heater[
'lifetime']) / self.social_discount_rate
self.preferences_heater['discount_factor_perfect'] = discount_factor_social
self.preferences_heater['bill_saved_perfect'] = abs(self.preferences_heater['cost']) * discount_factor_social
self.preferences_insulation = deepcopy(preferences['insulation'])
discount_factor = (1 - (1 + self.preferences_insulation['present_discount_rate']) ** -self.preferences_insulation[
'lifetime']) / self.preferences_insulation['present_discount_rate']
self.preferences_insulation['discount_factor'] = discount_factor.copy()
self.preferences_insulation['bill_saved'] = abs(self.preferences_insulation['cost']) * discount_factor
discount_factor_social = (1 - (1 + self.social_discount_rate) ** -self.preferences_insulation[
'lifetime']) / self.social_discount_rate
self.preferences_insulation['discount_factor_perfect'] = discount_factor_social
self.preferences_insulation['bill_saved_perfect'] = abs(self.preferences_insulation['cost']) * discount_factor_social
self._calib_scale = calib_scale
self.constant_insulation_extensive, self.constant_insulation_intensive, self.constant_insulation = None, None, None
self.constant_heater = None
self.scale_insulation, self.scale_heater = 1.0, 1.0
self.hidden_cost, self.hidden_cost_insulation, self.hidden_cost_renovation = None, None, None
self.landlord_dilemma, self.multifamily_friction = None, None
self.hidden_cost_heater = None
self.utility_hidden_cost = None
self.number_firms_insulation, self.number_firms_heater = None, None
self._list_condition_subsidies = []
self._stock_ref = self.stock_mobile.copy()
self._replaced_by = None
self._only_heater = None
self._heater_store = {}
self._renovation_store = {}
self._condition_store = None
self._heating_intensity_avg = None
if belief_engineering_calculation is None:
self._belief_engineering_calculation = False
else:
self._belief_engineering_calculation = belief_engineering_calculation
if no_friction is None:
self.no_friction = False
else:
self.no_friction = no_friction
self._variable_size_heater = variable_size_heater
self._constraint_heat_pumps = constraint_heat_pumps
self._markup_insulation_store, self._markup_heater_store = 1, 1
self._annuities_store = {'investment_owner': Series(dtype=float),
'rent_tenant': Series(dtype=float),
'stock_annuities': Series(dtype=float),
}
self.consumption_saving_heater = None
self.consumption_saving_insulation = None
self.cost_rebound = None
self.rebound = None
self.rational_behavior_insulation = rational_behavior_insulation
self.rational_hidden_cost = None
self.rational_behavior_heater = rational_behavior_heater
self.cost_curve_heater = None
self.store_over_years = {year: {}}
self.expenditure_store = {}
self.taxes_revenues = {}
self.bill_rebate = {}
self._balance_state_ini = None
# self.lifetime_heater = lifetime_heater
temp = self.stock.groupby('Heating system').sum()
heater_vintage = dict()
for i in lifetime_heater.index:
if i not in temp.index:
temp.loc[i] = 0
heater_vintage.update({i: Series([temp.loc[i] / lifetime_heater.loc[i]] * lifetime_heater.loc[i],
index=range(1, lifetime_heater.loc[i] + 1))})
self.heater_vintage = DataFrame(heater_vintage).T.rename_axis(index='Heating system', columns='Year')
self.lifetime_heater = lifetime_heater
# 'epc', 'heating_intensity'
if method_health_cost is None:
method_health_cost = 'epc'
self.method_health_cost = method_health_cost
@property
def year(self):
return self._year
@year.setter
def year(self, year):
self._year = year
self._stock_ref = self.stock_mobile.copy()
self._surface = self._surface_yrs.loc[:, year]
self.consumption_before_retrofit = None
self.intensity_before_retrofit = None
self._list_condition_subsidies = []
self._condition_store = None
self.income = self._income_yrs.loc[:, year]
if isinstance(self._efficiency_yrs, DataFrame):
self._efficiency = self._efficiency_yrs.loc[:, year]
if self._temp_sink_yrs is not None:
self._temp_sink = self._temp_sink_yrs.loc[year]
self._replaced_by = None
self._only_heater = None
self._flow_obligation = {}
ini = {
'subsidies_details': {},
'subsidies_count': {},
'subsidies_average': {},
'cost_average': {},
'replacement_eligible': {}
}
for k, item in ini.items():
self._heater_store[k] = item
self.store_over_years.update({self.year: {}})
[docs] def add_flows(self, flows):
"""Update stock attribute by adding flow series.
Parameters
----------
flows: Series, list
"""
flow_total = None
if isinstance(flows, Series):
flow_total = flows
if isinstance(flows, list):
for flow in flows:
if flow_total is None:
flow_total = flow.copy()
else:
union = flow.index.union(flow_total.index)
flow_total = flow.reindex(union, fill_value=0) + flow_total.reindex(union, fill_value=0)
union = flow_total.index.union(self.stock.index)
stock = flow_total.reindex(union, fill_value=0) + self.stock.reindex(union, fill_value=0)
assert (stock >= -ACCURACY).all(), 'Stock Error: Building stock cannot be negative'
# stock[stock < 0] = 0
stock = stock[stock > ACCURACY]
self.stock = stock
[docs] @staticmethod
def add_level(df, ref, level):
"""Add level to df respecting ref distribution.
Parameters
----------
df
ref
level
Returns
-------
"""
df = df.fillna(0)
# to check consistency
df_sum = None
if isinstance(df, Series):
df_sum = df.sum()
elif isinstance(df, DataFrame):
df_sum = df.sum().sum()
share = (ref.unstack(level).T / ref.unstack(level).sum(axis=1)).T
temp = concat([df] * share.shape[1], keys=share.columns, names=share.columns.names, axis=1)
share = reindex_mi(share, temp.columns, axis=1)
share = reindex_mi(share, temp.index)
df = (share * temp).stack(level).dropna()
try:
if isinstance(df, Series):
assert_almost_equal(df.sum(), df_sum, decimal=0)
elif isinstance(df, DataFrame):
assert_almost_equal(df.sum().sum(), df_sum, decimal=0)
except AssertionError:
pass
return df
[docs] def add_attribute(self, df, levels):
if isinstance(levels, str):
levels = [levels]
for lvl in levels:
df = concat([df] * len(self._resources_data['index'][lvl]), keys=self._resources_data['index'][lvl],
names=[lvl],
axis=1).stack(lvl).squeeze()
return df
[docs] @staticmethod
def sum_by_insulation_type(df):
rslt = {}
for i in df.columns.names:
rslt.update({'{} renovated'.format(i): df.xs(True, level=i, drop_level=False, axis=1).sum(axis=1)})
rslt = DataFrame(rslt)
return rslt
[docs] @staticmethod
def to_cumac(consumption_saved, discount=0.035, duration=30):
discount_factor = (1 - (1 + discount) ** -duration) / discount
consumption_saved_cumac = (consumption_saved * discount_factor) / 1000
return consumption_saved_cumac
[docs] def frame_to_flow(self, replaced_by, default_quality=None):
"""Transform insulation transition Dataframe to flow.
Parameters
----------
replaced_by: DataFrame
Returns
-------
replaced_by: Series
Flow Series.
"""
replaced_by_sum = replaced_by.sum().sum()
if 'Income tenant' not in replaced_by.index.names:
share = (self.stock_mobile.unstack('Income tenant').T / self.stock_mobile.unstack('Income tenant').sum(
axis=1)).T
temp = concat([replaced_by] * share.shape[1], keys=share.columns, names=share.columns.names, axis=1)
share = reindex_mi(share, temp.columns, axis=1)
share = reindex_mi(share, temp.index)
replaced_by = (share * temp).stack('Income tenant').dropna()
assert round(replaced_by.sum().sum(), 0) == round(replaced_by_sum, 0), 'Sum problem'
replaced_by = replaced_by.droplevel('Heating system').rename_axis(
index={'Heating system final': 'Heating system'})
replaced_by.index.set_names(
{'Wall': 'Wall before', 'Roof': 'Roof before', 'Floor': 'Floor before', 'Windows': 'Windows before'},
inplace=True)
replaced_by.columns.set_names(
{'Wall': 'Wall after', 'Roof': 'Roof after', 'Floor': 'Floor after', 'Windows': 'Windows after'},
inplace=True)
replaced_by = replaced_by.stack(replaced_by.columns.names).rename('Data')
replaced_by = replaced_by[replaced_by > 0]
replaced_by = replaced_by.reset_index()
performance_insulation = self._performance_insulation_renovation.copy()
if default_quality is not None:
performance_insulation = {k: i * (1 + default_quality) for k, i in performance_insulation.items()}
for component in ['Wall', 'Floor', 'Roof', 'Windows']:
replaced_by[component] = replaced_by['{} before'.format(component)]
replaced_by.loc[replaced_by['{} after'.format(component)], component] = performance_insulation[component]
replaced_by.drop(
['Wall before', 'Wall after', 'Roof before', 'Roof after', 'Floor before', 'Floor after', 'Windows before',
'Windows after'], axis=1, inplace=True)
replaced_by = replaced_by.set_index(self.stock.index.names).loc[:, 'Data']
replaced_by = replaced_by.groupby(replaced_by.index.names).sum()
assert replaced_by.sum().round(0) == replaced_by_sum.round(0), 'Sum problem'
return replaced_by
[docs] def weighted_average(self, df, weights, levels=None, technology='insulation'):
if levels is None:
levels = ['Occupancy status', 'Housing type', 'Performance', 'Energy', 'Income owner']
if 'Energy' in levels:
df = self.add_energy(df)
weights = self.add_energy(weights)
if 'Performance' in levels:
df = self.add_certificate(df)
weights = self.add_certificate(weights)
weights = weights.reorder_levels(df.index.names)
if isinstance(df, DataFrame):
df_grouped = (df.T * weights).T
df_grouped = (df_grouped.groupby(levels).sum().T / weights.groupby(levels).sum()).T
df_grouped.dropna(how='all', inplace=True)
if technology == 'insulation':
names = df_grouped.columns.names
df_grouped.columns = df_grouped.columns.map(lambda row: ", ".join([name for name, value in zip(names, row) if value]))
df_grouped = df_grouped.rename_axis('Technology', axis=1)
df_grouped = df_grouped.stack('Technology').squeeze()
return df_grouped
else:
raise NotImplementedError
[docs] @staticmethod
def find_best_option(criteria, dict_df=None, func='max'):
"""Find best option (columns), and returns
Parameters
----------
criteria: DataFrame
Find for each index the column based on criteria values.
dict_df: dict
Dataframe to return.
func
Returns
-------
DataFrame
"""
if dict_df is None:
dict_df = {}
dict_ds = {key: Series(dtype=float) for key in dict_df.keys()}
dict_ds.update({'criteria': Series(dtype=float)})
columns = None
if func == 'max':
columns = criteria.idxmax(axis=1)
elif func == 'min':
columns = criteria.idxmin(axis=1)
for c in columns.unique():
idx = columns.index[columns == c]
for key in dict_df.keys():
dict_ds[key] = concat((dict_ds[key], dict_df[key].loc[idx, c]), axis=0)
dict_ds['criteria'] = concat((dict_ds['criteria'], criteria.loc[idx, c]), axis=0)
for key in dict_df.keys():
dict_ds[key].index = MultiIndex.from_tuples(dict_ds[key].index).set_names(
dict_df[key].index.names)
dict_ds['criteria'].index = MultiIndex.from_tuples(dict_ds['criteria'].index).set_names(criteria.index.names)
dict_ds['columns'] = columns
return concat(dict_ds, axis=1)
[docs] @staticmethod
def select_deep_renovation(certificate_after):
"""Select deep renovation.
Parameters
----------
certificate_after: DataFrame
Certificate after renovation.
Returns
-------
DataFrame
"""
condition = certificate_after.isin(['A', 'B'])
index = condition.index[(~condition).all(axis=1)]
condition.drop(index, inplace=True)
condition = concat((condition, certificate_after.loc[index, :].isin(['C'])), axis=0)
index = condition.index[(~condition).all(axis=1)]
condition.drop(index, inplace=True)
condition = concat((condition, certificate_after.loc[index, :].isin(['D'])), axis=0)
return condition
[docs] def prepare_consumption(self, choice_insulation=None, performance_insulation=None, index=None, method_epc='5uses',
level_heater='Heating system', full_output=True, store=True, climate=None, default_quality=None):
"""Standard energy consumption and energy performance certificate for each renovation works option.
Standard energy consumption only depends on building characteristics.
Parameters
----------
choice_insulation: MultiIndex
Insulation choice.
performance_insulation: MultiIndex
Insulation performance.
index: MultiIndex
Index to calculate consumption.
method_epc: {'3uses', '5uses'}
Method to calculate energy performance certificate.
level_heater: str
Heating system level to calculate the consumption.
full_output: bool, default True
Enable to return full output.
store: bool, default True
Enable to store results.
climate: int
Climate year.
Returns
-------
DataFrame
Final consumption standard.
DataFrame
Primary consumption standard 3 uses.
DataFrame
Energy performance certificate.
"""
# default option: store = True means that the result is stored in the object
if climate is not None:
store = False
if method_epc == '3uses':
store = False
if index is None:
index = self.stock.index
if not isinstance(choice_insulation, MultiIndex):
choice_insulation = self._choice_insulation
if not isinstance(performance_insulation, MultiIndex):
performance_insulation = self._performance_insulation_renovation
if default_quality is not None:
performance_insulation = {k: i * (1 + default_quality) for k, i in performance_insulation.items()}
# only selecting useful levels as theoretical energy consumption only depends on building characteristics
_index = index.copy()
_index = _index.droplevel(
[i for i in _index.names if i not in ['Housing type', 'Wall', 'Floor', 'Roof', 'Windows'] + [level_heater]])
_index = _index[~_index.duplicated()]
_index = _index.rename({level_heater: 'Heating system'})
_index = _index.reorder_levels(CONSUMPTION_LEVELS)
# remove idx already calculated
if not self._consumption_store['consumption_renovation'].empty and store is True:
temp = self._consumption_store['consumption_renovation'].index.intersection(_index)
idx = _index.drop(temp)
else:
idx = _index
if not idx.empty:
s = concat([Series(index=idx, dtype=float)] * len(choice_insulation), axis=1).set_axis(choice_insulation,
axis=1)
# choice_insulation = choice_insulation.drop(no_insulation) # only for
s.index.rename(
{'Wall': 'Wall before', 'Floor': 'Floor before', 'Roof': 'Roof before', 'Windows': 'Windows before'},
inplace=True)
try:
temp = s.fillna(0).stack(list(s.columns.names))
except ValueError:
self.logger.debug('Problem with pandas version')
# 1. Convert to string to avoid pandas bug
s.columns = pd.MultiIndex.from_tuples(
[tuple(map(str, tup)) for tup in s.columns],
names=s.columns.names
)
# 2. Stack safely
temp = s.fillna(0).stack(level=list(range(s.columns.nlevels))).reset_index()
# 3. Convert back to boolean for semantic clarity
for lvl in s.columns.names:
if temp[lvl].isin(['True', 'False']).all():
temp[lvl] = temp[lvl] == 'True'
temp = temp.reset_index().drop(0, axis=1)
for i in ['Wall', 'Floor', 'Roof', 'Windows']:
# keep the info to unstack later
temp.loc[:, '{} bool'.format(i)] = temp.loc[:, i]
temp.loc[temp[i], i] = performance_insulation[i]
temp.loc[temp[i] == False, i] = temp.loc[temp[i] == False, '{} before'.format(i)]
temp = temp.astype(
{'Housing type': 'string', 'Wall': 'float', 'Floor': 'float', 'Roof': 'float', 'Windows': 'float',
'Heating system': 'string'})
index = MultiIndex.from_frame(temp)
# consumption based on insulated components
if climate is not None or method_epc == '3uses':
consumption, certificate, consumption_3uses = self.consumption_heating(index=index, climate=climate,
level_heater='Heating system',
method=method_epc, full_output=True)
else:
consumption, consumption_3uses, certificate = self.consumption_heating_store(index,
level_heater='Heating system')
rslt = dict()
t = {'consumption': consumption}
if full_output is True:
t.update({'consumption_3uses': consumption_3uses, 'certificate': certificate})
for key, temp in t.items():
temp = reindex_mi(temp, index).droplevel(['Wall', 'Floor', 'Roof', 'Windows']).unstack(
['{} bool'.format(i) for i in ['Wall', 'Floor', 'Roof', 'Windows']])
temp.index.rename({'Wall before': 'Wall', 'Floor before': 'Floor', 'Roof before': 'Roof',
'Windows before': 'Windows'},
inplace=True)
temp.columns.rename(
{'Wall bool': 'Wall', 'Floor bool': 'Floor', 'Roof bool': 'Roof', 'Windows bool': 'Windows'},
inplace=True)
rslt[key] = temp
if store is True:
if self._consumption_store['consumption_renovation'].empty:
self._consumption_store['consumption_renovation'] = rslt['consumption']
self._consumption_store['consumption_3uses_renovation'] = rslt['consumption_3uses']
self._consumption_store['certificate_renovation'] = rslt['certificate']
else:
self._consumption_store['consumption_renovation'] = concat(
(self._consumption_store['consumption_renovation'], rslt['consumption']))
self._consumption_store['consumption_3uses_renovation'] = concat(
(self._consumption_store['consumption_3uses_renovation'], rslt['consumption_3uses']))
self._consumption_store['certificate_renovation'] = concat(
(self._consumption_store['certificate_renovation'], rslt['certificate']))
else:
consumption = rslt['consumption'].loc[_index]
consumption.index.rename({'Heating system': level_heater}, inplace=True)
if full_output is False:
return consumption
else:
consumption_3uses = rslt['consumption_3uses'].loc[_index]
consumption_3uses.index.rename({'Heating system': level_heater}, inplace=True)
certificate = rslt['certificate'].loc[_index]
certificate.index.rename({'Heating system': level_heater}, inplace=True)
return consumption, consumption_3uses, certificate
# formatting using results stored in the object
consumption = self._consumption_store['consumption_renovation'].loc[_index]
consumption.index.rename({'Heating system': level_heater}, inplace=True)
if full_output is True:
consumption_3uses = self._consumption_store['consumption_3uses_renovation'].loc[_index]
consumption_3uses.index.rename({'Heating system': level_heater}, inplace=True)
certificate = self._consumption_store['certificate_renovation'].loc[_index]
certificate.index.rename({'Heating system': level_heater}, inplace=True)
return consumption, consumption_3uses, certificate
else:
return consumption
[docs] def calculate_financing(self, cost, subsidies, financing_cost, policies=None):
"""Apply financing cost.
Parameters
----------
cost
subsidies
financing_cost
policies: list
Returns
-------
cost_total
cost_financing
amount_debt
amount_saving
discount
subsidies
"""
cost_total, cost_financing, amount_debt, amount_saving, discount = cost, None, None, None, None
to_pay = cost - subsidies
financing_options = {'debt': {'price': financing_cost['interest_rate'].loc[self.year],
'max': None,
'duration': financing_cost['duration'],
'type': 'debt'
},
'saving': {'price': financing_cost['saving_rate'].loc[self.year],
'max': financing_cost['upfront_max'],
'duration': financing_cost['duration'],
'type': 'saving'
},
}
# only work with one zero-interest-loan
if policies:
for policy in policies:
if isinstance(policy.value, dict):
value = policy.value[self.year]
else:
value = policy.value
if value is None:
# Meaning we are assessing the zero_interest_loan
value = financing_cost['interest_rate'].loc[self.year] - 1e-4
if isinstance(policy.cost_max, dict):
cost_max = policy.cost_max[self.year]
else:
cost_max = policy.cost_max
# Only when policy.target otherwise cost_max = 0
if policy.target is not None:
cost_max = cost_max * policy.target
financing_options.update({policy.name: {'price': value,
'max': cost_max,
'duration': policy.duration,
'type': 'debt'}})
# sort financing_options by price
financing_options = {k: v for k, v in sorted(financing_options.items(), key=lambda item: item[1]['price'])}
if financing_cost is not None:
remaining = to_pay.copy()
rslt = {}
cost_financing, debt_reimbursement = DataFrame(0, index=to_pay.index, columns=to_pay.columns), DataFrame(0, index=to_pay.index, columns=to_pay.columns)
for key, item in financing_options.items():
if item['max'] is not None:
c_max = reindex_mi(item['max'], remaining.index)
if isinstance(c_max, Series):
c_max = concat([c_max] * remaining.shape[1], keys=to_pay.columns, axis=1)
amount = remaining.where(remaining < c_max, c_max)
else:
amount = remaining
if item['type'] == 'debt':
if item['price'] == 0:
reimbursement = amount / item['duration']
else:
reimbursement = calculate_annuities(amount, lifetime=item['duration'], discount_rate=item['price'])
debt_reimbursement += reimbursement
# could be better calculated
c = (calculate_annuities(amount, lifetime=item['duration'], discount_rate=item['price']) * item['duration'] - amount)
cost_financing += c
rslt.update({key: amount.copy()})
remaining -= amount
assert remaining.sum().sum().round(0) == 0, 'Not all cost have been financed'
assert allclose(sum(rslt.values()), to_pay), 'Not all cost have been financed'
share = {k: i / to_pay for k, i in rslt.items()}
share = {k: i.fillna(0) for k, i in share.items()}
# assert allclose(sum(share.values()).round(2), DataFrame(1, index=to_pay.index, columns=to_pay.columns)), 'Share problem'
discount = sum([share[k] * financing_options[k]['price'] for k in rslt.keys()])
amount_debt = sum([rslt[k] for k in financing_options.keys() if financing_options[k]['type'] == 'debt'])
amount_saving = sum([rslt[k] for k in financing_options.keys() if financing_options[k]['type'] == 'saving'])
assert allclose(amount_debt + amount_saving, to_pay), 'Not all cost have been financed'
cost_total = cost + cost_financing
subsidies = dict()
if policies:
for policy in policies:
subsidies.update({policy.name: rslt[policy.name] * policy.public_cost * financing_options[policy.name]['duration']})
return cost_total, cost_financing, amount_debt, amount_saving, discount, subsidies
[docs] def credit_constraint(self, amount_debt, financing_cost, bill_saved=None):
condition = DataFrame(True, index=amount_debt.index, columns=amount_debt.columns)
sub = None
if financing_cost['debt_income_ratio']:
debt_reimbursement = calculate_annuities(amount_debt, lifetime=financing_cost['duration'],
discount_rate=financing_cost['interest_rate'].loc[self.year])
debt_income_ratio = (debt_reimbursement.T / reindex_mi(self._income_owner, amount_debt.index)).T
condition = condition & (debt_income_ratio <= financing_cost['debt_income_ratio'])
factor = factor_annuities(lifetime=financing_cost['duration'],
discount_rate=financing_cost['interest_rate'].loc[self.year])
sub = (amount_debt.T - reindex_mi(self._income_owner * financing_cost['debt_income_ratio'] / factor, amount_debt.index)).T
sub = sub.where(sub > 0, 0)
if bill_saved is not None:
condition = condition & (debt_reimbursement <= bill_saved)
return condition
[docs] def apply_subsidies_heater(self, index, policies_heater, cost_heater, consumption_saved, emission_saved,
bill_saved):
"""Calculate subsidies for each dwelling and each heating system.
Parameters
----------
index: Index or MultiIndex
policies_heater: list
cost_heater: DataFrame
consumption_saved: DataFrame
emission_saved: DataFrame
Returns
-------
"""
subsidies_details = {}
vat_base = self.vat_heater
if isinstance(vat_base, Series):
vat_base = vat_base.reindex(cost_heater.columns)
p = [p for p in policies_heater if 'tax_status_quo' == p.policy]
if p:
value = self.preferences_heater['status_quo_bias'] / abs(self.preferences_heater['cost']) * 1000
value = Series(value, index=Index(['Natural gas-Performance boiler'], name='Heating system final'))
value = value.reindex(cost_heater.columns).fillna(0)
t = ((cost_heater + value) / cost_heater).mean() - 1
vat_base += t.reindex(vat_base.index).fillna(0)
p = [p for p in policies_heater if 'reduced_vat' == p.policy]
if p:
vat = p[0].value
if isinstance(vat, dict):
vat = vat[self.year]
sub = cost_heater * (vat_base - vat)
subsidies_details.update({'reduced_vat': sub.copy()})
else:
vat = vat_base
vat_heater = cost_heater * vat
cost_heater += vat_heater
policies_incentive = ['subsidy_ad_valorem', 'subsidy_target', 'subsidy_proportional', 'subsidies_cap',
'subsidy_present_bias']
self.policies += [i.name for i in policies_heater if i.policy in policies_incentive and i.name not in self.policies]
cost_heater.sort_index(inplace=True)
for policy in [p for p in policies_heater if p.policy in ['subsidy_ad_valorem', 'subsidy_target', 'subsidy_proportional',
'subsidy_present_bias', 'subsidy_status_quo']]:
if isinstance(policy.value, dict):
value = policy.value[self.year]
else:
value = policy.value
if policy.policy == 'subsidy_target':
value = value.stack()
value = value[value.index.get_level_values('Heating system final').isin(cost_heater.columns)]
value = reindex_mi(value, cost_heater.stack().index).unstack('Heating system final').fillna(0)
elif policy.policy == 'subsidy_present_bias':
value = bill_saved.copy()
value[value < 0] = 0
value = (reindex_mi((self.preferences_heater['discount_factor_perfect'] -
self.preferences_heater['discount_factor']), value.index) * value.T).T
elif policy.policy == 'subsidy_status_quo':
v = self.preferences_heater['status_quo_bias'] / abs(self.preferences_heater['cost']) * 1000
value = pd.DataFrame(0, index=cost_heater.index, columns=cost_heater.columns)
value.loc[value.index.get_level_values('Heating system').isin(['Natural gas-Performance boiler', 'Oil fuel-Performance boiler']), 'Electricity-Heat pump water'] = v
elif policy.policy == 'subsidy_ad_valorem':
if isinstance(policy.value, dict):
value = policy.value[self.year]
else:
value = policy.value
if isinstance(value, (float, int)):
value = value * cost_heater
elif isinstance(value, DataFrame):
value = value.stack()
value = value[value.index.get_level_values('Heating system final').isin(cost_heater.columns)]
value = (cost_heater.stack() * value).unstack('Heating system final').fillna(0)
elif isinstance(value, Series):
raise NotImplemented('Error by should be index or columns')
elif policy.policy == 'subsidy_proportional':
# reindex_mi(cost_insulation, index) / consumption_saved_cumac
if policy.proportional == 'tCO2_cumac':
emission_saved_cumac = self.to_cumac(emission_saved, discount=self.social_discount_rate,
duration=self.lifetime_heater) / 10**3
emission_saved_cumac[emission_saved_cumac < 0] = 0
value = value * emission_saved_cumac
value = value.fillna(0).loc[:, emission_saved_cumac.columns]
elif policy.proportional == 'MWh_cumac':
consumption_saved_cumac = self.to_cumac(consumption_saved)
consumption_saved_cumac[consumption_saved_cumac < 0] = 0
value = value * consumption_saved_cumac
else:
raise NotImplemented
else:
raise NotImplemented
assert (value >= 0).all().all(), 'Subsidies need to be negative'
# cap for all policies
value.fillna(0, inplace=True)
value = value.loc[:, [i for i in cost_heater.columns if i in value.columns]]
value = value.reorder_levels(cost_heater.index.names)
value.sort_index(inplace=True)
value = value.where(value <= cost_heater, cost_heater)
if policy.cap:
value = value.where(value <= policy.cap, policy.cap)
subsidies_details[policy.name] = value.copy()
subsidies_bonus = [p for p in policies_heater if p.policy == 'bonus']
for policy in subsidies_bonus:
if isinstance(policy.value, dict):
value = policy.value[self.year]
else:
value = policy.value
value = reindex_mi(value, index)
value = value.reindex(cost_heater.columns, axis=1).fillna(0)
if policy.name in subsidies_details.keys():
subsidies_details[policy.name] = subsidies_details[policy.name] + value
else:
subsidies_details[policy.name] = value
subsidies_total = [subsidies_details[k] for k in subsidies_details.keys() if
k not in ['reduced_vat', 'over_cap']]
if subsidies_total:
subsidies_total = sum(subsidies_total)
else:
subsidies_total = pd.DataFrame(0, index=index, columns=cost_heater.columns)
# store eligible before cap
eligible = {}
for policy in subsidies_details:
eligible.update({policy: (subsidies_details[policy] > 0).any(axis=1)})
# overall cap for cumulated amount of subsidies
subsidies_cap = [p for p in policies_heater if p.policy == 'subsidies_cap']
if subsidies_cap:
# only one subsidy cap
subsidies_cap = subsidies_cap[0]
policies_target = subsidies_cap.target
subsidies_cap = reindex_mi(subsidies_cap.value, subsidies_total.index)
cap = (reindex_mi(cost_heater, index).T * subsidies_cap).T
over_cap = subsidies_total > cap
subsidies_details['over_cap'] = (subsidies_total - cap)[over_cap].fillna(0)
subsidies_details['over_cap'].sort_index(inplace=True)
remaining = subsidies_details['over_cap'].copy()
for policy_name in policies_target:
if policy_name in subsidies_details.keys():
self.logger.debug('Capping amount for {}'.format(policy_name))
temp = remaining.where(remaining <= subsidies_details[policy_name], subsidies_details[policy_name])
subsidies_details[policy_name] -= temp
assert (subsidies_details[policy_name].values >= 0).all(), '{} got negative values'.format(
policy_name)
remaining -= temp
if allclose(remaining, 0, rtol=10 ** -1):
break
assert allclose(remaining, 0, rtol=10 ** -1), 'Over cap'
subsidies_total -= subsidies_details['over_cap']
regulation = [p for p in policies_heater if p.policy == 'regulation']
if 'status_quo_bias' in [p.name for p in regulation]:
self.preferences_heater['status_quo_bias'] = 0
if 'present_bias' in [p.name for p in regulation]:
v = [p for p in regulation if p.name == 'present_bias'][0].value
discount_factor = (1 - (1 + v) ** -self.preferences_heater['lifetime']) / v
idx = self.preferences_heater['bill_saved'].index
self.preferences_heater['bill_saved'] = Series(discount_factor * abs(self.preferences_heater['cost']),
index=idx)
return cost_heater, vat_heater, subsidies_details, subsidies_total, eligible
[docs] def calculate_distortion_heater(self, stock, cost_heater, emission_saved, carbon_value, bill_saved, cap=True):
distortion_details = {}
# 1. distortion present-bias
if False:
value = bill_saved.copy()
value[value < 0] = 0
value = (reindex_mi((self.preferences_heater['discount_factor_perfect'] -
self.preferences_heater['discount_factor']), value.index) * value.T).T
value.fillna(0, inplace=True)
distortion_details.update({'bill_saved': value.copy()})
# 2. distortion emission
emission_saved_cumac = self.to_cumac(emission_saved, discount=self.social_discount_rate,
duration=self.lifetime_heater) / 10 ** 3
emission_saved_cumac[emission_saved_cumac < 0] = 0
value = carbon_value * emission_saved_cumac
value = value.fillna(0).loc[:, emission_saved.columns]
distortion_details.update({'emission_saved': value.copy()})
# 3. distortion status quo
if False:
value = pd.DataFrame(0, index=cost_heater.index, columns=cost_heater.columns)
for hs in value.columns:
value.loc[
value.index.get_level_values('Heating system') == hs, hs] = - self.preferences_heater['status_quo_bias'] / abs(self.preferences_heater['cost']) * 1000
distortion_details.update({'status_quo': value.copy()})
distortion_total = sum([i for k, i in distortion_details.items()])
hs_to_drop = ['Heating-District heating', 'Oil fuel-Performance boiler']
distortion_total.drop(hs_to_drop, inplace=True, axis=1)
distortion_total = (distortion_total.T - distortion_total.min(axis=1)).T
if cap is True:
distortion_total = distortion_total.loc[:, [i for i in cost_heater.columns if i in distortion_total.columns]]
c = cost_heater.loc[:, [i for i in cost_heater.columns if i in distortion_total.columns]]
distortion_total = distortion_total.where(distortion_total <= c, c)
distortion_total[distortion_total < 0] = 0
levels = ['Occupancy status', 'Housing type', 'Performance', 'Energy', 'Income owner']
distortion_grouped = self.weighted_average(distortion_total, stock.copy(), levels=levels, technology='heater')
return distortion_grouped
[docs] def endogenous_market_share_heater(self, index, _bill_saved, _subsidies_total, _cost_heater, calib_heater=None,
_cost_financing=None, condition=None, flow_replace=None, drop_heater=None,
_credit_constraint=None):
def test_sensitivity_hp(_utility_cost, _utility_bill_saving, _utility_financing, _utility_inertia, _condition,
_flow_replace, _cost_heater):
# assessing sensitivity to subsidies_heat_pump
_utility = _utility_cost + _utility_bill_saving + _utility_financing + _utility_inertia
_utility_constant = reindex_mi(self.constant_heater.reindex(_utility.columns, axis=1), _utility.index)
_utility += _utility_constant
c = _cost_heater.copy()
c.loc[:, [i for i in _cost_heater.columns if i != 'Electricity-Heat pump water']] = 0
dict_rslt = dict()
rslt = dict()
for sub in range(0, 110, 10):
sub /= 100
u_sub = c * sub * self.preferences_heater['subsidy'] / 1000
u = _utility + u_sub
if _condition is not None:
_condition = _condition.astype(float)
_condition = _condition.replace(0, float('nan'))
u *= _condition
_market_share = (exp(u).T / exp(u).sum(axis=1)).T
temp = (_flow_replace * _market_share.T).T
rslt.update({sub: temp.loc[:, 'Electricity-Heat pump water'].sum()})
dict_rslt.update({'With inertia': Series(rslt) / 10 ** 3})
_utility -= _utility_inertia
rslt = dict()
for sub in range(0, 110, 10):
sub /= 100
u_sub = c * sub * self.preferences_heater['subsidy'] / 1000
u = _utility + u_sub
if _condition is not None:
_condition = _condition.astype(float)
_condition = _condition.replace(0, float('nan'))
u *= _condition
_market_share = (exp(u).T / exp(u).sum(axis=1)).T
temp = (_flow_replace * _market_share.T).T
rslt.update({sub: temp.loc[:, 'Electricity-Heat pump water'].sum()})
dict_rslt.update({'Without inertia': Series(rslt) / 10 ** 3})
make_plots(dict_rslt, 'Heat pumps function of ad valorem subsidy (Thousand per year)',
save=os.path.join(self.path_calibration, 'sensi_hp.png'), integer=False, legend=True)
def calibration_heater(_utility, _calib_heater, _utility_shock, _flow_replace):
def calibration(x, _ms, _utility_ini, _flow, _idx, _ref, _u_shock, _target,
_option='price_elasticity'):
if _option is not None:
_scale = abs(x[0])
else:
_scale = 1
cst = pd.Series(x[1:], index=_idx)
cst = concat((cst, Series(0, index=_ref)))
cst = cst.unstack('Heating system final')
_u = _utility_ini * _scale + cst
_market_share = (exp(_u).T / exp(_u).sum(axis=1)).T
_agg = (_market_share.T * _flow).T.groupby(cst.index.names).sum()
_market_share_agg = (_agg.T / _agg.sum(axis=1)).T
_market_share_agg = _market_share_agg.stack()
_market_share_agg = _market_share_agg[_market_share_agg > 0]
_rslt = _market_share_agg - _ms
_rslt.drop(_ref, inplace=True)
_rslt = _rslt.loc[_idx].to_numpy()
if _option == 'shock':
_temp = _u + _u_shock * _scale
# temp = _utility_ini + _u_shock
_market_share = (exp(_temp).T / exp(_temp).sum(axis=1)).T
_agg = (_market_share.T * _flow).T
heat_pump = _agg.loc[:, 'Electricity-Heat pump water'].sum() / _flow.sum()
_rslt = append(heat_pump - _target, _rslt)
elif _option == 'price_elasticity':
heater_ref = 'Electricity-Heat pump water'
_price_elasticity = self.preferences_heater['cost'] * _scale * _cost_heater.loc[:, heater_ref].iloc[0] / 1000 * (1 - _market_share.loc[:, heater_ref])
_price_elasticity_average = (_flow * _price_elasticity).sum() / _flow.sum()
_rslt = append(_price_elasticity_average - _target, _rslt)
else:
_rslt = append(0, _rslt)
return _rslt
# target
_ms_heater = _calib_heater['ms_heater']
option = _calib_heater['scale']['option']
target = _calib_heater['scale']['target']
# simplifying market-share
simplify = True
if simplify:
flow = _flow_replace.groupby(_ms_heater.index.names).sum()
ms = (flow * _ms_heater.T).T.groupby('Housing type').sum()
ms = (ms.T / ms.sum(axis=1)).T
ms = ms.stack()
ms = ms[ms > 0]
else:
ms = _ms_heater.copy()
ref = MultiIndex.from_tuples([('Multi-family', 'Electricity-Direct electric'), ('Single-family', 'Electricity-Direct electric')], names=['Housing type', 'Heating system final'])
x0 = pd.Series(0, index=ms.index).drop(ref)
idx = x0.index
x0 = x0.copy().to_numpy()
x0 = append(1, x0)
root, info_dict, ier, mess = fsolve(calibration, x0, args=(ms, _utility.copy(), _flow_replace, idx, ref,
_utility_shock, target, option), full_output=True)
if ier == 1:
self.logger.debug('Calibration investment decision heater worked')
else:
raise ValueError('Calibration investment decision heater did not work')
scale = abs(root[0])
constant = concat((Series(root[1:], index=idx), Series(0, index=ref))).unstack('Heating system final')
ms = ms.unstack('Heating system final')
self.constant_heater = constant.copy()
self.apply_scale(scale, gest='heater')
# no calibration
temp = (exp(_utility).T / exp(_utility).sum(axis=1)).T
agg = (temp.T * _flow_replace).T.groupby(ms.index.names).sum()
ms_no_calibration = (agg.T / agg.sum(axis=1)).T
# with shock
temp_shock = (exp(_utility + _utility_shock).T / exp(_utility + _utility_shock).sum(axis=1)).T
agg_shock = (temp_shock.T * _flow_replace).T.groupby(ms.index.names).sum()
ms_no_calibration_agg_shock = (agg_shock.T / agg_shock.sum(axis=1)).T
temp = (exp(_utility * scale + constant).T / exp(_utility * scale + constant).sum(axis=1)).T
assert (temp.sum(axis=1).round(1) == 1.0).all(), 'Market-share issue'
agg = (temp.T * _flow_replace).T.groupby(ms.index.names).sum()
market_share_agg = (agg.T / agg.sum(axis=1)).T
# with shock
temp_shock = (exp((_utility + _utility_shock) * scale + constant).T / exp((_utility + _utility_shock) * scale + constant).sum(axis=1)).T
assert (temp.sum(axis=1).round(1) == 1.0).all(), 'Market-share issue'
agg_shock = (temp_shock.T * _flow_replace).T.groupby(ms.index.names).sum()
market_share_agg_shock = (agg_shock.T / agg_shock.sum(axis=1)).T
wtp = constant / self.preferences_heater['cost']
hidden_benefits = constant / abs(self.preferences_heater['cost'])
details = concat((constant.stack(), ms_no_calibration.stack(), market_share_agg.stack(), ms.stack(),
agg.stack() / 10 ** 3, wtp.stack(), hidden_benefits.stack()),
axis=1, keys=['constant', 'no_calibration', 'calcul', 'observed', 'thousand', 'wtp',
'hidden_benefits']).round(decimals=3)
if self.path_calibration is not None:
details.to_csv(os.path.join(self.path_calibration, 'calibration_constant_heater.csv'))
# export utility coefficient
cost = Series(self.preferences_heater['cost'])
cost = cost.reset_index().set_axis(['Attribute', 'Value'], axis=1)
preference_benefits = self.preferences_heater['bill_saved']
preference_benefits = preference_benefits.reset_index().set_axis(['Attribute', 'Value'], axis=1)
status_quo = Series(self.preferences_heater['status_quo_bias'])
status_quo = status_quo.reset_index().set_axis(['Attribute', 'Value'], axis=1)
constant_heater = self.constant_heater.stack().copy()
constant_heater.index = constant_heater.index.map(lambda x: '{} | {}'.format(x[0], x[1]))
constant_heater = constant_heater.reset_index().set_axis(['Attribute', 'Value'], axis=1)
temp = {'Cost': cost,
'Benefits': preference_benefits,
'Status quo': status_quo,
'Constant measure': constant_heater}
temp = concat(temp, axis=0).droplevel(-1)
temp.to_csv(os.path.join(self.path_calibration, 'coefficient_utility_heater.csv'))
return constant
_bill_saved, _subsidies_total, _cost_heater = _bill_saved.loc[index, :], _subsidies_total.loc[index, :], _cost_heater.loc[index, :]
choice_heater = _cost_heater.columns
if drop_heater:
choice_heater = [i for i in choice_heater if i not in drop_heater]
if condition is not None:
condition = condition.loc[index, choice_heater]
utility_bill_saving = (_bill_saved.T * reindex_mi(self.preferences_heater['bill_saved'],
_bill_saved.index)).T / 1000
utility_bill_saving = utility_bill_saving.loc[:, choice_heater]
utility_subsidies = _subsidies_total * self.preferences_heater['subsidy'] / 1000
_cost_heater = _cost_heater.reindex(index).reindex(choice_heater, axis=1)
pref_investment = reindex_mi(self.preferences_heater['cost'], index)
utility_cost = (pref_investment * _cost_heater.T).T / 1000
utility_inertia = DataFrame(0, index=utility_bill_saving.index, columns=utility_bill_saving.columns)
for hs in choice_heater:
utility_inertia.loc[
utility_inertia.index.get_level_values('Heating system') == hs, hs] = self.preferences_heater['status_quo_bias']
utility = utility_inertia + utility_cost + utility_bill_saving + utility_subsidies
if _cost_financing is not None:
_cost_financing = _cost_financing.loc[index, :]
pref_financing = reindex_mi(self.preferences_heater['cost'], _cost_financing.index).rename(None)
utility_financing = (_cost_financing.T * pref_financing).T / 1000
utility += utility_financing
if condition is not None:
condition = condition.loc[index, :]
condition = condition.astype(float)
condition = condition.replace(0, float('nan'))
utility *= condition
utility.dropna(how='all', axis=1, inplace=True)
if _credit_constraint is not None:
_credit_constraint = _credit_constraint.loc[index, utility.columns]
_credit_constraint = _credit_constraint.astype(float)
_credit_constraint = _credit_constraint.replace(0, float('nan'))
utility_constraint = utility * _credit_constraint
# there are cases where hh cannot afford any investment, we therefore let them keep the same heating system
temp = utility_constraint.loc[utility_constraint.isna().all(axis=1), :]
t = DataFrame(index=temp.index, columns=temp.columns, dtype=float)
for i in temp.index.get_level_values('Heating system').unique():
idx = temp[temp.index.get_level_values('Heating system') == i].index
t.loc[idx, i] = utility.loc[idx, i]
# u = utility.copy()
utility *= _credit_constraint
utility.dropna(how='all', inplace=True)
utility = concat((utility, t), axis=0)
utility = utility.loc[index, :]
if (self.constant_heater is None) and (calib_heater is not None):
self.logger.info('Calibration market-share heating system')
sub_shock = - utility_cost - utility_financing
sub_shock.loc[:, [i for i in sub_shock.columns if i != 'Electricity-Heat pump water']] = 0
utility_shock = - utility_subsidies + sub_shock
utility_shock = utility_shock.loc[index, :]
if condition is not None:
utility_shock *= condition
utility_shock.dropna(how='all', axis=1, inplace=True)
# ms_heater.dropna(how='all', inplace=True)
calib_heater['ms_heater'] = calib_heater['ms_heater'].loc[:, utility.columns]
calibration_heater(utility, calib_heater, utility_shock, flow_replace)
utility_cost *= self.scale_heater
utility_bill_saving *= self.scale_heater
utility_financing *= self.scale_heater
utility_inertia *= self.scale_heater
if self.path_calibration:
test_sensitivity_hp(utility_cost, utility_bill_saving, utility_financing, utility_inertia, condition,
flow_replace, _cost_heater)
utility *= self.scale_heater
utility_constant = reindex_mi(self.constant_heater.reindex(utility.columns, axis=1), utility.index)
utility += utility_constant
# x = (log(exp(utility).sum(axis=1)) - utility.T).T
error = (log(exp(utility).sum(axis=1)) + - utility.T).T
_unobserved_value = error + utility_constant
market_share = (exp(utility).T / exp(utility).sum(axis=1)).T
return market_share, _unobserved_value
[docs] def exogenous_market_share_heater(self, index, choice_heater_idx):
"""Define exogenous market-share.
Market-share is defined by _market_share_exogenous attribute.
Parameters
----------
index: MultiIndex or Index
choice_heater_idx: Index
Returns
-------
DataFrame
Market-share by segment and possible heater choice.
Series
Probability replacement.
"""
market_share_exogenous = {'Wood fuel-Standard boiler': 'Wood fuel-Performance boiler',
'Wood fuel-Performance boiler': 'Wood fuel-Performance boiler',
'Oil fuel-Standard boiler': 'Electricity-Heat pump water',
'Oil fuel-Performance boiler': 'Electricity-Heat pump water',
'Natural gas-Standard boiler': 'Natural gas-Performance boiler',
'Natural gas-Performance boiler': 'Natural gas-Performance boiler',
'Electricity-Direct electric': 'Electricity-Heat pump air',
'Electricity-Heat pump air': 'Electricity-Heat pump air',
'Electricity-Heat pump water': 'Electricity-Heat pump water',
}
market_share = Series(index=index, dtype=float).to_frame().dot(
Series(index=choice_heater_idx, dtype=float).to_frame().T)
for initial, final in market_share_exogenous.items():
market_share.loc[market_share.index.get_level_values('Heating system') == initial, final] = 1
temp = Series(0, index=index, dtype='float').to_frame().dot(
Series(0, index=choice_heater_idx, dtype='float').to_frame().T)
index_final = temp.stack().index
_, _, certificate = self.consumption_heating_store(index_final, level_heater='Heating system final')
certificate = reindex_mi(certificate.unstack('Heating system final'), index)
certificate_before = self.consumption_heating_store(index)[2]
certificate_before = reindex_mi(certificate_before, index)
self._heater_store['epc_upgrade'] = - certificate.replace(EPC2INT).sub(
certificate_before.replace(EPC2INT), axis=0)
return market_share
[docs] def heater_replacement(self, stock, prices, cost_heater, policies_heater, calib_heater=None,
step=1, financing_cost=None, district_heating=None, premature_replacement=None,
prices_before=None, supply=None, store_information=True, bill_rebate=0,
carbon_content=None, carbon_value=None, credit_constraint=True):
"""Function returns building stock updated after switching heating system.
Parameters
----------
stock: Series
prices: Series
cost_heater: Series
calib_heater: DataFrame, optional
policies_heater: list
calib_heater: DataFrame, optional
step: int, optional
financing_cost: dict, optional
district_heating: Series, optional
premature_replacement: Series, optional
prices_before: Series, optional
supply: Series, optional
store_information: bool, optional
carbon_content: Series, optional
carbon_value: float, optional
bill_rebate: float, optional
Returns
-------
Series
"""
def apply_rational_choice(_consumption_saved, _subsidies_total, _cost_total, _bill_saved, _carbon_saved,
_stock, social=False, discount_social=0.032):
if social:
_discount = discount_social
# subsidies do not change market-share
_bill_saved[_bill_saved == 0] = float('nan')
ratio = (_cost_total - _subsidies_total) / _bill_saved
if social:
ratio = (_cost_total - _subsidies_total) / (_bill_saved + _carbon_saved)
best_option = AgentBuildings.find_best_option(ratio, {'consumption_saved': _consumption_saved}, func='min')
_market_share = DataFrame(0, index=ratio.index, columns=ratio.columns)
for i in _market_share.index:
_market_share.loc[i, best_option.loc[i, 'columns']] = 1
assert (_market_share.sum(axis=1) == 1).all(), 'Market-share issue'
return _market_share
index = stock.index
cost_heater_raw = cost_heater.copy()
probability = self.heater_vintage.loc[:, 1] / self.heater_vintage.sum(axis=1)
probability.dropna(inplace=True)
probability *= step
# premature replacement
premature_heater = [p for p in policies_heater if p.policy == 'premature_heater']
for premature in premature_heater:
print('Premature replacement not Implemented')
# temp = [i for i in probability.index if '{}'.format(i.split('-')[0]) in premature.target]
# probability.loc[temp] = premature.value
flow_replace = stock * reindex_mi(probability, stock.index)
# condition
condition = Series(True, index=index, dtype='float').to_frame().dot(Series(True, index=cost_heater.index).to_frame().T)
condition = condition.astype(bool)
# policies restriction
restriction = [p for p in policies_heater if p.policy in ['restriction_energy', 'restriction_heater']]
for policy in restriction:
if isinstance(policy.value, str):
temp = [policy.value]
else:
temp = policy.value
if policy.policy == 'restriction_energy':
temp = [i for i in cost_heater.index if '{}'.format(i.split('-')[0]) in temp]
if temp:
idx = condition.index
if policy.target is not None:
idx = condition.index.get_level_values('Housing type') == policy.target
condition.loc[idx, temp] = False
# size heating system
if self._variable_size_heater:
size_heater = self.size_heater(index=index)
else:
size_heater = Series(8, index=index)
# technical restriction - removing heat-pump for low-efficient buildings
condition.columns.names = ['Heating system final']
if self._constraint_heat_pumps:
if isinstance(self._constraint_heat_pumps, list):
_, certificate, _ = self.consumption_heating(method='3uses', full_output=True)
condition = concat((condition, reindex_mi(certificate.rename('Performance'), condition.index)), axis=1)
condition = condition.set_index('Performance', append=True)
idx = (condition.index.get_level_values('Performance').isin(['F', 'G'])) & (
~condition.index.get_level_values('Heating system').isin(self._resources_data['index']['Heat pumps']))
condition.loc[idx, [i for i in self._resources_data['index']['Heat pumps'] if i in condition.columns]] = False
condition = condition.droplevel('Performance')
else:
raise NotImplementedError
size = self.size_heater(index=index)
idx = (size > self._constraint_heat_pumps) & (
~size.index.get_level_values('Heating system').isin(self._resources_data['index']['Heat pumps']))
condition.loc[idx, [i for i in self._resources_data['index']['Heat pumps'] if i in condition.columns]] = False
# technical restriction - heat pump can only be switch with heat pump
idx = condition.index.get_level_values('Heating system').isin(self._resources_data['index']['Heat pumps'])
condition.loc[idx, [i for i in condition.columns if i not in self._resources_data['index']['Heat pumps']]] = False
# technical restriction - direct electric cannot switch to fossil
fossil_boilers = ['Natural gas-Performance boiler', 'Oil fuel-Performance boiler']
idx = condition.index.get_level_values('Heating system') == 'Electricity-Direct electric'
condition.loc[idx, [i for i in condition.columns if i in fossil_boilers]] = False
# technical restriction - wood boiler can only be switch with wood boiler
wood_boilers = ['Wood fuel-Performance boiler', 'Wood fuel-Performance boiler']
idx = condition.index.get_level_values('Heating system').isin(wood_boilers)
condition.loc[idx, [i for i in condition.columns if i not in wood_boilers]] = False
if False:
# only dwelling fuel with natural gas can get natural gas
idx = condition.index.get_level_values('Heating system').isin([i for i in condition.columns if i != 'Natural gas-Performance boiler'])
condition.loc[idx, 'Natural gas-Performance boiler'] = False
# multi-family not heated by wood cannot
idx = condition.index.get_level_values('Heating system').isin([i for i in condition.columns if i != 'Wood fuel-Performance boiler']) & (condition.index.get_level_values('Housing type') == 'Multi-family')
condition.loc[idx, 'Wood fuel-Performance boiler'] = False
# bill saving
choice_heater = cost_heater.index
choice_heater_idx = Index(choice_heater, name='Heating system final')
temp = Series(0, index=index, dtype='float').to_frame().dot(Series(0, index=choice_heater_idx, dtype='float').to_frame().T)
index_final = temp.stack().index
consumption_std, _, certificate = self.consumption_heating_store(index_final, level_heater='Heating system final')
consumption_std = reindex_mi(consumption_std.unstack('Heating system final'), index)
consumption_std = (reindex_mi(self._surface, consumption_std.index) * consumption_std.T).T
bill_std = self.energy_bill(prices, consumption_std)
"""temp = [i.split('-')[0] for i in consumption.columns]
prices_re = prices.reindex(temp).set_axis(consumption.columns)
bill = consumption * prices_re"""
consumption_std_before = self.consumption_heating_store(index, level_heater='Heating system')[0]
consumption_std_before = reindex_mi(consumption_std_before, index) * reindex_mi(self._surface, index)
bill_std_before = self.energy_bill(prices, consumption_std_before)
bill_std_saved = - bill_std.sub(bill_std_before, axis=0)
bill_saved_std = bill_std_saved.copy()
certificate = reindex_mi(certificate.unstack('Heating system final'), index)
certificate_before = self.consumption_heating_store(index)[2]
certificate_before = reindex_mi(certificate_before, index)
consumption_std_saved = (consumption_std_before - consumption_std.T).T
consumption_std_before = self.add_attribute(consumption_std_before, 'Income tenant')
consumption_std_before = consumption_std_before.reorder_levels(self.stock.index.names)
index_consumption = consumption_std_before.index.intersection(self.stock.index)
consumption_std_before = consumption_std_before.loc[index_consumption]
heating_intensity_before = self.to_heating_intensity(consumption_std_before.index, prices,
consumption=consumption_std_before,
level_heater='Heating system',
bill_rebate=bill_rebate)
bill_std_saved = self.add_attribute(bill_std_saved, 'Income tenant')
bill_std_saved = bill_std_saved.reorder_levels(self.stock.index.names)
bill_std_saved = bill_std_saved.loc[index_consumption]
# x = pd.concat((consumption_std_before, reindex_mi(certificate_before, index=)))
# bill saved that include rebound effect so increase of thermal comfort
bill_saved = (bill_std_saved.T * heating_intensity_before).T
bill_saved = bill_saved.groupby(index.names).mean()
# emission saved
consumption_before = consumption_std_before * heating_intensity_before
emission_before = self.energy_bill(carbon_content, consumption_before, level_heater='Heating system')
consumption_std = self.add_attribute(consumption_std, 'Income tenant')
consumption_std = consumption_std.reorder_levels(self.stock.index.names)
consumption_std = consumption_std.loc[index_consumption]
consumption_std = consumption_std.stack('Heating system final')
heating_intensity = self.to_heating_intensity(consumption_std.index, prices,
consumption=consumption_std,
level_heater='Heating system final',
bill_rebate=bill_rebate)
consumption = (consumption_std.T * heating_intensity).T
emission = self.energy_bill(carbon_content, consumption, level_heater='Heating system final')
consumption = consumption.unstack('Heating system final')
emission = emission.unstack('Heating system final')
consumption_saved = (consumption_before - consumption.T).T
consumption_saved = consumption_saved.groupby(index.names).mean()
emission_saved = (emission_before - emission.T).T
emission_saved = emission_saved.groupby(index.names).mean()
# subsidies
cost_heater = size_heater.to_frame().dot(cost_heater.to_frame().T)
cost_heater, vat_heater, subsidies_details, subsidies_total, eligible = self.apply_subsidies_heater(index,
policies_heater,
cost_heater.copy(),
consumption_std_saved,
emission_saved,
bill_saved)
# cost financing
p = [p for p in policies_heater if p.policy == 'zero_interest_loan']
cost_total, cost_financing, amount_debt, amount_saving, discount, subsidies = self.calculate_financing(
cost_heater,
subsidies_total,
financing_cost, policies=p)
subsidies_loan = DataFrame(0, index=cost_total.index, columns=cost_total.columns)
for policy in p:
subsidies_details.update({policy.name: subsidies[policy.name]})
subsidies_loan += subsidies[policy.name]
if store_information:
if prices_before is None:
prices_before = prices
heating_intensity_before = self.to_heating_intensity(consumption_std_before.index, prices_before,
consumption=consumption_std_before,
level_heater='Heating system',
bill_rebate=bill_rebate)
consumption_before = consumption_std_before * heating_intensity_before
heating_intensity_after = self.to_heating_intensity(consumption_std.index, prices_before,
consumption=consumption_std,
level_heater='Heating system final',
bill_rebate=bill_rebate)
consumption_actual = (consumption_std * heating_intensity_after)
consumption_actual = consumption_actual.unstack('Heating system final')
consumption_no_rebound = (consumption_std.unstack('Heating system final').T * heating_intensity_before).T
epc_upgrade = - certificate.replace(EPC2INT).sub(
certificate_before.replace(EPC2INT), axis=0)
p = [p for p in policies_heater if p.policy == 'credit_constraint']
if credit_constraint and not p:
credit_constraint = self.credit_constraint(amount_debt, financing_cost, bill_saved=None)
else:
credit_constraint = None
# district heating are automatically excluded from endogenous market share
index_endogenous = index
if 'Heating-District heating' in flow_replace.index.get_level_values('Heating system'):
flow_replace_sum = flow_replace.sum()
replace_district_heating = select(flow_replace, {'Heating system': 'Heating-District heating'})
flow_replace = flow_replace.drop('Heating-District heating', level='Heating system')
assert round(flow_replace_sum, 0) == round(flow_replace.sum() + replace_district_heating.sum(), 0), 'Error in flow replace'
index_endogenous = index.drop(replace_district_heating.index)
if self._endogenous:
bill_saved_expectation = bill_saved.copy()
if self._belief_engineering_calculation is None:
bill_saved_expectation = bill_saved_std.copy()
market_share, _unobserved_value = self.endogenous_market_share_heater(index_endogenous, bill_saved_expectation,
subsidies_total, cost_heater,
calib_heater=calib_heater,
_cost_financing=cost_financing,
condition=condition,
flow_replace=flow_replace,
drop_heater=[
'Heating-District heating'],
_credit_constraint=credit_constraint)
hidden_benefits_heater = (_unobserved_value / abs(self.preferences_heater['cost'])) * 1000
# to reduce number of combination if market_shares for one technology is too small it is removed
market_share[market_share < 10 ** -2] = 0
market_share = (market_share.T / market_share.sum(axis=1)).T
else:
market_share = self.exogenous_market_share_heater(index, cost_heater.columns)
assert (market_share.sum(axis=1).round(0) == 1).all(), 'Market-share issue'
# first considering the case where the heating system is replaced by district heating
if district_heating is not None:
temp = flow_replace[~flow_replace.index.get_level_values('Heating system').isin(
['Electricity-Heat pump water', 'Electricity-Heat pump air'])]
temp = select(temp, {'Housing type': 'Multi-family'})
if district_heating > temp.sum():
district_heating = temp.sum()
to_district_heating = district_heating * temp / temp.sum()
to_district_heating = to_district_heating.reindex(flow_replace.index).fillna(0)
flow_replace = flow_replace - to_district_heating
replacement = (market_share.T * flow_replace).T
if district_heating is not None:
# adding heating system switching to district heating
to_district_heating = concat((to_district_heating, replace_district_heating), axis=0)
replacement = concat((replacement, to_district_heating.rename('Heating-District heating')), axis=1)
assert_almost_equal(replacement.sum().sum(), flow_replace_sum, decimal=0)
# indicator
temp = replacement.fillna(0).groupby(['Housing type', 'Heating system']).sum()
temp = (temp.T / temp.sum(axis=1)).T
replacement = replacement.groupby(replacement.columns, axis=1).sum()
replacement.columns.names = ['Heating system final']
stock_replacement = replacement.stack('Heating system final')
to_replace = replacement.sum(axis=1)
stock = stock - to_replace
# correct rounding error
stock[stock < 0] = 0
if self.year == self.first_year + 2:
distortion_grouped = self.calculate_distortion_heater(stock, cost_heater, emission_saved, carbon_value,
bill_saved)
levels = [k for k in distortion_grouped.index.names if k != 'Technology']
subsidies_grouped = self.weighted_average(subsidies_total, stock.copy(), levels=levels,
technology='heater')
s = self.add_certificate(stock)
s = self.add_energy(s)
stock_grouped = s.groupby([i for i in subsidies_grouped.index.names if i in s.index.names]).sum()
stock_grouped = stock_grouped[stock_grouped > 0]
temp = concat((subsidies_grouped, distortion_grouped), axis=1, keys=['Subsidies', 'Distortion'])
self._distortion_store.update({'heater': temp.copy()})
if self.path_calibration is not None:
from project.utils import make_scatter_plot
temp = self._distortion_store['heater'].copy()
temp = temp.reset_index()
col_colors = 'Technology'
temp[col_colors] = temp[col_colors].apply(lambda x: self._resources_data['colors'][x])
#col_colors = None
make_scatter_plot(temp, 'Subsidies', 'Distortion', 'Subsidies (Thousand euro)',
'Distortion (Thousand euro)',
annotate=False, save=os.path.join(self.path_calibration, 'subsidies_distortion_heater.png'),
format_y=lambda y, _: '{:.0f}'.format(y / 1e3),
format_x=lambda x, _: '{:.1f}'.format(x / 1e3),
s=10, diagonal_line=True, col_colors=col_colors)
# adding heating system final equal to heating system because no switch
flow_premature_replacement = 0
if premature_replacement is not None:
def premature_func(x, alpha=10 ** -3):
return 1 / (1 + exp(-alpha * x))
alpha = fsolve(lambda a: premature_func(-500, alpha=a) - 0.10, 10 ** -3)[0]
bill_saved[bill_saved <= 0] = float('nan')
npv = - cost_total + subsidies_total + 3 * bill_saved
npv = npv.loc[:, [i for i in npv.columns if i in self._resources_data['index']['Heat pumps']]]
npv = npv.dropna(axis=1, how='all')
# not implemented yet
if supply is not None:
if self.number_firms_heater is None:
self._markup_heater_store = supply['markup_heater']
cost = cost_heater['Electricity-Heat pump water']
weight = stock / stock.sum()
def mark_up(price, u=npv, n=2, cost=cost, weight=weight):
u = u.squeeze()
u += - (price - cost)
proba = premature_func(u, alpha=alpha)
proba = proba.reindex(weight.index).fillna(0)
elasticity = - alpha * price * (1 - proba)
elasticity_global = (elasticity * weight).sum()
return 1 / (1 + 1 / (elasticity_global * n))
if False and self.number_firms_heater is None:
mark_up_target = supply['markup_heater']
number_firms = fsolve(
lambda x: mark_up(cost * self._markup_heater_store, u=npv.copy(), n=x) - mark_up_target, 3)[0]
self.number_firms_heater = number_firms
self.logger.info('Number of firms heater market: {:.1f}'.format(number_firms))
self.number_firms_heater = 2
def cournot_equilibrium(price, n=self.number_firms_heater, cost=cost, u=npv.copy()):
return price / cost - mark_up(price, u=u, n=n)
price = fsolve(cournot_equilibrium, cost * self._markup_heater_store)[0]
if price != 0:
markup = price / cost
self._markup_heater_store = markup
self.logger.info('Equilibrium heater found. Markup: {:.2f}'.format(markup))
else:
self.logger.info('ERROR No Equilibrium. Keeping the same markup. Markup: {:.2f}'.format(
self._markup_heater_store))
proba_replace = premature_func(npv, alpha=alpha) * premature_replacement['information_rate']
proba_replace.dropna(inplace=True)
to_replace = (stock[flow_replace.index] * proba_replace.T).T
if to_replace.sum().sum() > 1:
# TODO: does not work if multiple heater
flow_premature_replacement = to_replace.sum()
flow_replace = to_replace.idxmin(axis=1).rename('Heating system final')
flow_replace = concat((flow_replace, to_replace), axis=1)
stock_replacement_premature = flow_replace.set_index('Heating system final', append=True).squeeze(
axis=1)
stock = stock - to_replace.squeeze().reindex(stock.index).fillna(0)
stock_replacement = concat((stock_replacement, stock_replacement_premature), axis=0)
stock_replacement = stock_replacement.groupby(stock_replacement.index.names).sum()
stock_replacement = stock_replacement[stock_replacement > 0]
stock = concat((stock, Series(stock.index.get_level_values('Heating system'), index=stock.index,
name='Heating system final')), axis=1).set_index('Heating system final', append=True).squeeze()
stock = concat((stock.reorder_levels(stock_replacement.index.names), stock_replacement),
axis=0, keys=[False, True], names=['Heater replacement'])
stock.sort_index(inplace=True)
assert round(stock.sum() - self.stock_mobile.xs(True, level='Existing', drop_level=False).sum(),
0) == 0, 'Sum problem'
if store_information:
self.store_information_heater(cost_heater.copy(), subsidies_total, bill_saved, subsidies_details,
stock_replacement.unstack('Heating system final'),
vat_heater, cost_financing, amount_debt, amount_saving, discount,
consumption_std_saved,
consumption_before, consumption_no_rebound, consumption_actual, epc_upgrade,
flow_premature_replacement, subsidies_loan, eligible, hidden_benefits_heater)
return stock
[docs] def prepare_cost_insulation(self, cost_insulation):
"""Constitute insulation choice set cost. Cost is equal to the sum of each individual cost component.
Parameters
----------
cost_insulation: Series
Returns
-------
Series
Multiindex series. Levels are Wall, Floor, Roof and Windows and values are boolean.
"""
cost = DataFrame(0, index=cost_insulation.index, columns=self._choice_insulation)
idx = IndexSlice
cost.loc[:, idx[True, :, :, :]] = (cost.loc[:, idx[True, :, :, :]].T + cost_insulation['Wall']).T
cost.loc[:, idx[:, True, :, :]] = (cost.loc[:, idx[:, True, :, :]].T + cost_insulation['Floor']).T
cost.loc[:, idx[:, :, True, :]] = (cost.loc[:, idx[:, :, True, :]].T + cost_insulation['Roof']).T
cost.loc[:, idx[:, :, :, True]] = (cost.loc[:, idx[:, :, :, True]].T + cost_insulation['Windows']).T
return cost
[docs] def prepare_subsidy_insulation(self, subsidies_insulation, policy=None):
"""Constitute insulation choice set subsidies. Subsidies are equal to the sum of each individual subsidy.
Parameters
----------
policy
subsidies_insulation: DataFrame
Returns
-------
DataFrame
Multiindex columns. Levels are Wall, Floor, Roof and Windows and values are boolean.
"""
idx = IndexSlice
subsidies = {}
for i in self.surface_insulation.index:
index = subsidies_insulation.index
if 'Housing type' in subsidies_insulation.index.names:
index = subsidies_insulation[subsidies_insulation.index.get_level_values('Housing type') == i].index
subsidy = DataFrame(0, index=index, columns=self._choice_insulation)
subsidy.loc[index, idx[True, :, :, :]] = subsidy.loc[index, idx[True, :, :, :]].add(
subsidies_insulation['Wall'] * self.surface_insulation.loc[i, 'Wall'], axis=0)
subsidy.loc[index, idx[:, True, :, :]] = subsidy.loc[index, idx[:, True, :, :]].add(
subsidies_insulation['Floor'] * self.surface_insulation.loc[i, 'Floor'], axis=0)
subsidy.loc[index, idx[:, :, True, :]] = subsidy.loc[index, idx[:, :, True, :]].add(
subsidies_insulation['Roof'] * self.surface_insulation.loc[i, 'Roof'], axis=0)
subsidy.loc[index, idx[:, :, :, True]] = subsidy.loc[index, idx[:, :, :, True]].add(
subsidies_insulation['Windows'] * self.surface_insulation.loc[i, 'Windows'], axis=0)
subsidies[i] = subsidy.copy()
if 'Housing type' in subsidies_insulation.index.names:
subsidies = concat(list(subsidies.values()), axis=0)
else:
subsidies = concat(list(subsidies.values()), axis=0, keys=self.surface_insulation.index,
names=self.surface_insulation.index.names)
if policy == 'subsidy_ad_valorem':
# NotImplemented: ad_valorem with different subsididies rate
value = [v for v in subsidies_insulation.stack().unique() if v != 0][0]
subsidies[subsidies > 0] = value
return subsidies
[docs] def apply_subsidies_insulation(self, index, policies_insulation, cost_insulation, surface, certificate,
certificate_before, certificate_before_heater, energy_saved_3uses,
consumption_saved, carbon_content, calculate_condition=True, health_cost_saved=None,
bill_saved=None):
"""Calculate subsidies amount for each possible insulation choice.
Parameters
----------
index
policies_insulation: list
cost_insulation: DataFrame
Cost for each segment and each possible insulation choice (€).
surface: Series
Surface / dwelling for each segment (m2/dwelling).
certificate : DataFrame
Certificate by segment after insulation replacement for each possible insulation choice.
certificate_before : Series
Certificate by segment before insulation (but after heater replacement)
certificate_before_heater: Series
Certificate by segment before insulation, and before heater replacement. Useful to calculate the total
number of upgrade this year.
energy_saved_3uses: DataFrame
Primary conventional energy consumption saved by the insulation work (% - kWh PE/m2).
consumption_saved: kWh
Returns
-------
cost_insulation: DataFrame
vat_insulation: DataFrame
tax : float
subsidies_details: dict
subsidies_total: DataFrame
condition: dict
eligible: dict
"""
def defined_condition(_index, _certificate, _certificate_before, _certificate_before_heater,
_energy_saved_3uses, _cost_insulation, _consumption_saved, _list_conditions):
"""Define condition to get subsidies or loan.
Depends on income (index) and energy performance of renovationd defined by certificate jump or
energy_saved_3uses.
Parameters
----------
_index: MultiIndex
_certificate: DataFrame
_certificate_before: Series
_certificate_before_heater: Series
_energy_saved_3uses: DataFrame
_cost_insulation: DataFrame
_consumption_saved: DataFrame
_list_conditions: list
Returns
-------
condition: dict
Contains boolean DataFrame that established condition to get subsidies.
"""
_condition = dict()
if 'out_worst' in _list_conditions:
out_worst = (~_certificate.isin(['G', 'F'])).T.multiply(_certificate_before.isin(['G', 'F'])).T
out_worst = reindex_mi(out_worst, _index).fillna(False).astype('float')
_condition.update({'out_worst': out_worst})
if 'reach_best' in _list_conditions:
in_best = (_certificate.isin(['A', 'B'])).T.multiply(~_certificate_before.isin(['A', 'B'])).T
in_best = reindex_mi(in_best, _index).fillna(False).astype('float')
_condition.update({'reach_best': in_best})
if 'mitigation_option' in _list_conditions:
# selecting best cost efficiency opportunities to reach A or B
_c_saved = _consumption_saved.copy().replace(0, float('nan'))
levels = ['Heating system final' if i == 'Heating system' else i for i in self._resources_data['index']['Dwelling']]
_c_saved = _c_saved.groupby(levels).first()
_c_insulation = reindex_mi(_cost_insulation, _c_saved.index)
cost_saving = _c_insulation / _c_saved
# cost_saving = reindex_mi(cost_saving, _index)
_temp = reindex_mi(_certificate, cost_saving.index)
# TODO: select_deep_renovation
best = _temp.isin(['A', 'B']).replace(False, float('nan')) * cost_saving
# if B is not possible then C or D
idx = best[best.isna().all(axis=1)].index
s_best = _temp.loc[idx, :].isin(['C']).replace(False, float('nan')) * cost_saving.loc[idx, :]
best = best.dropna(how='all')
best = concat((best, s_best)).astype(float)
if s_best.isna().all(axis=1).any():
print('D best peformance')
idx = s_best[s_best.isna().all(axis=1)].index
best = best.dropna(how='all')
t_best = _temp.loc[idx, :].isin(['D']).replace(False, float('nan')) * cost_saving.loc[idx, :]
best = concat((best, t_best)).astype(float)
_condition.update({'mitigation_option': (best.T == best.min(axis=1)).T})
if [i for i in ['best_efficiency', 'best_efficiency_fg', 'efficiency_100', 'fg'] if i in _list_conditions]:
_cost_annualized = calculate_annuities(_cost_insulation)
cost_saving = reindex_mi(_cost_annualized, _consumption_saved.index) / _consumption_saved
best_efficiency = (cost_saving.T == cost_saving.min(axis=1)).T
if 'best_efficiency' in _list_conditions:
_condition.update({'best_efficiency': best_efficiency.copy()})
if 'best_efficiency_fg' in _list_conditions:
best_efficiency_fg = (best_efficiency.T & _certificate_before.isin(['G', 'F'])).T
_condition.update({'best_efficiency_fg': best_efficiency_fg})
if 'efficiency_100' in _list_conditions:
_condition.update({'efficiency_100': cost_saving < 0.1})
if 'fg' in _list_conditions:
fg = cost_saving.copy()
fg[fg > 0] = False
fg.loc[reindex_mi(_certificate_before, fg.index).isin(['G', 'F']), :] = True
_condition.update({'fg': fg})
deep_condition = 2
energy_condition_35, energy_condition_50 = 0.35, 0.5
_epc_upgrade = - _certificate.replace(EPC2INT).sub(_certificate_before.replace(EPC2INT), axis=0)
_epc_upgrade = reindex_mi(_epc_upgrade, _index)
_certificate_before_heater = reindex_mi(_certificate_before_heater, _index)
_certificate = reindex_mi(_certificate, _index)
_epc_upgrade_all = - _certificate.replace(EPC2INT).sub(
_certificate_before_heater.replace(EPC2INT),
axis=0)
_condition.update({'epc_upgrade_all': _epc_upgrade_all})
# if 'deep_renovation' in _list_conditions:
_condition.update({'deep_renovation': _epc_upgrade_all >= deep_condition})
if 'epc_upgrade_min' in _list_conditions:
_condition.update({'epc_upgrade_min': _epc_upgrade_all >= 1})
if 'deep_renovation_fg' in _list_conditions:
condition_deep_renovation = _epc_upgrade_all >= deep_condition
_condition.update(
{'deep_renovation_fg': (condition_deep_renovation.T & _certificate_before.isin(['G', 'F'])).T})
if 'deep_renovation_fge' in _list_conditions:
condition_deep_renovation = _epc_upgrade_all >= deep_condition
_condition.update({'deep_renovation_fge': (
condition_deep_renovation.T & _certificate_before.isin(['G', 'F', 'E'])).T})
if 'fossil' in _list_conditions:
fossil = ['Oil fuel-Performance boiler', 'Oil fuel-Standard boiler', 'Oil fuel-Collective boiler',
'Natural gas-Performance boiler', 'Natural gas-Standard boiler', 'Natural gas-Collective boiler'
]
_temp = pd.Series(_certificate.index.get_level_values('Heating system').isin(fossil), index=_certificate.index)
_temp = reindex_mi(_temp, _index).fillna(False).astype('float')
_condition.update({'fossil': _temp})
if 'deep_renovation_low_income' in _list_conditions:
low_income_condition = ['D1', 'D2', 'D3', 'D4', 'C1', 'C2']
low_income_condition = _index.get_level_values('Income owner').isin(low_income_condition)
low_income_condition = Series(low_income_condition, index=_index)
condition_deep_renovation = _epc_upgrade_all >= deep_condition
_condition.update(
{'deep_renovation_low_income': (low_income_condition & condition_deep_renovation.T).T})
if 'deep_renovation_high_income' in _list_conditions:
condition_deep_renovation = _epc_upgrade_all >= deep_condition
high_income_condition = ['D5', 'D6', 'D7', 'D8', 'D9', 'D10', 'C3', 'C4', 'C5']
high_income_condition = _index.get_level_values('Income owner').isin(high_income_condition)
high_income_condition = Series(high_income_condition, index=_index)
_condition.update(
{'deep_renovation_high_income': (high_income_condition & condition_deep_renovation.T).T})
if 'energy_condition_35' in _list_conditions:
energy_condition_35 = _energy_saved_3uses >= energy_condition_35
_condition.update({'energy_condition_35': reindex_mi(energy_condition_35, _index)})
if 'energy_condition_50' in _list_conditions:
energy_condition_50 = _energy_saved_3uses >= energy_condition_50
_condition.update({'energy_condition_50': reindex_mi(energy_condition_50, _index)})
if 'mpr_no_fg' in _list_conditions:
_condition.update({'mpr_no_fg': ~_certificate_before.isin(['G', 'F'])})
if 'heater_replacement' in _list_conditions:
_temp = pd.Series(_certificate.index.get_level_values('Heater replacement'), index=_certificate.index)
_condition.update({'heater_replacement': _temp})
if 'heater_replacement_low_carbon' in _list_conditions:
_temp = pd.Series(_certificate.index.get_level_values('Heater replacement'), index=_certificate.index)
_temp = _temp & pd.Series(_certificate.index.get_level_values('Heating system final').isin(
self._resources_data['index']['Low Carbon']), index=_certificate.index)
_temp = (_condition['deep_renovation'].T & _temp).T
_condition.update({'heater_replacement_low_carbon': _temp})
if 'heater_replacement_heat_pump' in _list_conditions:
_temp = pd.Series(_certificate.index.get_level_values('Heater replacement'), index=_certificate.index)
_temp = _temp & pd.Series(_certificate.index.get_level_values('Heating system final').isin(
self._resources_data['index']['Heat pumps']), index=_certificate.index)
_temp = (_condition['deep_renovation'].T & _temp).T
_condition.update({'heater_replacement_heat_pump': _temp})
if 'mpr_no_fg_heater_replacement' in _list_conditions:
_temp = pd.Series(_certificate.index.get_level_values('Heater replacement'), index=_certificate.index)
_temp = _temp & reindex_mi(~_certificate_before.isin(['G', 'F']), _temp.index)
"""_temp = _certificate.loc[_certificate.index.get_level_values('Heater replacement'), :]
_temp = ~_temp.isna()
temp = (_temp.T * reindex_mi(~_certificate_before.isin(['G', 'F']), _temp.index)).T"""
_condition.update({'mpr_no_fg_heater_replacement': _temp})
if 'mpr_serenite_nb' in _list_conditions:
nb_measures = Series([sum(i) for i in _certificate.columns], index=_certificate.columns)
_temp = Series(0, index=_certificate.index)
_temp[_temp.index.get_level_values('Heater replacement')] += 1
_temp = concat([_temp] * nb_measures.shape[0], axis=1).set_axis(nb_measures.index, axis=1)
nb_measures = _temp + nb_measures
_condition.update({'mpr_serenite_nb': nb_measures >= 3})
return _condition
subsidies_details = {}
vat = VAT
p = [p for p in policies_insulation if 'reduced_vat' == p.policy]
if p:
vat = p[0].value
if isinstance(vat, dict):
vat = vat[self.year]
subsidies_details.update({p[0].name: reindex_mi(cost_insulation * (VAT - vat), index)})
vat_insulation = cost_insulation * vat
cost_insulation += vat_insulation
if calculate_condition:
self._list_condition_subsidies = [i.target for i in policies_insulation if
isinstance(i.target, str) and i.policy != 'subsidies_cap']
list_conditions = self._list_condition_subsidies.copy()
if not self._endogenous:
list_conditions += ['mitigation_option']
condition = defined_condition(index, certificate, certificate_before,
certificate_before_heater,
energy_saved_3uses, cost_insulation,
consumption_saved, list_conditions
)
else:
# TODO: do not work because _condition_store do not have _list_condition_subsidies
condition = self._condition_store
policies_incentive = ['subsidy_ad_valorem', 'subsidy_target', 'subsidy_proportional', 'subsidies_cap',
'subsidy_present_bias', 'subsidy_landlord', 'subsidy_multi_family']
self.policies += [i.name for i in policies_insulation if i.policy in policies_incentive and i.name not in self.policies]
sub_non_cumulative = {}
for policy in [p for p in policies_insulation if p.policy in ['subsidy_ad_valorem', 'subsidy_target', 'subsidy_proportional',
'subsidy_present_bias', 'subsidy_landlord', 'subsidy_multi_family']]:
if isinstance(policy.value, dict):
value = policy.value[self.year]
else:
value = policy.value
if policy.policy == 'subsidy_target':
if isinstance(value, Series):
idx = pd.Index(['Single-family', 'Multi-family'], name='Housing type')
value = concat([value] * len(idx), keys=idx, axis=1).T
value = (reindex_mi(self.prepare_subsidy_insulation(value), index).T * surface).T
if policy.target is not None:
if isinstance(condition[policy.target], Series):
value = (value.T * reindex_mi(condition[policy.target], value.index)).T
else:
raise NotImplemented('Need to implement target for subsidy_target')
elif policy.policy == 'subsidy_ad_valorem':
cost = policy.cost_targeted(reindex_mi(cost_insulation, index),
target_subsidies=condition.get(policy.target))
if isinstance(value, (Series, float, int)):
temp = reindex_mi(value, cost.index)
value = (temp * cost.T).T
else:
temp = self.prepare_subsidy_insulation(value, policy=policy.policy)
value = reindex_mi(temp, cost.index) * cost
value = value.fillna(0)
elif policy.policy == 'subsidy_proportional':
consumption_saved_cumac = self.to_cumac(consumption_saved, discount=self.social_discount_rate,
duration=self.lifetime_insulation)
consumption_saved_cumac[consumption_saved_cumac < 0] = 0
# reindex_mi(cost_insulation, index) / consumption_saved_cumac
if policy.proportional == 'tCO2_cumac':
temp = self.add_energy(consumption_saved_cumac)
emission_saved_cumac = (temp.T * reindex_mi(carbon_content.rename_axis(['Energy']),
temp.index)).T / 10 ** 3
value = value * emission_saved_cumac
value = value.droplevel('Energy', axis=0)
elif policy.proportional == 'MWh_cumac':
value = value * consumption_saved_cumac
elif policy.proportional == 'health_cost':
value = self.to_cumac(health_cost_saved, discount=self.social_discount_rate,
duration=self.lifetime_insulation) * 1e3
value = reindex_mi(value, index).fillna(0)
else:
raise NotImplemented
elif policy.policy == 'subsidy_present_bias':
value = bill_saved.copy()
value[value < 0] = 0
value = (reindex_mi((self.preferences_insulation['discount_factor_perfect'] -
self.preferences_insulation['discount_factor']), value.index) * value.T).T
elif policy.policy == 'subsidy_landlord':
value = self.landlord_dilemma.copy()
value = reindex_mi(value, index).fillna(0)
value = concat([value] * cost_insulation.shape[1], keys=cost_insulation.columns, axis=1)
elif policy.policy == 'subsidy_multi_family':
value = self.multifamily_friction.copy()
value = reindex_mi(value, index).fillna(0)
value = concat([value] * cost_insulation.shape[1], keys=cost_insulation.columns, axis=1)
# cap for all policies
value.fillna(0, inplace=True)
cost = reindex_mi(cost_insulation, index)
# cost = cost.reindex(value.columns, axis=1)
value = value.where(value <= cost, cost)
if policy.cap is not None:
if isinstance(policy.cap, dict):
cap = policy.cap[self.year]
else:
cap = policy.cap
cap = reindex_mi(cap, value.index).fillna(float('inf'))
cap = concat([cap] * value.shape[1], keys=value.columns, axis=1)
value = value.where(value < cap, cap)
if not policy.social_housing:
value.loc[value.index.get_level_values('Occupancy status') == 'Social-housing', :] = 0
if policy.non_cumulative is None:
if policy.name in subsidies_details.keys():
subsidies_details[policy.name] = subsidies_details[policy.name] + value
else:
subsidies_details[policy.name] = value.copy()
else:
sub_non_cumulative.update({policy: value.copy()})
# adding bonus subsidies (subsidies have been calculated before)
subsidies_bonus = [p for p in policies_insulation if p.policy == 'bonus']
for policy in subsidies_bonus:
if isinstance(policy.value, dict):
bonus_value = policy.value[self.year]
else:
bonus_value = policy.value
if policy.target is not None:
value = (reindex_mi(bonus_value, condition[policy.target].index) * condition[policy.target].T).T
else:
if policy.name not in subsidies_details.keys():
raise KeyError('Bonus subsidies should be coupled in other subsidy')
value = reindex_mi(bonus_value, subsidies_details[policy.name].index)
value.fillna(0, inplace=True)
if not policy.social_housing:
print('ok')
value.loc[value.index.get_level_values('Occupancy status') == 'Social-housing', :] = 0
# if policy defined by insulation
if len(value.columns.names) == 1:
value = (self.prepare_subsidy_insulation(value).T * surface).T
if policy.name in subsidies_details.keys():
if isinstance(value, (DataFrame, float, int)):
subsidies_details[policy.name] = subsidies_details[policy.name] + value
elif isinstance(value, Series):
subsidies_details[policy.name] = (subsidies_details[policy.name].T + value).T
else:
subsidies_details[policy.name] = value.copy()
# store eligible before cap and cumulative
eligible = {}
for policy in subsidies_details:
eligible.update({policy: (subsidies_details[policy] > 0).any(axis=1).copy()})
for policy in sub_non_cumulative:
eligible.update({policy.name: (sub_non_cumulative[policy] > 0).any(axis=1).copy()})
# for non-cumulative subsidies, we compare the value of the subsidies with the sum of non-cumulative subsidies
for policy, value in sub_non_cumulative.items():
compare = sum([subsidies_details[p] for p in policy.non_cumulative if p in subsidies_details.keys()])
if isinstance(compare, DataFrame):
value = reindex_mi(value, compare.index)
subsidies_details[policy.name] = value.where(value > compare, 0)
for p in policy.non_cumulative:
if p in subsidies_details.keys():
subsidies_details[p] = subsidies_details[p].where(compare > value, 0)
else:
subsidies_details[policy.name] = value
"""for policy_compare in policy.non_cumulative:
if policy_compare in subsidies_details.keys():
comp = reindex_mi(subsidies_details[policy_compare], value.index)
subsidies_details[policy_compare] = comp.where(comp > value, 0)
subsidies_details[policy.name] = value.where(value > comp, 0)"""
subsidies_total = [subsidies_details[k] for k in subsidies_details.keys() if k not in ['reduced_vat', 'over_cap']]
if subsidies_total:
subsidies_total = sum(subsidies_total)
else:
subsidies_total = pd.DataFrame(0, index=consumption_saved.index, columns=consumption_saved.columns)
for k in subsidies_details.keys():
subsidies_details[k].sort_index(inplace=True)
# overall cap for cumulated amount of subsidies
subsidies_cap = [p for p in policies_insulation if p.policy == 'subsidies_cap']
if subsidies_cap:
# only one subsidy cap
subsidies_cap = subsidies_cap[0]
policies_target = subsidies_cap.target
subsidies_cap = reindex_mi(subsidies_cap.value, subsidies_total.index)
cap = (reindex_mi(cost_insulation, index).T * subsidies_cap).T
over_cap = subsidies_total > cap
subsidies_details['over_cap'] = (subsidies_total - cap)[over_cap].fillna(0)
subsidies_details['over_cap'].sort_index(inplace=True)
remaining = subsidies_details['over_cap'].copy()
for policy_name in policies_target:
if policy_name in subsidies_details.keys():
self.logger.debug('Capping amount for {}'.format(policy_name))
temp = remaining.where(remaining <= subsidies_details[policy_name], subsidies_details[policy_name])
subsidies_details[policy_name] -= temp
assert (subsidies_details[policy_name].values >= 0).all(), '{} got negative values'.format(
policy_name)
remaining -= temp
if allclose(remaining, 0, rtol=10 ** -1):
break
# it is possible to extend policies_target to other policies
assert allclose(remaining, 0, rtol=10 ** -1), 'Over cap'
subsidies_total -= subsidies_details['over_cap']
else:
cap = reindex_mi(cost_insulation, index)
over_cap = subsidies_total > cap
subsidies_details['over_cap'] = (subsidies_total - cap)[over_cap].fillna(0)
subsidies_details['over_cap'].sort_index(inplace=True)
remaining = subsidies_details['over_cap'].copy()
for policy_name in [k for k in subsidies_details.keys() if k != 'over_cap']:
self.logger.debug('Capping amount for {}'.format(policy_name))
temp = remaining.where(remaining <= subsidies_details[policy_name], subsidies_details[policy_name])
subsidies_details[policy_name] -= temp
assert (subsidies_details[policy_name].values >= 0).all(), '{} got negative values'.format(
policy_name)
remaining -= temp
if allclose(remaining, 0, rtol=10 ** -1):
break
if self.year == 2024:
print('ok')
# it is possible to extend policies_target to other policies
assert allclose(remaining, 0, rtol=10 ** -1), 'Over cap'
subsidies_total -= subsidies_details['over_cap']
return cost_insulation, vat_insulation, vat, subsidies_details, subsidies_total, condition, eligible
[docs] def calculate_distortion_insulation(self, index, stock, cost_insulation,
consumption_saved, health_cost_saved,
bill_saved, carbon_value, cap=True):
"""Calculate distortion amount for each possible insulation choice.
"""
distortion_details = {}
# 1. distortion landlord
value = self.landlord_dilemma.copy()
value = reindex_mi(value, index).fillna(0)
value = concat([value] * cost_insulation.shape[1], keys=cost_insulation.columns, axis=1)
value.fillna(0, inplace=True)
distortion_details.update({'landlord_dilemma': value.copy()})
# 2. distortion multi-family
value = self.multifamily_friction.copy()
value = reindex_mi(value, index).fillna(0)
value = concat([value] * cost_insulation.shape[1], keys=cost_insulation.columns, axis=1)
value.fillna(0, inplace=True)
distortion_details.update({'multi_family': value.copy()})
# 3. distortion present-bias
if False:
value = bill_saved.copy()
value[value < 0] = 0
value = (reindex_mi((self.preferences_insulation['discount_factor_perfect'] -
self.preferences_insulation['discount_factor']), value.index) * value.T).T
value.fillna(0, inplace=True)
distortion_details.update({'bill_saved': value.copy()})
# 4. distortion emission
consumption_saved_cumac = self.to_cumac(consumption_saved, discount=self.social_discount_rate,
duration=self.lifetime_insulation) * 1e3
consumption_saved_cumac[consumption_saved_cumac < 0] = 0
temp = self.add_energy(consumption_saved_cumac)
value = (temp.T * reindex_mi(carbon_value.rename_axis(['Energy']), temp.index)).T
value.fillna(0, inplace=True)
value = value.droplevel('Energy')
distortion_details.update({'emission_saved': value.copy()})
# 5. distortion health cost
value = self.to_cumac(health_cost_saved, discount=self.social_discount_rate,
duration=self.lifetime_insulation) * 1e3
value = reindex_mi(value, index).fillna(0)
distortion_details.update({'health_cost': value.copy()})
distortion_total = sum([i for k, i in distortion_details.items()])
if cap is True:
cap = reindex_mi(cost_insulation, index)
distortion_total = distortion_total.where(distortion_total <= cap, cap)
levels = ['Occupancy status', 'Housing type', 'Performance', 'Energy', 'Income owner']
distortion_grouped = self.weighted_average(distortion_total, stock.copy(), levels=levels, technology='insulation')
return distortion_grouped
[docs] def endogenous_renovation(self, stock, bill_saved, subsidies_total, cost_insulation, frequency_insulation,
policies_insulation, calib_renovation=None, min_performance=None, subsidies_details=None,
cost_financing=None, supply=None, credit_constraint=None, health_cost_saved=None):
"""Calculate endogenous retrofit based on discrete choice model.
Utility variables are investment cost, energy bill saving, and subsidies.
Preferences are object attributes defined initially.
# bill saved calculated based on the new heating system
# certificate before work and so subsidies before the new heating system
Parameters
----------
stock: Series
bill_saved: DataFrame
subsidies_total: DataFrame
cost_insulation: DataFrame
frequency_insulation: float or int
stock: Series, default None
calib_renovation: dict, optional
min_performance: str, optional
subsidies_details: dict, optional
cost_financing: DataFrame, optional
supply: DataFrame, optional
credit_constraint: float, optional
Returns
-------
Series
Retrofit rate
DataFrame
Market-share insulation
DataFrame
Utility insulation
Series
Consumer surplus
"""
def market_share_func(u):
return (exp(u).T / exp(u).sum(axis=1)).T
def to_market_share(_bill_saved, _subsidies_total, _cost_total, _cost_financing=None, _credit_constraint=None,
full_output=False, _min_performance=None, _health_cost_saved=None):
"""Calculate market-share between insulation options.
Parameters
----------
_bill_saved: DataFrame
_subsidies_total: DataFrame
_cost_total: DataFrame
_cost_financing: DataFrame, optional
_credit_constraint: DataFrame, optional
full_output: bool, optional
Returns
-------
ms_intensive: DataFrame
utility_intensive: DataFrame
Include alternative specific constant between renovation options.
But not between status quo and renovation.
"""
pref_sub = reindex_mi(self.preferences_insulation['subsidy'], _subsidies_total.index).rename(None)
utility_subsidies = (_subsidies_total.T * pref_sub).T / 1000
pref_investment = reindex_mi(self.preferences_insulation['cost'], _cost_total.index).rename(None)
utility_investment = (_cost_total.T * pref_investment).T / 1000
utility_bill_saving = (_bill_saved.T * reindex_mi(self.preferences_insulation['bill_saved'],
_bill_saved.index)).T / 1000
utility_intensive = utility_bill_saving + utility_investment + utility_subsidies
if False:
temp = (_bill_saved > 0).astype(float)
temp = temp.replace(0, float('nan'))
utility_intensive *= temp
utility_intensive.dropna(how='all', inplace=True)
if _cost_financing is not None:
pref_financing = reindex_mi(self.preferences_insulation['cost'], _cost_financing.index).rename(None)
utility_financing = (_cost_financing.T * pref_financing).T / 1000
utility_intensive += utility_financing
if self.constant_insulation_intensive is not None:
utility_intensive += self.constant_insulation_intensive
if _credit_constraint is not None:
_credit_constraint = _credit_constraint.astype(float)
_credit_constraint = _credit_constraint.replace(0, float('nan'))
utility_intensive *= _credit_constraint
utility_intensive.dropna(how='all', inplace=True)
if _health_cost_saved is not None:
_health_cost_saved = reindex_mi(_health_cost_saved, utility_intensive.index).fillna(0)
utility_health = (_health_cost_saved.T * reindex_mi(self.preferences_insulation['bill_saved'],
_health_cost_saved.index)).T / 1000
utility_intensive += utility_health
if _min_performance is not None:
certificate = reindex_mi(self._consumption_store['certificate_renovation'], utility_intensive.index)
_performance_constraint = certificate <= _min_performance
_performance_constraint = _performance_constraint.astype(float)
_performance_constraint = _performance_constraint.replace(0, float('nan'))
utility_intensive *= _performance_constraint
ms_intensive = market_share_func(utility_intensive)
if self.constant_insulation_extensive is not None and False:
# test if nested logit model is the same than conditional logit model
expected_utility = log(exp(utility_intensive).sum(axis=1))
probability_renovation = to_renovation_rate(expected_utility)
ms_nested = (ms_intensive.T * probability_renovation).T
ms_nested = concat((1 - probability_renovation, ms_nested), axis=1)
u_logit = (utility_intensive.T + reindex_mi(self.constant_insulation_extensive, utility_intensive.index)).T
u_logit = concat((Series(0, index=u_logit.index), u_logit), axis=1)
ms_logit = market_share_func(u_logit)
allclose(ms_nested, ms_logit)
if full_output:
return ms_intensive, utility_intensive, utility_bill_saving
else:
return ms_intensive, utility_intensive
def renovation_func(u, proba=1):
return proba * 1 / (1 + exp(- u))
def to_renovation_rate(_expected_utility, _proba_replacement=1):
"""Calculate retrofit rate based on binomial logit model.
Parameters
----------
Returns
-------
Series
Renovation rate for each household.
"""
utility_renovate = _expected_utility.copy()
if self.constant_insulation_extensive is not None:
utility_constant = reindex_mi(self.constant_insulation_extensive, _expected_utility.index)
utility_renovate = _expected_utility + utility_constant
probability_renovation = renovation_func(utility_renovate, proba=_proba_replacement)
return probability_renovation
def nested_logit_formula(_bill_saved, _subsidies_total, _cost_total, _cost_financing=None):
_bill_saved[_bill_saved == 0] = float('nan')
_subsidies_total[_bill_saved == 0] = float('nan')
_cost_total[_bill_saved == 0] = float('nan')
pref_sub = reindex_mi(self.preferences_insulation['subsidy'], _subsidies_total.index).rename(None)
utility_subsidies = (_subsidies_total.T * pref_sub).T / 1000
pref_investment = reindex_mi(self.preferences_insulation['cost'], _cost_total.index).rename(None)
utility_investment = (_cost_total.T * pref_investment).T / 1000
utility_bill_saving = (_bill_saved.T * reindex_mi(self.preferences_insulation['bill_saved'],
_bill_saved.index)).T / 1000
utility_intensive = utility_bill_saving + utility_investment + utility_subsidies
if _cost_financing is not None:
pref_financing = reindex_mi(self.preferences_insulation['cost'], _cost_financing.index).rename(None)
utility_financing = (_cost_financing.T * pref_financing).T / 1000
utility_intensive += utility_financing
if self.constant_insulation_intensive is not None:
utility_intensive += self.constant_insulation_intensive
_market_share = market_share_func(utility_intensive)
expected_utility = log(exp(utility_intensive).sum(axis=1))
if self.constant_insulation_extensive is not None:
utility_constant = reindex_mi(self.constant_insulation_extensive, expected_utility.index)
expected_utility += utility_constant
_renovation_rate = renovation_func(expected_utility)
probability = (_renovation_rate * _market_share.T).T
return probability
def apply_endogenous_renovation(_bill_saved, _subsidies_total, _cost_total, _cost_financing=None, _stock=None,
_supply=None, _cost_curve=True, _credit_constraint=None, _frequency_insulation=1,
_min_performance=None, _health_cost_saved=None):
if self.number_firms_insulation is None and _supply is not None:
self._markup_insulation_store = _supply['markup_insulation']
_market_share, _utility_intensive = to_market_share(_bill_saved, _subsidies_total,
_cost_total * self._markup_insulation_store,
_cost_financing=_cost_financing,
_credit_constraint=_credit_constraint,
_min_performance=_min_performance,
_health_cost_saved=health_cost_saved)
# calculate conditional expectation of error term (unobserved cost or benefits)
# formula is log(exp(total_utility).sum()) - utility_observed
total_utility = (_utility_intensive.T + reindex_mi(self.constant_insulation_extensive, _utility_intensive.index)).T
if _min_performance is None:
total_utility = add_no_renovation(total_utility)
constant_unobserved = self.constant_insulation.copy()
if _min_performance is None:
constant_unobserved = add_no_renovation(constant_unobserved)
constant_unobserved = reindex_mi(constant_unobserved, total_utility.index)
# constant_unobserved = constant_unobserved.reindex(total_utility.columns, axis=1)
error = (log(exp(total_utility).sum(axis=1)) - total_utility.T).T
_unobserved_value = error + constant_unobserved
expected_utility = log(exp(_utility_intensive).sum(axis=1))
_renovation_rate = to_renovation_rate(expected_utility, _frequency_insulation)
if _supply:
_investment_insulation = (_cost_total.reindex(_market_share.index) * _market_share).sum(axis=1)
pref_investment = reindex_mi(self.preferences_insulation['cost'], _investment_insulation.index).rename(
None)
weight = stock / stock.sum()
cost_renovation = (_renovation_rate * _investment_insulation).sum() / _renovation_rate.sum()
def mark_up(price, u=expected_utility, n=2):
proba = 1 / (1 + exp(-u))
elasticity = pref_investment * price / 1000 * (1 - proba)
elasticity_global = (elasticity * weight).sum()
return 1 / (1 + 1 / (elasticity_global * n))
def cournot_equilibrium(price, n=1, cost_renovation=cost_renovation, u=expected_utility):
return price / cost_renovation - mark_up(price, u=u, n=n)
"""temp_elasticity, temp_margin, temp_markup, temp_feedback = dict(), dict(), dict(), dict()
for i in range(-5, 5, 1):
u = utility + abs(utility) * i / 10
renovation = (1 / (1 + exp(-u)) * _stock).sum() / 10**3
temp_elasticity.update({renovation: price_elasticity_demand(cost_renovation, u=u)})
mu = mark_up(cost_renovation, u=u, n=2)
temp_markup.update({renovation: mu})
feedback = (1 / (1 + exp(-(u+(mu-1)*utility_investment))) * _stock).sum() / 10**3
temp_feedback.update({mu: feedback})
# temp_margin.update({renovation: fsolve(cournot_equilibrium, cost_renovation, args=(1, cost_renovation, u))[0]})
temp_markup = pd.Series(temp_markup)
temp_feedback = pd.Series(temp_feedback)
temp_elasticity = pd.Series(temp_elasticity)
temp_margin = pd.Series(temp_margin)
make_plot(pd.Series(temp_elasticity), 'Elasticity (Percent)', format_y=lambda y, _: '{:,.2f}'.format(y),
save=os.path.join(self.path_calibration, 'elasticity.png'), integer=False)
make_plot(pd.Series(temp_margin), 'Price (Euro)', format_y=lambda y, _: '{:,.2f}'.format(y),
save=os.path.join(self.path_calibration, 'price.png'), integer=False)"""
# self.number_firms_insulation = 2
if self.number_firms_insulation is None:
if False and self.path_calibration is not None:
temp = {}
for n in range(1, 10, 2):
root, info_dict, ier, mess = fsolve(cournot_equilibrium, cost_renovation,
args=(n, cost_renovation),
full_output=True)
temp.update({n: root[0]})
make_plot(pd.Series(temp), 'Price (Euro)', format_y=lambda y, _: '{:,.2f}'.format(y),
save=os.path.join(self.path_calibration, 'price_cournot'))
mark_up_target = _supply['markup_insulation']
number_firms = fsolve(
lambda x: mark_up(cost_renovation * self._markup_insulation_store, u=expected_utility,
n=x) - mark_up_target, 3)[0]
self.number_firms_insulation = number_firms
self.logger.info('Number of firms insulation market: {:.1f}'.format(self.number_firms_insulation))
root, info_dict, ier, mess = fsolve(cournot_equilibrium,
cost_renovation * self._markup_insulation_store,
args=(
self.number_firms_insulation, cost_renovation, expected_utility),
full_output=True)
"""flow_renovation_before = (_renovation_rate * _stock).sum() / 10**3
cost_renovation_before = (_renovation_rate * _investment_insulation).sum() / _renovation_rate.sum()
print(flow_renovation_before)
print(cost_renovation)
print(self.number_firms_insulation)
proba = 1 / (1 + exp(-expected_utility))
elasticity = pref_investment * cost_renovation * _supply['markup_insulation'] / 1000 * (1 - proba)
elasticity_global = (elasticity * weight).sum()
print(elasticity_global)"""
mu = root[0] / cost_renovation
if mu != 0:
self._markup_insulation_store = mu
self.logger.info('Equilibrium insulation found. Markup: {:.2f}'.format(mu))
_market_share, _utility_intensive = to_market_share(_bill_saved, _subsidies_total,
_cost_total * self._markup_insulation_store,
_cost_financing=_cost_financing)
expected_utility = log(exp(_utility_intensive).sum(axis=1))
_renovation_rate = to_renovation_rate(expected_utility)
else:
self.logger.info('ERROR No Equilibrium. Keeping the same markup')
"""
for i in range(50):
print((1 + i / 10))
print(price_elasticity_demand_bis(cost_renovation * (1 + i / 10), u=utility))
print(mark_up(cost_renovation * (1 + i / 10), u=utility, n=self.number_firms_insulation))
"""
"""flow_renovation_after = (_renovation_rate * _stock).sum() / 10**3
cost_renovation_after = (_renovation_rate * _investment_insulation).sum() / _renovation_rate.sum()"""
return _market_share, _renovation_rate, _unobserved_value, expected_utility
def calibration_renovation(_stock, _cost_total, _bill_saved, _subsidies_total, _calib_renovation,
_cost_financing=None, _credit_constraint=None, _frequency_insulation=1):
def calibration(x, _stock, utilities, _ms_insulation_ini, _renovation_rate_ini, _target, _ref, proba,
_option='deviation'):
_constant_insulation = Series(x[:len(_ms_insulation_ini) - 1], index=_ms_insulation_ini.index.drop(_ref))
_constant_insulation = concat((_constant_insulation, Series(0, index=_ref)))
_constant_renovation = pd.Series(
x[len(_ms_insulation_ini) - 1:len(_renovation_rate_ini) + len(_ms_insulation_ini) - 1],
index=_renovation_rate_ini.index)
_scale = x[-1]
_utility_intensive = (utilities + _constant_insulation) * _scale
_ms = market_share_func(_utility_intensive)
expected_utility = log(exp(_utility_intensive).sum(axis=1))
_rate = renovation_func(expected_utility + reindex_mi(_constant_renovation, expected_utility.index), proba)
flow_renovation = _rate * _stock
f_replace = (_ms.T * flow_renovation).T
market_share_agg = (f_replace.sum() / f_replace.sum().sum()).reindex(_ms_insulation_ini.index)
rslt = market_share_agg - _ms_insulation_ini
rslt.drop(_ref, inplace=True)
flow_renovation_agg = flow_renovation.groupby(_renovation_rate_ini.index.names).sum()
renovation_rate_agg = flow_renovation_agg / _stock.groupby(_renovation_rate_ini.index.names).sum()
rslt = append(rslt, renovation_rate_agg - _renovation_rate_ini)
if _option == 'deviation':
renovation_mean = (_rate * _stock).sum() / _stock.sum()
std_deviation_calc = (((_rate - renovation_mean) ** 2 * _stock).sum() / (
_stock.sum() - 1)) ** (1 / 2)
rslt = append(rslt, _target - std_deviation_calc)
elif _option == 'ratio_min_max':
flow_renovation = self.add_certificate(flow_renovation).groupby('Performance').sum()
stock_performance = self.add_certificate(_stock).groupby('Performance').sum()
renovation_best = flow_renovation[[i for i in flow_renovation.index if i <= 'E']].sum() / stock_performance[[i for i in stock_performance.index if i <= 'E']].sum()
renovation_worst = flow_renovation[[i for i in flow_renovation.index if i >= 'F']].sum() / stock_performance[[i for i in stock_performance.index if i >= 'F']].sum()
factor = renovation_worst / renovation_best
rslt = append(rslt, _target - factor)
elif _option == 'share_fg':
flow_renovation = self.add_certificate(flow_renovation).groupby('Performance').sum()
renovation_worst = flow_renovation[[i for i in flow_renovation.index if i >= 'F']].sum()
factor = renovation_worst / flow_renovation.sum()
rslt = append(rslt, _target - factor)
elif _option == 'price_elasticity':
_ms_full = (_ms.T * _rate).T
_price_elasticity = self.preferences_insulation['cost'] * _scale * _cost_total / 1000 * (1 - _ms_full)
_price_elasticity = (_price_elasticity * _ms).sum(axis=1)
_price_elasticity_average = (_stock * _price_elasticity).sum() / _stock.sum()
rslt = append(rslt, _target - _price_elasticity_average)
else:
raise NotImplemented
return rslt
self.logger.info('Calibration renovation nested-logit function')
# target
renovation_rate_ini = _calib_renovation['renovation_rate_ini']
if self.no_friction is True:
renovation_rate_agg = (_stock * reindex_mi(renovation_rate_ini, _stock.index)).sum() / _stock.sum()
renovation_rate_ini = pd.Series(renovation_rate_agg, index=pd.Index([True], name='Existing'))
ms_insulation_ini = _calib_renovation['ms_insulation_ini']
option = _calib_renovation['scale']['option']
target = _calib_renovation['scale']['target']
_, utility_intensive, utility_bill_saving = to_market_share(_bill_saved, _subsidies_total, _cost_total,
_cost_financing=_cost_financing,
_credit_constraint=_credit_constraint,
full_output=True,
)
if _credit_constraint is not None:
_credit_constraint = _credit_constraint.astype(float)
_credit_constraint = _credit_constraint.replace(0, float('nan'))
utility_intensive *= _credit_constraint
utility_intensive.dropna(how='all', inplace=True)
# initialization
ref = MultiIndex.from_tuples([(False, False, True, False)], names=['Wall', 'Floor', 'Roof', 'Windows'])
constant_insulation_intensive = ms_insulation_ini.reindex(_subsidies_total.columns, axis=0).copy()
constant_insulation_intensive[constant_insulation_intensive > 0] = 0
constant_insulation_intensive.drop(ref, inplace=True)
constant_insulation_intensive = constant_insulation_intensive.to_numpy()
c = (_cost_total * ms_insulation_ini).sum(axis=1)
constant_renovation = - abs(self.preferences_insulation['cost'] * c / 1000).mean()
constant_renovation = Series(constant_renovation, index=renovation_rate_ini.index)
constant_renovation = constant_renovation.to_numpy()
x0 = append(constant_insulation_intensive, constant_renovation)
x0 = append(x0, 1)
root, info_dict, ier, mess = fsolve(calibration, x0, args=(
_stock, utility_intensive, ms_insulation_ini, renovation_rate_ini,
target, ref, _frequency_insulation, option), full_output=True)
if ier != 1:
raise ValueError('Renovation model did not converge')
constant_insulation_intensive = Series(root[:len(ms_insulation_ini) - 1], index=ms_insulation_ini.index.drop(ref))
constant_insulation_intensive = concat((constant_insulation_intensive, Series(0, index=ref)))
constant_insulation_intensive = constant_insulation_intensive.loc[ms_insulation_ini.index]
constant_renovation = pd.Series(
root[len(ms_insulation_ini) - 1:len(renovation_rate_ini) + len(ms_insulation_ini) - 1],
index=renovation_rate_ini.index)
scale = root[-1]
self.constant_insulation_intensive = constant_insulation_intensive * scale
self.constant_insulation_extensive = constant_renovation
self.apply_scale(scale, gest='insulation')
# results
_market_share, _renovation_rate, _, _ = apply_endogenous_renovation(_bill_saved, _subsidies_total,
_cost_total,
_stock=_stock,
_cost_financing=_cost_financing,
_frequency_insulation=_frequency_insulation,
_credit_constraint=_credit_constraint)
flow = _renovation_rate * _stock
rate = flow.groupby(renovation_rate_ini.index.names).sum() / _stock.groupby(
renovation_rate_ini.index.names).sum()
compare_rate = concat((rate.rename('Calculated'), renovation_rate_ini.rename('Observed')),
axis=1).round(3)
agg = flow.groupby(renovation_rate_ini.index.names).sum()
wtp = self.constant_insulation_extensive / self.preferences_insulation['cost']
stock_ini = _stock.groupby(renovation_rate_ini.index.names).sum()
details = concat((
self.constant_insulation_extensive, rate, renovation_rate_ini, stock_ini / 10 ** 6,
agg / 10 ** 3, wtp), axis=1, keys=['constant', 'calcul', 'observed', 'stock', 'flow', 'wtp']).round(decimals=3)
if self.path_calibration is not None:
details.to_csv(os.path.join(self.path_calibration, 'calibration_flow_renovation.csv'))
flow_insulation = (flow * _market_share.T).T.sum()
share = flow_insulation / flow_insulation.sum()
compare_share = concat((share.rename('Calculated'), ms_insulation_ini.rename('Observed')),
axis=1).round(3)
wtp = self.constant_insulation_intensive / self.preferences_insulation['cost']
details = concat(
(self.constant_insulation_intensive, share, ms_insulation_ini, flow_insulation / 10 ** 3, wtp), axis=1,
keys=['constant', 'calcul', 'observed', 'thousand', 'wtp']).round(decimals=3)
if self.path_calibration is not None:
details.to_csv(os.path.join(self.path_calibration, 'calibration_market_share_insulation.csv'))
compare = concat((compare_rate, compare_share), ignore_index=True)
if allclose(compare['Calculated'], compare['Observed'], rtol=10 ** -2):
self.logger.debug('Calibration investment decision insulation worked')
else:
assert allclose(compare['Calculated'], compare['Observed'],
rtol=10 ** -2), 'Calibration insulation did not work'
if self.path_calibration is not None and self.no_friction is False:
# export utility coefficient
cost = Series(self.preferences_insulation['cost'])
cost = cost.reset_index().set_axis(['Attribute', 'Value'], axis=1)
preference_benefits = self.preferences_insulation['bill_saved']
preference_benefits = preference_benefits.reset_index().set_axis(['Attribute', 'Value'], axis=1)
constant_insulation_extensive = self.constant_insulation_extensive.copy()
constant_insulation_extensive.index = constant_insulation_extensive.index.map(lambda x: '{} | {}'.format(x[0], x[1]))
constant_insulation_extensive = constant_insulation_extensive.reset_index().set_axis(['Attribute', 'Value'], axis=1)
constant_insulation_intensive = self.constant_insulation_intensive.copy()
names = constant_insulation_intensive.index.names
constant_insulation_intensive.index = constant_insulation_intensive.index.map(lambda row: ", ".join([name for name, value in zip(names, row) if value]))
constant_insulation_intensive = constant_insulation_intensive.reset_index().set_axis(['Attribute', 'Value'], axis=1)
temp = {'Cost': cost,
'Benefits': preference_benefits,
'Constant insulation': constant_insulation_extensive,
'Constant measure': constant_insulation_intensive}
temp = concat(temp, axis=0).droplevel(-1)
temp.to_csv(os.path.join(self.path_calibration, 'coefficient_utility_insulation.csv'))
# export hidden cost that result from calibration
temp = concat((self.hidden_cost, self.landlord_dilemma, self.multifamily_friction), axis=1,
keys=['Hidden cost', 'Landlord-tenant dilemma', 'Multi-family friction']).round(0)
temp.to_csv(os.path.join(self.path_calibration, 'parameters_friction_insulation.csv'))
# export hidden cost for each renovation work type that result from calibration
temp = self.hidden_cost_insulation.copy()
temp = temp + self.hidden_cost.loc[('Single-family', 'Owner-occupied')]
names = temp.index.names
temp.index = temp.index.map(lambda row: ", ".join([name for name, value in zip(names, row) if value]))
temp.to_csv(os.path.join(self.path_calibration, 'hidden_cost_insulation.csv'))
# TODO: Clean this part
ms_full = (_market_share.T * _renovation_rate).T
price_elasticity = self.preferences_insulation['cost'] * _cost_total / 1000 * (1 - ms_full)
price_elasticity = (price_elasticity * _market_share).sum(axis=1)
df = price_elasticity.groupby(['Housing type', 'Occupancy status', 'Income owner']).describe()
df.to_csv(os.path.join(self.path_calibration, 'price_elasticity_description.csv'))
temp = concat((price_elasticity, stock), axis=1, keys=['Price elasticity', 'Stock'])
make_hist(temp, 'Price elasticity', 'Housing type', 'Housing (Million)',
format_y=lambda y, _: '{:.0f}'.format(y / 10 ** 6), kde=True,
save=os.path.join(self.path_calibration, 'price_elasticity_insulation.png'))
compare.to_csv(os.path.join(self.path_calibration, 'result_calibration.csv'))
def test_sensitivity_ad_valorem(_stock, _cost_total, _bill_saved, _subsidies_total, _cost_financing,
_credit_constraint=credit_constraint):
# TODO: add average cost of renovation, consumption saved + add same graph for global renovation
rslt = dict()
for sub in range(0, 110, 10):
sub /= 100
_market_share, _renovation_rate, _, _ = apply_endogenous_renovation(_bill_saved,
(_cost_total + _cost_financing) * sub,
_cost_total,
_cost_financing=_cost_financing,
_credit_constraint=credit_constraint)
rslt.update({sub: (_renovation_rate * _stock).sum() / 10 ** 3})
if self.path_ini is not None:
make_plot(Series(rslt), 'Renovation function of ad valorem subsidy (Thousand households)',
save=os.path.join(self.path_ini, 'sensi_ad_valorem.png'), integer=False, legend=False)
def test_sensitivity(_stock, _cost_total, _bill_saved, _subsidies_total, _cost_financing,
_credit_constraint=credit_constraint):
# bill saved
result_bill_saved = dict()
result_bill_saved['Average cost (Thousand euro)'] = dict()
result_bill_saved['Flow renovation (Thousand)'] = dict()
values = [-0.5, -0.2, -0.1, 0, 0.1, 0.2, 0.5]
for v in values:
v += 1
_market_share, _renovation_rate, _, _ = apply_endogenous_renovation(_bill_saved * v, _subsidies_total,
_cost_total, _supply=None,
_cost_financing=_cost_financing,
_credit_constraint=credit_constraint)
f_renovation = _renovation_rate * _stock
f_replace = (f_renovation * _market_share.T).T
result_bill_saved['Average cost (Thousand euro)'].update(
{v: (f_replace * _cost_total).sum().sum() / f_renovation.sum() / 10 ** 3})
result_bill_saved['Flow renovation (Thousand)'].update({v: f_renovation.sum() / 10 ** 3})
result_bill_saved['Average cost (Thousand euro)'] = Series(
result_bill_saved['Average cost (Thousand euro)'])
result_bill_saved['Flow renovation (Thousand)'] = Series(result_bill_saved['Flow renovation (Thousand)'])
# subsidies
result_subsidies = dict()
result_subsidies['Average cost (Thousand euro)'] = dict()
result_subsidies['Flow renovation (Thousand)'] = dict()
values = [-0.5, -0.2, -0.1, 0, 0.1, 0.2, 0.5]
for v in values:
v += 1
_market_share, _renovation_rate, _, _ = apply_endogenous_renovation(_bill_saved, _subsidies_total * v,
_cost_total, _supply=None,
_cost_financing=_cost_financing,
_credit_constraint=credit_constraint)
f_renovation = _renovation_rate * _stock
f_replace = (f_renovation * _market_share.T).T
result_subsidies['Average cost (Thousand euro)'].update(
{v: (f_replace * _cost_total).sum().sum() / f_renovation.sum() / 10 ** 3})
result_subsidies['Flow renovation (Thousand)'].update({v: f_renovation.sum() / 10 ** 3})
result_subsidies['Average cost (Thousand euro)'] = Series(result_subsidies['Average cost (Thousand euro)'])
result_subsidies['Flow renovation (Thousand)'] = Series(result_subsidies['Flow renovation (Thousand)'])
# subsidies
result_cost = dict()
result_cost['Average cost (Thousand euro)'] = dict()
result_cost['Flow renovation (Thousand)'] = dict()
values = [-0.5, -0.2, -0.1, 0, 0.1, 0.2, 0.5]
for v in values:
v += 1
_market_share, _renovation_rate, _, _ = apply_endogenous_renovation(_bill_saved, _subsidies_total,
_cost_total * v, _supply=None,
_cost_financing=_cost_financing,
_credit_constraint=credit_constraint)
f_renovation = _renovation_rate * _stock
f_replace = (f_renovation * _market_share.T).T
result_cost['Average cost (Thousand euro)'].update(
{v: (f_replace * _cost_total).sum().sum() / f_renovation.sum() / 10 ** 3})
result_cost['Flow renovation (Thousand)'].update({v: f_renovation.sum() / 10 ** 3})
result_cost['Average cost (Thousand euro)'] = Series(result_cost['Average cost (Thousand euro)'])
result_cost['Flow renovation (Thousand)'] = Series(result_cost['Flow renovation (Thousand)'])
# result
dict_df = {'Bill saving': result_bill_saved['Average cost (Thousand euro)'],
'Cost': result_cost['Average cost (Thousand euro)'],
'Subsidies': result_subsidies['Average cost (Thousand euro)']
}
if self.path_ini is not None:
make_plots(dict_df, 'Average cost (Thousand euro)',
save=os.path.join(self.path_ini, 'sensi_average_cost.png'))
dict_df = {'Bill saving': result_bill_saved['Flow renovation (Thousand)'],
'Cost': result_cost['Flow renovation (Thousand)'],
'Subsidies': result_subsidies['Flow renovation (Thousand)']
}
if self.path_ini is not None:
make_plots(dict_df, 'Flow renovation (Thousand)',
save=os.path.join(self.path_ini, 'sensi_flow_renovation.png'))
def test_policies(_stock, _subsidies_details, _cost_total, _bill_saved, _subsidies_total, _cost_financing,
_credit_constraint=None):
"""Test freeriders and intensive margin of subsidies
Parameters
----------
_stock
_subsidies_details
_bill_saved
_subsidies_total
_cost_total
Returns
-------
"""
_market_share, _renovation_rate, _, _ = apply_endogenous_renovation(_bill_saved, _subsidies_total, _cost_total,
_supply=None, _cost_financing=_cost_financing,
_credit_constraint=credit_constraint)
f_retrofit = _renovation_rate * _stock
f_replace = (f_retrofit * _market_share.T).T
avg_cost_global = (f_replace * _cost_total).sum().sum() / f_retrofit.sum()
_subsidies_details = {k: item for k, item in _subsidies_details.items() if k != 'over_cap'}
rslt = dict()
for key, _sub in _subsidies_details.items():
_c = 0
mask = _sub > 0
eligible = (_sub > 0).any(axis=1)
if key == 'reduced_vat':
_c = _sub
_sub = 0
f_replace_eligible = f_replace.loc[eligible, :]
_eligible = f_replace_eligible.sum().sum()
_beneficiaries = f_replace[_sub > 0].sum().sum()
avg_cost_eligible = (f_replace_eligible * _cost_total).sum().sum() / f_replace_eligible.sum().sum()
if key == 'reduced_vat':
avg_sub = (f_replace_eligible * _c).sum().sum() / f_replace_eligible.sum().sum()
else:
avg_sub = (f_replace_eligible * _sub).sum().sum() / f_replace_eligible.sum().sum()
share_sub = avg_sub / avg_cost_eligible
c_total = _cost_total + _c
assert (c_total >= _cost_total).all().all(), 'Cost issue'
sub_total = _subsidies_total - _sub
assert (sub_total <= _subsidies_total).all().all(), 'Subsidies issue'
ms_nosub, renovation_nosub , _, _ = apply_endogenous_renovation(_bill_saved, sub_total, c_total, _supply=None,
_cost_financing=_cost_financing,
_credit_constraint=credit_constraint)
f_renovation_nosub = renovation_nosub * stock
f_replace_nosub = (f_renovation_nosub * ms_nosub.T).T
# avg_cost_global_sub = (f_replace_sub * _cost_total).sum().sum() / f_retrofit_sub.sum()
f_replace_eligible_nosub = f_replace_nosub.loc[eligible, :]
avg_cost_eligible_nosub = (f_replace_eligible_nosub * _cost_total).sum().sum() / f_replace_eligible_nosub.sum().sum()
_additional_participant = f_replace_eligible.sum().sum() - f_replace_eligible_nosub.sum().sum()
_additional_participant_share = _additional_participant / f_replace_eligible_nosub.sum().sum()
_non_additional_participant = f_replace_eligible_nosub.sum().sum()
_free_riders = _non_additional_participant / f_replace_eligible.sum().sum()
# _free_riders = f_retrofit_sub.sum() / f_retrofit.sum()
# _intensive_margin = (avg_cost_global - avg_cost_global_sub) / avg_cost_global_sub
_intensive_margin = (avg_cost_eligible - avg_cost_eligible_nosub) / avg_cost_eligible_nosub
rslt.update({key: Series(
[_eligible, _beneficiaries, avg_sub, share_sub,
_additional_participant, _non_additional_participant, _free_riders,
avg_cost_eligible, avg_cost_eligible_nosub, _intensive_margin],
index=['Eligible (hh)', 'Beneficiaries (hh)', 'Average sub (euro)', 'Share sub (%)',
'Additional participants (hh)', 'Non additional participants (hh)',
'Share of non additional participants (%)',
'Average cost eligible (EUR)', 'Average cost eligible wo sub (EUR)',
'Intensive margin benef (%)'])})
rslt = DataFrame(rslt)
if self.path_ini is not None:
rslt.to_csv(os.path.join(self.path_ini, 'result_policies_assessment.csv'))
make_sensitivity_tables(rslt, os.path.join(self.path_ini, 'result_policies_assessment.png'))
return rslt
def indicator_renovation_rate(_stock, _cost_insulation, _bill_saved, _subsidies_total, _cost_financing,
_credit_constraint=credit_constraint):
_market_share, _renovation_rate, _unobserved_value, _ = apply_endogenous_renovation(_bill_saved, _subsidies_total,
_cost_insulation,
_stock=_stock,
_cost_financing=_cost_financing,
_credit_constraint=credit_constraint)
discount_factor = 10
npv = - _cost_insulation - _cost_financing + _subsidies_total + (
reindex_mi(discount_factor, bill_saved.index) * _bill_saved.T).T
npv = (npv * _market_share).sum(axis=1)
level = ['Existing', 'Occupancy status', 'Income owner', 'Housing type', 'Wall', 'Floor', 'Roof', 'Windows',
'Heating system']
flow = (_renovation_rate * stock).groupby(level).sum()
rate = flow / stock.groupby(level).sum()
rate = self.add_certificate(rate)
npv = npv.groupby(level).mean()
npv = self.add_certificate(npv)
df = pd.concat((rate, npv), axis=1, keys=['rate', 'npv'])
temp = df.reset_index('Occupancy status').copy()
temp['Occupancy status'] = temp['Occupancy status'].apply(lambda x: self._resources_data['colors'][x])
make_scatter_plot(temp, 'npv', 'rate', 'Net present value (euro)', 'Renovation rate (%)',
col_colors='Occupancy status',
format_x=lambda y, _: '{:,.0f}'.format(y),
format_y=lambda y, _: '{:,.0%}'.format(y),
save=os.path.join(self.path_ini, 'renovation_rate_npv.png'), annotate=False)
dict_df = {i: df for i in df.index.names if i not in ['Existing', 'Wall', 'Floor', 'Roof', 'Windows']}
make_grouped_scatterplots(dict_df, 'npv', 'rate', colors=self._resources_data['colors'],
save=os.path.join(self.path_ini, 'renovation_rate_ini.png'), n_columns=2,
format_y=lambda y, _: '{:.0%}'.format(y))
temp = DataFrame()
for i in [i for i in rate.index.names if i not in ['Existing', 'Wall', 'Floor', 'Roof', 'Windows']]:
temp = concat((temp, rate.groupby(i).describe()), axis=0)
temp.round(4).to_csv(os.path.join(self.path_ini, 'renovation_rate_describe_ini.csv'))
rate = flow / stock.groupby(level).sum()
rate = self.add_certificate(rate)
flow = (_renovation_rate * stock * _market_share.T).T.groupby(level).sum()
for j in ['Wall', 'Floor', 'Roof', 'Windows']:
t = flow.xs(True, level=j, axis=1).sum(axis=1)
rate = t / stock.groupby(level).sum()
temp = DataFrame()
for i in [i for i in rate.index.names if i not in ['Existing', 'Wall', 'Floor', 'Roof', 'Windows']]:
temp = concat((temp, rate.groupby(i).describe()), axis=0)
temp.round(4).to_csv(os.path.join(self.path_ini, 'renovation_rate_describe_ini_{}.csv'.format(j.lower())))
flow = self.add_certificate(flow)
temp = flow.groupby('Performance').sum() / flow.sum()
def apply_regulation(idx_target, idx_replace, level):
"""Apply regulation by replacing utility specific constant.
Example removing the split incentive between landlord and tenant.
Parameters
----------
idx_target: str
Index to replace. Example: 'Privately rented'.
idx_replace: str
Index replaced by. Example: 'Owner-occupied'.
level: str
Level to look for index. Example: 'Occupancy status'
"""
_temp = self.constant_insulation_extensive.copy()
_temp = _temp.drop(_temp[_temp.index.get_level_values(level) == idx_target].index)
t = _temp[_temp.index.get_level_values(level) == idx_replace]
t.rename(index={idx_replace: idx_target}, inplace=True)
_temp = concat((_temp, t)).loc[self.constant_insulation_extensive.index]
self.constant_insulation_extensive = _temp.copy()
index = stock.index
cost_insulation = reindex_mi(cost_insulation, index)
if self.constant_insulation_intensive is None and self.rational_behavior_insulation is None:
calibration_renovation(stock, cost_insulation.copy(), bill_saved.copy(), subsidies_total.copy(),
calib_renovation,
_cost_financing=cost_financing.copy(),
_credit_constraint=credit_constraint,
_frequency_insulation=frequency_insulation)
if self.path_ini is not None and False:
indicator_renovation_rate(stock, cost_insulation, bill_saved, subsidies_total, cost_financing,
_credit_constraint=credit_constraint)
test_sensitivity_ad_valorem(stock, cost_insulation, bill_saved, subsidies_total, cost_financing,
_credit_constraint=credit_constraint)
test_sensitivity(stock, cost_insulation, bill_saved, subsidies_total, cost_financing,
_credit_constraint=credit_constraint)
test_policies(stock, subsidies_details, cost_insulation, bill_saved, subsidies_total, cost_financing,
_credit_constraint=credit_constraint)
regulation = [p for p in policies_insulation if p.policy == 'regulation']
if 'landlord_dilemma' in [p.name for p in regulation]:
apply_regulation('Privately rented', 'Owner-occupied', 'Occupancy status')
if 'multi_family' in [p.name for p in regulation]:
apply_regulation('Multi-family', 'Single-family', 'Housing type')
if 'present_bias' in [p.name for p in regulation]:
v = [p for p in regulation if p.name == 'present_bias'][0].value
discount_factor = (1 - (1 + v) ** -self.preferences_insulation['lifetime']) / v
idx = self.preferences_insulation['bill_saved'].index
self.preferences_insulation['bill_saved'] = Series(
discount_factor * abs(self.preferences_insulation['cost']),
index=idx)
market_share, renovation_rate, unobserved_value, consumer_surplus = apply_endogenous_renovation(bill_saved,
subsidies_total,
cost_insulation,
_stock=stock,
_cost_financing=cost_financing,
_supply=supply,
_credit_constraint=credit_constraint,
_frequency_insulation=frequency_insulation,
_min_performance=min_performance,
_health_cost_saved=health_cost_saved)
# positive means benefits so negative means hidden cost
hidden_cost = - (unobserved_value / abs(self.preferences_insulation['cost'])) * 1000
return renovation_rate, market_share, hidden_cost, consumer_surplus
[docs] def exogenous_renovation(self, stock, condition):
"""Format retrofit rate and market share for each segment.
Global retrofit and retrofit rate to match exogenous numbers.
Retrofit all heating system replacement dwelling.
Parameters
----------
stock: Series
condition: dict
Returns
-------
Series
Retrofit rate by segment.
DataFrame
Market-share by segment and insulation choice.
"""
if self.param_exogenous['target'] == 'heater_replacement':
retrofit_rate = Series(0, index=stock.index)
retrofit_rate[retrofit_rate.index.get_level_values('Heater replacement')] = 1
elif self.param_exogenous['target'] == 'worst':
consumption = reindex_mi(self._consumption_store['consumption'], stock.index)
temp = concat((consumption, stock), keys=['Consumption', 'Stock'], axis=1)
temp = temp.sort_values('Consumption', ascending=False)
temp['Stock'] = temp['Stock'].cumsum()
idx = temp[temp['Stock'] < self.param_exogenous['number']].index
retrofit_rate = Series(1, index=idx)
# segment to complete
to_complete = self.param_exogenous['number'] - stock[idx].sum()
idx = temp[temp['Stock'] >= self.param_exogenous['number']].index
if not idx.empty:
idx = idx[0]
to_complete = Series(to_complete / stock[idx],
index=MultiIndex.from_tuples([idx], names=temp.index.names))
retrofit_rate = concat((retrofit_rate, to_complete))
else:
raise NotImplemented
"""market_share = DataFrame(0, index=retrofit_rate.index, columns=choice_insulation)
market_share.loc[:, (True, True, True, True)] = 1"""
market_share = condition['mitigation_option'].astype(int)
market_share = reindex_mi(market_share, retrofit_rate.index).dropna()
assert market_share.loc[market_share.sum(axis=1) != 1].empty, 'Market share problem'
return retrofit_rate, market_share
[docs] def calculate_health_cost_saved(self, stock, health_cost):
s = stock.rename_axis(index={'Heating system': 'Heating system before'}).rename_axis(
index={'Heating system final': 'Heating system'})
_, certificate, _ = self.consumption_heating(index=s.index, method='3uses', full_output=True)
s = concat((s, reindex_mi(certificate, s.index).rename('Performance')), axis=1)
s.set_index('Performance', append=True, inplace=True)
s = s.squeeze()
s = self.add_level(s, self.stock_mobile, 'Income tenant')
health_cost_before = reindex_mi(health_cost, s.index)
health_cost_before = (health_cost_before * s).groupby(
[i for i in s.index.names if i != 'Income tenant']).sum() / s.groupby(
[i for i in s.index.names if i != 'Income tenant']).sum()
health_cost_before = health_cost_before.droplevel('Performance').rename_axis(
index={'Heating system': 'Heating system final'}).rename_axis(
index={'Heating system before': 'Heating system'})
s = s.droplevel('Performance').rename_axis(index={'Heating system': 'Heating system final'}).rename_axis(
index={'Heating system before': 'Heating system'})
_, _, certificate_after_3uses = self.prepare_consumption(self._choice_insulation,
index=s.index,
level_heater='Heating system final',
full_output=True,
method_epc='3uses')
c = reindex_mi(certificate_after_3uses, s.index)
health_cost_after = {}
for i, j in c.items():
df = concat((j, s), keys=['Performance', 'Stock'], axis=1).dropna().set_index(
'Performance', append=True).squeeze()
health_cost_after.update({i: reindex_mi(health_cost, df.index).droplevel('Performance')})
health_cost_after = DataFrame(health_cost_after).fillna(0)
health_cost_after.columns.names = certificate_after_3uses.columns.names
health_cost_after = (health_cost_after.T * s).T.groupby(
[i for i in s.index.names if i != 'Income tenant']).sum()
health_cost_after = (
health_cost_after.T / s.groupby([i for i in s.index.names if i != 'Income tenant']).sum()).T
health_cost_saved = (health_cost_before - health_cost_after.T).T
return health_cost_saved
[docs] def insulation_replacement(self, stock_ini, prices, cost_insulation_raw, frequency_insulation,
policies_insulation=None, financing_cost=None,
calib_renovation=None, min_performance=None,
exogenous_social=None, prices_before=None, supply=None, carbon_value=None,
carbon_content=None, calculate_condition=True, bill_rebate=0,
credit_constraint=True, health_cost=None, default_quality=None):
"""Calculate insulation retrofit in the dwelling stock.
1. Intensive margin
2. Extensive margin
Calibrate function first year.
Consumption saving only depends on insulation work.
However, certificate upgrade also consider if heating system have switched.
To reduce calculation time, we reduce the number of combination by grouping.
Cost, subsidies and constant depends on Housing type, Occupancy status, Housing type and Insulation performance.
Parameters
----------
stock_ini: Series
prices: Series
cost_insulation_raw: Series
€/m2 of losses area by component.
policies_insulation: list, optional
frequency_insulation: float or int
financing_cost: dict, optional
calib_renovation: dict, optional
min_performance: str, optional
exogenous_social: DataFrame, optional
prices_before: Series; optional
supply: dict, optional
carbon_value: float, optional
carbon_content: float, optional
calculate_condition: bool, optional
bill_rebate: float, optional
credit_constraint: bool, optional
Returns
-------
Series
Retrofit rate
DataFrame
Market-share insulation
"""
stock = self.add_certificate(stock_ini)
stock = stock[stock.index.get_level_values('Performance').astype(str) > 'B']
stock = stock.droplevel('Performance')
index = stock.index
performance_insulation = self._performance_insulation_renovation.copy()
if default_quality is not None:
performance_insulation = {k: i * (1 + default_quality) for k, i in performance_insulation.items()}
to_drop = select(stock, performance_insulation)
if not to_drop.empty:
index = index.drop(to_drop.index)
stock = stock.loc[index]
if not stock.empty:
# select index that can undertake insulation replacement
_, _, certificate_before_heater = self.consumption_heating_store(index, level_heater='Heating system')
# before include the change of heating system
consumption_std_before, consumption_3uses_before, certificate_before = self.consumption_heating_store(index, level_heater='Heating system final')
_consumption_std_before = consumption_std_before.copy()
# calculation of energy_saved_3uses after heating system final
consumption_std_after, consumption_3uses, certificate_after = self.prepare_consumption(
self._choice_insulation,
index=index,
level_heater='Heating system final',
default_quality=default_quality)
energy_saved_3uses = ((consumption_3uses_before - consumption_3uses.T) / consumption_3uses_before).T
energy_saved_3uses.dropna(inplace=True)
consumption_std_saved = (consumption_std_before - consumption_std_after.T).T
_consumption_std_saved = (reindex_mi(consumption_std_saved, index).T * reindex_mi(self._surface, index)).T
# energy bill before
consumption_std_before = reindex_mi(consumption_std_before, index) * reindex_mi(self._surface, index)
_consumption_std_before = consumption_std_before.copy()
energy_bill_before = self.energy_bill(prices, consumption_std_before, level_heater='Heating system final',
bill_rebate=bill_rebate)
consumption_std_before = self.add_attribute(consumption_std_before, 'Income tenant')
consumption_std_before = consumption_std_before.unstack(['Heater replacement', 'Heating system final'])
consumption_std_before = consumption_std_before.reorder_levels(self.stock.index.names)
index_consumption = consumption_std_before.index.intersection(self.stock.index)
consumption_std_before = consumption_std_before.loc[index_consumption]
consumption_std_before = consumption_std_before.stack(['Heater replacement', 'Heating system final'])
consumption_std_before = consumption_std_before.unstack('Income tenant')
consumption_std_before = consumption_std_before.reorder_levels(index.names)
consumption_std_before = consumption_std_before.stack('Income tenant')
index_consumption = consumption_std_before.index
heating_intensity_before = self.to_heating_intensity(consumption_std_before.index, prices,
consumption=consumption_std_before,
level_heater='Heating system final',
bill_rebate=bill_rebate)
consumption_std_after = self.prepare_consumption(self._choice_insulation, index=index,
level_heater='Heating system final', full_output=False,
default_quality=default_quality)
consumption_std_after = reindex_mi(consumption_std_after, index).reindex(self._choice_insulation, axis=1)
consumption_std_after = (consumption_std_after.T * reindex_mi(self._surface, index)).T
_consumption_std_after = consumption_std_after.copy()
energy_bill_after = self.energy_bill(prices, consumption_std_after, level_heater='Heating system final',
bill_rebate=bill_rebate)
bill_saved = - energy_bill_after.sub(energy_bill_before, axis=0).dropna()
bill_saved_std = bill_saved.copy()
bill_saved = self.add_attribute(bill_saved, 'Income tenant')
bill_saved = bill_saved.reorder_levels(index_consumption.names)
bill_saved = bill_saved.loc[index_consumption]
bill_saved = (bill_saved.T * heating_intensity_before).T
weights = self.add_level(stock, self.stock_mobile, 'Income tenant')
bill_saved = (bill_saved.T * weights).T
bill_saved = (bill_saved.groupby(index.names).sum().T / weights.groupby(index.names).sum()).T
consumption_saved = - _consumption_std_after.sub(_consumption_std_before, axis=0).dropna()
consumption_saved = self.add_attribute(consumption_saved, 'Income tenant')
consumption_saved = consumption_saved.reorder_levels(index_consumption.names)
consumption_saved = consumption_saved.loc[index_consumption]
consumption_saved = (consumption_saved.T * heating_intensity_before).T
weights = self.add_level(stock, self.stock_mobile, 'Income tenant')
consumption_saved = (consumption_saved.T * weights).T
consumption_saved = (consumption_saved.groupby(index.names).sum().T / weights.groupby(index.names).sum()).T
health_cost_saved = self.calculate_health_cost_saved(stock, health_cost)
cost_insulation = self.prepare_cost_insulation(cost_insulation_raw * self.surface_insulation)
cost_insulation = cost_insulation.T.multiply(self._surface, level='Housing type').T
surface = reindex_mi(self._surface, index)
cost_insulation, vat_insulation, tax, subsidies_details, subsidies_total, condition, eligible = self.apply_subsidies_insulation(
index, policies_insulation, cost_insulation, surface, certificate_after, certificate_before,
certificate_before_heater, energy_saved_3uses, consumption_saved, carbon_content,
calculate_condition=calculate_condition, health_cost_saved=health_cost_saved, bill_saved=bill_saved)
p = [p for p in policies_insulation if p.policy == 'zero_interest_loan']
for pp in p:
if pp.target is not None:
pp.target = condition[pp.target]
cost_total, cost_financing, amount_debt, amount_saving, discount, subsidies = self.calculate_financing(
reindex_mi(cost_insulation, index),
subsidies_total,
financing_cost,
policies=p
)
subsidies_loan = DataFrame(0, index=cost_total.index, columns=cost_total.columns)
for policy in p:
subsidies_details.update({policy.name: subsidies[policy.name]})
subsidies_loan += subsidies[policy.name]
p = [p for p in policies_insulation if p.policy == 'credit_constraint']
if credit_constraint and not p:
credit_constraint = self.credit_constraint(amount_debt, financing_cost, bill_saved=None)
else:
credit_constraint = None
cost_insulation_reindex = reindex_mi(cost_insulation, index)
assert allclose(amount_debt + amount_saving + subsidies_total, cost_insulation_reindex), 'Sum problem'
p = [p for p in policies_insulation if (p.policy == 'regulation') and (p.name == 'health_cost')]
if p:
health_cost_saved_subsidies = health_cost_saved.copy()
else:
health_cost_saved_subsidies = None
if self._endogenous:
bill_saved_expectation = bill_saved.copy()
if self._belief_engineering_calculation is True:
bill_saved_expectation = bill_saved_std.copy()
renovation_rate, market_share, hidden_cost, consumer_surplus = self.endogenous_renovation(
stock, bill_saved_expectation,
subsidies_total,
cost_insulation_reindex,
frequency_insulation,
policies_insulation,
calib_renovation=calib_renovation,
min_performance=min_performance,
subsidies_details=subsidies_details,
cost_financing=cost_financing,
supply=supply,
credit_constraint=credit_constraint,
health_cost_saved=health_cost_saved_subsidies)
self._renovation_store.update({'hidden_cost': hidden_cost})
self._renovation_store.update({'consumer_surplus': consumer_surplus})
if exogenous_social is not None:
index = renovation_rate[
renovation_rate.index.get_level_values('Occupancy status') == 'Social-housing'].index
s = self.add_certificate(stock[index])
renovation_rate_social = reindex_mi(exogenous_social.loc[:, self.year], s.index).droplevel(
'Performance')
renovation_rate.drop(index, inplace=True)
renovation_rate = concat((renovation_rate, renovation_rate_social), axis=0)
else:
renovation_rate, market_share = self.exogenous_renovation(stock, condition)
if self.year == self.first_year + 2 and self.no_friction is False:
distortion_grouped = self.calculate_distortion_insulation(index, stock, cost_insulation,
consumption_saved, health_cost_saved,
bill_saved, carbon_value, cap=True)
levels = [k for k in distortion_grouped.index.names if k != 'Technology']
subsidies_grouped = self.weighted_average(subsidies_total, stock.copy(), levels=levels,
technology='insulation')
s = self.add_certificate(stock)
s = self.add_energy(s)
stock_grouped = s.groupby([i for i in subsidies_grouped.index.names if i in s.index.names]).sum()
stock_grouped = stock_grouped[stock_grouped > 0]
temp = concat((subsidies_grouped, distortion_grouped), axis=1, keys=['Subsidies', 'Distortion'])
self._distortion_store.update({'insulation': temp.copy()})
if self.path_calibration is not None:
from project.utils import make_scatter_plot
temp = self._distortion_store['insulation'].copy()
temp = temp.reset_index()
col_colors = 'Income owner'
temp[col_colors] = temp[col_colors].apply(lambda x: self._resources_data['colors'][x])
col_colors = None
make_scatter_plot(temp, 'Subsidies', 'Distortion', 'Subsidies (Thousand euro)',
'Distortion (Thousand euro)',
annotate=False, save=os.path.join(self.path_calibration, 'subsidies_distortion_insulation.png'),
format_y=lambda y, _: '{:.0f}'.format(y/1e3),
format_x=lambda x, _: '{:.0f}'.format(x/1e3),
s=10, diagonal_line=True, col_colors=col_colors)
if self._condition_store is None and calculate_condition:
if prices_before is None:
prices_before = prices
surface = self._surface.xs(True, level='Existing')
consumption_before = self.add_attribute(_consumption_std_before, [i for i in surface.index.names if
i not in _consumption_std_before.index.names])
consumption_before = self.add_attribute(consumption_before, 'Income tenant')
heating_intensity = self.to_heating_intensity(consumption_before.index, prices,
consumption=consumption_before,
level_heater='Heating system final',
bill_rebate=bill_rebate)
consumption_before *= heating_intensity
consumption_after = self.add_attribute(_consumption_std_after, [i for i in surface.index.names if
i not in _consumption_std_after.index.names])
consumption_after = self.add_attribute(consumption_after, 'Income tenant')
heating_intensity_after = self.to_heating_intensity(consumption_after.index, prices_before,
consumption=consumption_after,
level_heater='Heating system final',
bill_rebate=bill_rebate)
consumption_saved_actual = (consumption_before - (consumption_after * heating_intensity_after).T).T
consumption_saved_no_rebound = (consumption_before - consumption_after.T * heating_intensity).T
self.store_information_insulation(condition, cost_insulation_raw, tax, cost_insulation, cost_financing,
vat_insulation, subsidies_details, subsidies_total, _consumption_std_saved,
consumption_saved_actual, consumption_saved_no_rebound,
amount_debt, amount_saving, discount, subsidies_loan, eligible)
return renovation_rate, market_share
else:
renovation_rate = Series(0, index=stock_ini.index)
market_share = DataFrame(0, index=stock_ini.index, columns=self._choice_insulation)
market_share.iloc[:, -1] = 1
return renovation_rate, market_share
[docs] def flow_retrofit(self, prices, cost_heater, cost_insulation, frequency_insulation,
policies_heater=None, policies_insulation=None, calib_heater=None, district_heating=None,
financing_cost=None, calib_renovation=None,
step=1, exogenous_social=None, premature_replacement=None, prices_before=None, supply=None,
carbon_value_kwh=None, carbon_value=None, bill_rebate=0, carbon_content=None,
health_cost=None, default_quality=None, credit_constraint=True):
"""Compute heater replacement and insulation retrofit.
1. Heater replacement based on current stock segment.
2. Knowing heater replacement (and new heating system) calculating retrofit rate by segment and market
share by segment.
3. Then, managing inflow and outflow.
Parameters
----------
prices: Series
cost_heater: Series
calib_heater: DataFrame
cost_insulation
policies_heater: list
Policies for heating system.
policies_insulation: list
Policies for insulation.
financing_cost: optional, dict
calib_renovation: dict, optional
Returns
-------
Series
"""
self.logger.info('Number of agents: {:,.0f}'.format(self.stock.shape[0]))
stock_mobile = self.stock_mobile.groupby(
[i for i in self.stock_mobile.index.names if i != 'Income tenant']).sum()
stock_mobile = stock_mobile.xs(True, level='Existing', drop_level=False)
# calculate average heating intensity
temp = concat((self.stock_mobile,
self.to_heating_intensity(self.stock_mobile.index, prices, level_heater='Heating system')),
axis=1, keys=['Stock', 'Heating intensity'])
self._heating_intensity_avg = temp['Stock'].mul(temp['Heating intensity']).sum() / temp['Stock'].sum()
# accounts for heater replacement - depends on energy prices, cost and policies heater
self.logger.info('Calculation heater replacement')
stock = self.heater_replacement(stock_mobile, prices, cost_heater, policies_heater,
calib_heater=calib_heater, step=1, financing_cost=financing_cost,
district_heating=district_heating, premature_replacement=premature_replacement,
prices_before=prices_before, bill_rebate=bill_rebate,
carbon_content=carbon_content, carbon_value=carbon_value,
credit_constraint=credit_constraint)
if supply is not None:
if supply['heater']:
self.logger.info('Solving supply demand equilibrium on heater market')
def marginal_cost_heater(cost_heater, demand, alpha, demand_ini):
return (cost_heater + alpha * (demand - demand_ini)).where(demand > demand_ini, cost_heater)
if self.cost_curve_heater is None:
demand = stock.xs(True, level='Heater replacement').groupby('Heating system final').sum()
def func(x, index):
alpha = Series(x[:index.shape[0]], index=index)
demand_ini = Series(x[index.shape[0]:], index=index)
y0 = (marginal_cost_heater(cost_heater, demand * 2, alpha, demand_ini) - cost_heater * 2).to_numpy()
y1 = (marginal_cost_heater(cost_heater, demand, alpha, demand_ini) - cost_heater).to_numpy()
return append(y0, y1)
index = demand.index
x0 = append(array([10] * index.shape[0]), demand.to_numpy())
root = fsolve(lambda x: func(x, index), x0)
alpha = Series(root[:index.shape[0]], index=index)
demand_ini = Series(root[index.shape[0]:], index=index)
self.cost_curve_heater = (alpha, demand_ini)
def supply_demand_heater_equilibrium(_prices_heater, _prices_index):
_prices_heater = pd.Series(_prices_heater, index=_prices_index)
_stock = self.heater_replacement(stock_mobile, prices, _prices_heater, policies_heater,
financing_cost=financing_cost,
district_heating=district_heating,
premature_replacement=premature_replacement,
)
demand = stock.xs(True, level='Heater replacement').groupby('Heating system final').sum()
price_supply = marginal_cost_heater(cost_heater, demand, self.cost_curve_heater[0], self.cost_curve_heater[1])
return price_supply - _prices_heater
prices_heater = fsolve(supply_demand_heater_equilibrium, cost_heater.to_numpy(),
args=(cost_heater.index), xtol=10**-1)
prices_heater = Series(prices_heater, index=cost_heater.index)
stock = self.heater_replacement(stock_mobile, prices, prices_heater, policies_heater,
calib_heater=calib_heater, step=1, financing_cost=financing_cost,
district_heating=district_heating, premature_replacement=premature_replacement,
prices_before=prices_before)
assert ~stock.index.duplicated().any(), 'Duplicated index after heater replacement'
self.logger.info('Number of agents that can insulate: {:,.0f}'.format(stock.shape[0]))
self.logger.info('Calculation insulation replacement')
# initialize calculate_condition
supply_insulation = None
if supply is not None:
supply_insulation = supply['insulation']
renovation_rate, market_share = self.insulation_replacement(stock, prices, cost_insulation,
frequency_insulation,
calib_renovation=calib_renovation,
policies_insulation=policies_insulation,
financing_cost=financing_cost,
exogenous_social=exogenous_social,
prices_before=prices_before,
supply=supply_insulation,
carbon_value=carbon_value_kwh,
carbon_content=carbon_content,
bill_rebate=bill_rebate,
health_cost=health_cost,
default_quality=default_quality,
credit_constraint=credit_constraint)
cost_curve_insulation = False
if cost_curve_insulation:
self.logger.info('Solving supply demand equilibrium on insulation market')
def marginal_cost_insulation(cost_insulation, demand, alpha):
return cost_insulation + alpha * demand
self.cost_curve_insulation = None
if self.cost_curve_insulation is None:
demand = (stock * renovation_rate * market_share.T).T.sum()
demand = pd.Series({i: demand.xs(True, level=i).sum() / 10 ** 3 for i in demand.index.names})
x0 = pd.Series([10, 10, 10, 10], index=['Wall', 'Floor', 'Roof', 'Windows'])
self.cost_curve_insulation = fsolve(lambda x: marginal_cost_insulation(cost_insulation, demand * 2, x) - cost_insulation * 2, x0)
def supply_demand_insulation_equilibrium(_prices_insulation):
_renovation_rate, _market_share = self.insulation_replacement(stock, prices, _prices_insulation,
policies_insulation=policies_insulation,
financing_cost=financing_cost,
exogenous_social=exogenous_social,
calculate_condition=True)
_demand = (stock * _renovation_rate * _market_share.T).T.sum()
_demand = pd.Series({i: _demand.xs(True, level=i).sum() / 10 ** 3 for i in _demand.index.names})
price_supply = marginal_cost_insulation(cost_insulation, _demand, self.cost_curve_insulation)
return price_supply - _prices_insulation
prices_insulation = fsolve(supply_demand_insulation_equilibrium, cost_insulation, xtol=10**-1)
prices_insulation = pd.Series(prices_insulation, index=cost_insulation.index)
renovation_rate, market_share = self.insulation_replacement(stock, prices, prices_insulation,
calib_renovation=calib_renovation,
policies_insulation=policies_insulation,
financing_cost=financing_cost,
exogenous_social=exogenous_social,
prices_before=prices_before,
supply=supply['insulation'],
carbon_value=carbon_value)
self.logger.info('Formatting and storing replacement')
renovation_rate = renovation_rate.reindex(stock.index).fillna(0)
factor = sum([(1 - renovation_rate) ** i for i in range(step)])
flow_insulation = renovation_rate * stock * factor
if step > 1:
# approximation
stock = self.heater_replacement(stock_mobile, prices, cost_heater, policies_heater,
calib_heater=calib_heater, step=step, financing_cost=financing_cost,
district_heating=district_heating, supply=supply)
flow_insulation = flow_insulation.where(flow_insulation < stock, stock)
flow_only_heater = stock - flow_insulation
assert (flow_only_heater >= 0).all().all(), 'Remaining stock is not positive'
flow_only_heater = flow_only_heater.xs(True, level='Heater replacement', drop_level=False).unstack(
'Heating system final')
flow_only_heater_sum = flow_only_heater.sum().sum()
# insulation upgrade
flow_insulation = flow_insulation[flow_insulation > 0]
replacement_sum = flow_insulation.sum().sum()
replaced_by = (flow_insulation * market_share.T).T
# hidden cost must be calculated now to separate the mandatory renovation flows.
if 'hidden_cost' in self._renovation_store.keys():
temp = add_no_renovation(replaced_by)
no_renovation = (stock * frequency_insulation - flow_insulation)[temp.index]
temp.iloc[:, 0] = no_renovation.copy()
self._renovation_store['hidden_cost_agg'] = self._renovation_store['hidden_cost'] * temp
self._renovation_store['no_renovation'] = no_renovation.sum()
assert round(replaced_by.sum().sum(), 0) == round(replacement_sum, 0), 'Sum problem'
# energy performance certificate jump due to heater replacement without insulation upgrade
only_heater = (stock - flow_insulation.reindex(stock.index, fill_value=0)).xs(True, level='Heater replacement')
# storing information (flow, investment, subsidies)
self.logger.debug('Store information retrofit')
self._replaced_by = replaced_by.copy()
self._only_heater = only_heater.copy()
# removing heater replacement level
replaced_by = replaced_by.groupby(
[c for c in replaced_by.index.names if c != 'Heater replacement']).sum()
flow_only_heater = flow_only_heater.groupby(
[c for c in flow_only_heater.index.names if c != 'Heater replacement']).sum()
# adding income tenant information
self.logger.debug('Adding income tenant')
replaced_by = self.add_level(replaced_by, self.stock_mobile, 'Income tenant')
assert round(replaced_by.sum().sum(), 0) == round(replacement_sum, 0), 'Sum problem'
flow_only_heater = self.add_level(flow_only_heater, self.stock_mobile, 'Income tenant')
assert round(flow_only_heater.sum().sum(), 0) == round(flow_only_heater_sum, 0), 'Sum problem'
self.logger.debug('Formatting switch heater flow')
flow_only_heater = flow_only_heater.stack('Heating system final')
to_replace_heater = - flow_only_heater.droplevel('Heating system final')
replaced_by_heater = flow_only_heater.droplevel('Heating system')
replaced_by_heater.index = replaced_by_heater.index.rename('Heating system', level='Heating system final')
replaced_by_heater = replaced_by_heater.reorder_levels(to_replace_heater.index.names)
flow_only_heater = concat((to_replace_heater, replaced_by_heater), axis=0)
flow_only_heater = flow_only_heater.groupby(flow_only_heater.index.names).sum()
assert round(flow_only_heater.sum(), 0) == 0, 'Sum problem'
self.logger.debug('Formatting renovation flow')
to_replace = replaced_by.droplevel('Heating system final').sum(axis=1).copy()
to_replace = to_replace.groupby(to_replace.index.names).sum()
assert round(to_replace.sum(), 0) == round(replacement_sum, 0), 'Sum problem'
self.logger.debug('Start frame to flow')
replaced_by = self.frame_to_flow(replaced_by, default_quality=default_quality)
self.logger.debug('End frame to flow')
self.logger.debug('Concatenate all flows')
to_replace = to_replace.reorder_levels(replaced_by.index.names)
flow_only_heater = flow_only_heater.reorder_levels(replaced_by.index.names)
flow_retrofit = concat((-to_replace, replaced_by, flow_only_heater), axis=0)
flow_retrofit = flow_retrofit.groupby(flow_retrofit.index.names).sum()
assert round(flow_retrofit.sum(), 0) == 0, 'Sum problem'
return flow_retrofit
[docs] def flow_obligation(self, policies_insulation, prices, cost_insulation, financing_cost=None,
frequency_insulation=1, health_cost=None, default_quality=None):
"""Account for flow obligation if defined in policies_insulation.
Parameters
----------
policies_insulation: list
Check if obligation.
prices: Series
cost_insulation: Series
financing_cost: dict, optional
frequency_insulation: float or int, default 1
health_cost: Series, optional
Returns
-------
flow_obligation: Series
"""
list_obligation = [p for p in policies_insulation if p.policy == 'obligation']
if list_obligation == []:
return None
self.logger.info('Calculation flow obligation')
stock = self.stock_mobile.copy()
# only work if there is one obligation
flows_obligation = []
for obligation in list_obligation:
self.logger.info('Flow obligation: {}'.format(obligation.name))
banned_performance = obligation.value.loc[self.year]
if not isinstance(banned_performance, str):
return None
performance_target = [i for i in self._resources_data['index']['Performance'] if i >= banned_performance]
stock_certificate = self.add_certificate(stock)
idx = stock.index[stock_certificate.index.get_level_values('Performance').isin(performance_target)]
if idx.empty:
return None
proba = 1
if obligation.frequency is not None:
proba = obligation.frequency
proba = reindex_mi(proba, idx)
to_replace = stock.loc[idx] * proba
to_replace = to_replace[to_replace > 0]
if obligation.target is not None:
target = reindex_mi(obligation.target, to_replace.index).fillna(False)
to_replace = to_replace.loc[target]
# formatting replace_by
replaced_by = to_replace.copy()
replaced_by = replaced_by.groupby([i for i in replaced_by.index.names if i != 'Income tenant']).sum()
if 'Heater replacement' not in replaced_by:
replaced_by = concat([replaced_by], keys=[False], names=['Heater replacement'])
if 'Heating system final' not in replaced_by.index.names:
temp = replaced_by.reset_index('Heating system')
temp['Heating system final'] = temp['Heating system']
replaced_by = temp.set_index(['Heating system', 'Heating system final'], append=True).squeeze()
replaced_by.index = replaced_by.index.reorder_levels(
['Heater replacement', 'Existing', 'Occupancy status', 'Income owner', 'Housing type', 'Wall', 'Floor',
'Roof', 'Windows', 'Heating system', 'Heating system final'])
# economic of switch to heat-pumps
_, market_share = self.insulation_replacement(replaced_by, prices, cost_insulation,
frequency_insulation,
policies_insulation=policies_insulation,
financing_cost=financing_cost,
min_performance=obligation.min_performance,
credit_constraint=False,
health_cost=health_cost)
if obligation.intensive == 'market_share':
# market_share endogenously calculated by insulation_replacement
pass
elif obligation.intensive == 'global':
market_share = DataFrame(0, index=replaced_by.index, columns=self._choice_insulation)
market_share.loc[:, (True, True, True, True)] = 1
elif obligation.intensive == 'cost':
print('ok')
else:
raise NotImplemented
assert ~market_share.isna().all(axis=1).any(), "Market-share issue"
assert (market_share.sum(axis=1).round(5) == 1.0).all(), "Market-share sum issue"
replaced_by = (replaced_by.rename(None) * market_share.T).T
replaced_by = replaced_by.fillna(0)
if 'hidden_cost' in self._renovation_store.keys():
temp = (self._renovation_store['hidden_cost'] * replaced_by).loc[replaced_by.index, :]
temp = temp.reindex(self._renovation_store['hidden_cost_agg'].index).fillna(0)
temp = temp.reindex(self._renovation_store['hidden_cost_agg'].columns, axis=1).fillna(0)
self._renovation_store['hidden_cost_agg'] += temp
self._replaced_by = self._replaced_by.add(replaced_by.copy(), fill_value=0)
replaced_by = self.frame_to_flow(replaced_by, default_quality=default_quality)
assert to_replace.sum().round(0) == replaced_by.sum().round(0), 'Sum problem'
self._flow_obligation.update({obligation.name.replace('_', ' ').capitalize(): to_replace.sum()})
flow_obligation = concat((- to_replace, replaced_by), axis=0)
flow_obligation = flow_obligation.groupby(flow_obligation.index.names).sum()
flows_obligation.append(flow_obligation)
return flows_obligation
[docs] def parse_output_run(self, prices, inputs, climate=None, step=1, taxes=None,
lifetime_insulation=30, social_discount_rate=0.032, bill_rebate=0,
coefficient_public_funding=0.2, duration_financing=10, discount_financing=0.05):
"""Parse output.
Renovation : envelope
Retrofit : envelope and/or heating system
Parameters
----------
prices: Series
Energy prices
inputs: dict
Exogenous data for post-treatment.
climate: int, optional
step: int, optional
taxes: list, optional
lifetime_insulation: int, optional
social_discount_rate: float, default 0.032
bill_rebate: float, optional
coefficient_public_funding: float, default 0.2
duration_financing: int, default 10
discount_financing: float, default 0.05
Returns
-------
dict
"""
import gc
carbon_value = inputs['carbon_value'].loc[self.year]
prices_wt = prices.copy()
if taxes is not None:
tax = sum([tax.value.loc[self.year, :] for tax in taxes if self.year in tax.value.index]).fillna(0)
prices_wt -= tax
stock = self.simplified_stock()
output = dict()
output['Stock (Million)'] = stock.sum() / 10 ** 6
output['Stock existing (Million)'] = self.stock.xs(True, level='Existing').sum() / 10 ** 6
stock_new = 0
if False in self.stock.index.get_level_values('Existing'):
stock_new = self.stock.xs(False, level='Existing').sum() / 10 ** 6
output['Stock new (Million)'] = stock_new
surface = self.stock * self.surface
output['Surface (Million m2)'] = surface.sum() / 10 ** 6
output['Surface existing (Million m2)'] = surface.xs(True, level='Existing').sum() / 10 ** 6
surface_new = 0
if False in self.stock.index.get_level_values('Existing'):
surface_new = surface.xs(False, level='Existing').sum() / 10 ** 6
output['Surface new (Million m2)'] = surface_new
surface = surface.groupby('Heating system').sum()
output.update({'Surface {} (Million m2)'.format(i): surface.loc[i] / 10 ** 6 for i in surface.index})
if 'population' in inputs.keys():
output['Surface (m2/person)'] = (
output['Surface (Million m2)'] / (inputs['population'].loc[self.year] / 10 ** 6))
temp = prices.copy().T
temp.index = temp.index.map(lambda x: 'Prices {} (euro/kWh)'.format(x))
output.update(temp.T)
output['Consumption standard (TWh)'] = self.consumption_agg(prices=prices, freq='year', climate=climate,
standard=True, agg='all')
output['Consumption standard (kWh/m2)'] = (output['Consumption standard (TWh)'] * 10 ** 9) / (
output['Surface (Million m2)'] * 10 ** 6)
consumption_energy = self.consumption_agg(prices=prices, freq='year', climate=None, standard=False,
agg='energy', bill_rebate=bill_rebate)
output['Consumption (TWh)'] = consumption_energy.sum()
self.store_over_years[self.year].update({'Consumption (TWh)': output['Consumption (TWh)']})
output['Consumption (kWh/m2)'] = (output['Consumption (TWh)'] * 10 ** 9) / (
output['Surface (Million m2)'] * 10 ** 6)
output['Consumption existing (TWh)'] = self.consumption_agg(prices=prices, freq='year', existing=True,
agg='all', bill_rebate=bill_rebate)
output['Consumption new (TWh)'] = output['Consumption (TWh)'] - output['Consumption existing (TWh)']
output['Consumption existing (kWh/m2)'] = (output['Consumption existing (TWh)'] * 10 ** 9) / (
output['Surface existing (Million m2)'] * 10 ** 6)
output['Consumption PE (TWh)'] = thermal.final2primary(consumption_energy, Series(consumption_energy.index,
index=consumption_energy.index)).sum()
if surface_new > 0:
output['Consumption new (kWh/m2)'] = (output['Consumption new (TWh)'] * 10 ** 9) / (
output['Surface new (Million m2)'] * 10 ** 6)
heating_intensity, budget_share = self.to_heating_intensity(self.stock.index, prices, bill_rebate=bill_rebate,
full_output=True)
condition_poverty = self.stock.index.get_level_values('Income tenant').isin(['D1', 'D2', 'D3', 'C1', 'C2']) & (
budget_share >= 0.08)
energy_poverty = self.stock[condition_poverty].sum()
output['Heating intensity (%)'] = (self.stock * heating_intensity).sum() / self.stock.sum()
output['Energy poverty (Million)'] = energy_poverty / 10 ** 6
if self.year == self.first_year:
# marginal utility of money is not considered
output['Space heating utility (Billion euro)'] = 0
else:
value = self.to_thermal_comfort(heating_intensity) / abs(self.preferences_insulation['cost']) * 1000
output['Space heating utility (Billion euro)'] = (reindex_mi(value, self.stock.index) * self.stock).sum() / 1e9
"""
x = self.to_thermal_comfort(heating_intensity) / abs(self.preferences_insulation['cost']) * 1000
y = concat((heating_intensity, x), axis=1, keys=['Heating intensity', 'Thermal comfort'])
# scatter plot heating intensity
make_scatter_plot(y, 'Heating intensity', 'Thermal comfort', 'Heating intensity (%)',
'Thermal comfort (euro)',
annotate=False,
save=os.path.join(self.path_calibration, 'thermal_comfort.png'),
format_y=lambda y, _: '{:.0f}'.format(y),
format_x=lambda x, _: '{:.0%}'.format(x),
s=10)"""
temp = consumption_energy.copy()
temp.index = temp.index.map(lambda x: 'Consumption {} (TWh)'.format(x))
output.update(temp.T)
temp = self.consumption_agg(prices=prices, freq='year', climate=None, standard=False,
agg='heater', bill_rebate=bill_rebate).dropna()
consumption_hp = sum([temp.loc[i] for i in self._resources_data['index']['Heat pumps'] if i in temp.index])
temp.index = temp.index.map(lambda x: 'Consumption {} (TWh)'.format(x))
output.update(temp.T)
output.update({'Consumption Heat pump (TWh)': consumption_hp})
output.update({'Consumption Direct electric (TWh)': output['Consumption Electricity-Direct electric (TWh)']})
output.update({'Consumption District heating (TWh)': output['Consumption Heating (TWh)']})
consumption_energy_climate = None
if climate is not None:
consumption_energy_climate = self.consumption_agg(prices=prices, freq='year', climate=climate,
standard=False, agg='energy', bill_rebate=bill_rebate)
output['Consumption climate (TWh)'] = consumption_energy_climate.sum()
temp = consumption_energy_climate.copy()
temp.index = temp.index.map(lambda x: 'Consumption {} climate (TWh)'.format(x))
output.update(temp.T)
output['Factor climate (%)'] = output['Consumption climate (TWh)'] / output['Consumption (TWh)']
# hourly resolution energy consumption
if False:
consumption_hourly = self.consumption_agg(prices=prices, freq='hour', standard=False, climate=2006,
efficiency_hour=True, hourly_profile='power')
# format_x datetime hourly
temp = consumption_hourly.loc['Electricity']
make_plot(temp, 'Consumption electricity (GWh/h)',
save=os.path.join(self.path, 'consumption_hour.png'),
format_y=lambda y, _: '{:.0f}'.format(y / 1e6), integer=False, legend=False)
# only select a day in february
temp = consumption_hourly.loc['Electricity'].loc['2006-02-01']
make_plot(temp, 'Consumption electricity (GWh/h)',
save=os.path.join(self.path, 'consumption_day.png'),
format_y=lambda y, _: '{:.0f}'.format(y / 1e6), integer=False, legend=False)
consumption = self.consumption_actual(prices, bill_rebate=bill_rebate) * self.stock
coefficient_backup = reindex_mi(self.coefficient_backup, consumption.index)
# correct that do consider secondary heating system
temp = consumption.groupby('Existing').sum()
temp.rename(index={True: 'Existing', False: 'New'}, inplace=True)
temp.index = temp.index.map(lambda x: 'Consumption {} (TWh)'.format(x))
output.update(temp.T / 10 ** 9)
temp = consumption.groupby(self.certificate).sum()
temp.index = temp.index.map(lambda x: 'Consumption {} (TWh)'.format(x))
output.update(temp.T / 10 ** 9)
temp = self.consumption_agg(agg='heater', standard=True, freq='year')
temp.index = temp.index.map(lambda x: 'Consumption standard {} (TWh)'.format(x))
output.update(temp.T)
emission = inputs['carbon_emission'].loc[self.year:, :].copy()
if inputs.get('renewable_gas') is not None:
renewable_gas = inputs['renewable_gas'].loc[self.year]
if renewable_gas < consumption_energy.loc['Natural gas']:
temp = (consumption_energy.loc['Natural gas'] - renewable_gas) / consumption_energy.loc['Natural gas'] * emission.loc[self.year, 'Natural gas']
else:
temp = 0
emission.loc[self.year, 'Natural gas'] = temp
self.store_over_years[self.year].update({'Emission content (gCO2/kWh)': emission.loc[self.year, :]})
temp = emission.loc[self.year, :]
temp.index = temp.index.map(lambda x: 'Emission content {} (gCO2/kWh)'.format(x))
output.update(temp.T)
emission = emission.loc[self.year, :]
carbon_value_kwh = carbon_value * emission / 10**6
output['Emission mean (gCO2/kWh)'] = (consumption_energy * emission).sum() / consumption_energy.sum()
temp = consumption_energy * emission
output['Emission (MtCO2)'] = temp.sum() / 10 ** 3
self.store_over_years[self.year].update({'Emission (MtCO2)': output['Emission (MtCO2)']})
temp.index = temp.index.map(lambda x: 'Emission {} (MtCO2)'.format(x))
output.update(temp.T / 10 ** 3)
emission_start = inputs['carbon_emission'].loc[self.first_year, :]
output['Emission mean first year (gCO2/kWh)'] = (consumption_energy * emission_start).sum() / consumption_energy.sum()
output['Emission first year (MtCO2)'] = (consumption_energy * emission_start).sum() / 10 ** 3
if consumption_energy_climate is not None:
temp = consumption_energy_climate * emission
output['Emission climate (MtCO2)'] = temp.sum() / 10 ** 3
temp.index = temp.index.map(lambda x: 'Emission climate {} (MtCO2)'.format(x))
output.update(temp.T / 10 ** 3)
emission_reindex = emission.reindex(self.energy).set_axis(self.stock.index, axis=0)
emission_stock = consumption * coefficient_backup * emission_reindex
emission_stock += consumption * (1 - coefficient_backup) * emission.loc['Wood fuel']
temp = emission_stock.groupby('Existing').sum()
temp.rename(index={True: 'Existing', False: 'New'}, inplace=True)
temp.index = temp.index.map(lambda x: 'Emission {} (MtCO2)'.format(x))
output.update(temp.T / 10 ** 12)
temp = self.stock.groupby(self.certificate).sum()
temp.index = temp.index.map(lambda x: 'Stock {} (Million)'.format(x))
output.update(temp.T / 10 ** 6)
output['Stock efficient (Million)'] = output.get('Stock A (Million)', 0) + output.get('Stock B (Million)', 0)
output['Stock low-efficient (Million)'] = output.get('Stock G (Million)', 0) + output.get('Stock F (Million)', 0)
output['Stock to renovate (Million)'] = output['Stock low-efficient (Million)'] + output.get('Stock E (Million)', 0) + \
output.get('Stock D (Million)', 0)
temp = self.stock.groupby('Heating system').sum()
output['Stock Direct electric (Million)'] = temp['Electricity-Direct electric'] / 10 ** 6
output['Stock Heat pump (Million)'] = temp['Electricity-Heat pump water'] / 10 ** 6
if 'Electricity-Heat pump air' in temp.keys():
output['Stock Heat pump (Million)'] += temp['Electricity-Heat pump air'] / 10 ** 6
heater = {
'Oil fuel': ['Oil fuel-Performance boiler', 'Oil fuel-Standard boiler', 'Oil fuel-Collective boiler'],
'Wood fuel': ['Wood fuel-Performance boiler', 'Wood fuel-Standard boiler'],
'Natural gas': ['Natural gas-Performance boiler', 'Natural gas-Standard boiler',
'Natural gas-Collective boiler'],
'District heating': ['Heating-District heating']
}
for key, item in heater.items():
output['Stock {} (Million)'.format(key)] = temp[[i for i in item if i in temp.index]].sum() / 10 ** 6
temp.index = temp.index.map(lambda x: 'Stock {} (Million)'.format(x))
output.update(temp.T / 10 ** 6)
# energy expenditures considering back-up cost
prices_reindex = prices.reindex(self.energy).set_axis(self.stock.index, axis=0)
energy_expenditure = consumption * coefficient_backup * prices_reindex
energy_expenditure += consumption * (1 - coefficient_backup) * prices.loc['Wood fuel']
temp = energy_expenditure.groupby('Income tenant').sum()
temp.index = temp.index.map(lambda x: 'Energy expenditures {} (Billion euro)'.format(x))
output.update(temp.T / 10 ** 9)
output['Energy expenditures (Billion euro)'] = energy_expenditure.sum() / 10 ** 9
prices_wt_reindex = prices_wt.reindex(self.energy).set_axis(self.stock.index, axis=0)
energy_expenditure = consumption * coefficient_backup * prices_wt_reindex
energy_expenditure += consumption * (1 - coefficient_backup) * prices.loc['Wood fuel']
output['Energy expenditures wt (Billion euro)'] = energy_expenditure.sum() / 10 ** 9
# taxes expenditures
if taxes is not None:
taxes_expenditures = dict()
total_taxes = Series(0, index=prices.index)
for tax in taxes:
if self.year in tax.value.index:
if tax.name not in self.taxes_list:
self.taxes_list += [tax.name]
amount = tax.value.loc[self.year, :] * consumption_energy
taxes_expenditures[tax.name] = amount
total_taxes += amount
taxes_expenditures = DataFrame(taxes_expenditures).sum()
self.taxes_revenues.update({self.year: taxes_expenditures.copy()})
taxes_expenditures.index = taxes_expenditures.index.map(
lambda x: '{} (Billion euro)'.format(x.capitalize().replace('_', ' ').replace('Cee', 'Cee tax')))
output.update((taxes_expenditures / step).T)
output['Taxes expenditure (Billion euro)'] = taxes_expenditures.sum() / step
output['Carbon value (Billion euro)'] = (consumption_energy * carbon_value_kwh).sum()
output['Health cost (Billion euro)'] = self.health_cost(inputs['health_cost_dpe'], inputs['health_cost_income'], prices)
output['Health expenditure (Billion euro)'] = 0 # temp['Health expenditure (Billion euro)']
self.store_over_years[self.year].update({'Health cost (Billion euro)': output['Health cost (Billion euro)']})
# output.update(o)
if self.year > self.first_year:
levels = [i for i in self._replaced_by.index.names if
i not in ['Heater replacement', 'Heating system final']]
names = ['Heater replacement', 'Existing', 'Occupancy status', 'Income owner', 'Housing type', 'Wall',
'Floor', 'Roof', 'Windows', 'Heating system', 'Heating system final']
self._replaced_by.index = self._replaced_by.index.reorder_levels(names)
self._renovation_store['discount'] = self._renovation_store['discount'].groupby(levels).mean()
levels_owner = ['Occupancy status', 'Income owner']
replaced_by_grouped = self._replaced_by.groupby(levels).sum()
# cost of epc transition
if False:
replacement = self._replaced_by.copy().groupby(['Housing type', 'Wall', 'Floor', 'Roof', 'Windows', 'Heating system']).sum()
cost = (self._renovation_store['cost_households'].T / self._surface).T
cost = cost.xs(False, level='Existing').xs('Owner-occupied', level='Occupancy status')
cost = reindex_mi(cost, replacement.index)
consumption_before, certificate_before, _ = self.consumption_heating(index=replacement.index, method='3uses',
full_output=True)
s = concat([Series(index=replacement.index, dtype=float)] * len(replacement.columns), axis=1).set_axis(replacement.columns, axis=1)
# choice_insulation = choice_insulation.drop(no_insulation) # only for
s.index.rename(
{'Wall': 'Wall before', 'Floor': 'Floor before', 'Roof': 'Roof before', 'Windows': 'Windows before'},
inplace=True)
temp = s.fillna(0).stack(s.columns.names)
temp = temp.reset_index().drop(0, axis=1)
for i in ['Wall', 'Floor', 'Roof', 'Windows']:
# keep the info to unstack later
temp.loc[:, '{} bool'.format(i)] = temp.loc[:, i]
temp.loc[temp[i], i] = self._performance_insulation_renovation[i]
temp.loc[temp[i] == False, i] = temp.loc[temp[i] == False, '{} before'.format(i)]
temp = temp.astype(
{'Housing type': 'string', 'Wall': 'float', 'Floor': 'float', 'Roof': 'float', 'Windows': 'float',
'Heating system': 'string'})
index = MultiIndex.from_frame(temp)
consumption_after, certificate_after, _ = self.consumption_heating(index=index, method='3uses', full_output=True)
certificate_after = reindex_mi(certificate_after, index).droplevel(['Wall', 'Floor', 'Roof', 'Windows']).unstack(
['{} bool'.format(i) for i in ['Wall', 'Floor', 'Roof', 'Windows']])
certificate_after.index.rename({'Wall before': 'Wall', 'Floor before': 'Floor', 'Roof before': 'Roof',
'Windows before': 'Windows'}, inplace=True)
certificate_after.columns.rename(
{'Wall bool': 'Wall', 'Floor bool': 'Floor', 'Roof bool': 'Roof', 'Windows bool': 'Windows'},
inplace=True)
consumption_after = reindex_mi(consumption_after, index).droplevel(['Wall', 'Floor', 'Roof', 'Windows']).unstack(
['{} bool'.format(i) for i in ['Wall', 'Floor', 'Roof', 'Windows']])
consumption_after.index.rename({'Wall before': 'Wall', 'Floor before': 'Floor', 'Roof before': 'Roof',
'Windows before': 'Windows'}, inplace=True)
consumption_after.columns.rename(
{'Wall bool': 'Wall', 'Floor bool': 'Floor', 'Roof bool': 'Roof', 'Windows bool': 'Windows'},
inplace=True)
# calculate the cost of the transition
rename_insulation = {'Wall': 'Wall insulation', 'Floor': 'Floor insulation', 'Roof': 'Roof insulation',
'Windows': 'Windows insulation'}
replacement.columns.rename(rename_insulation, inplace=True)
replacement = replacement.stack(replacement.columns.names)
replacement = concat((replacement, reindex_mi(certificate_before, replacement.index)), axis=1,
keys=['replacement', 'certificate_before'])
replacement = concat(
(replacement, reindex_mi(consumption_before, replacement.index).rename('consumption_before')), axis=1)
cost.columns.rename(rename_insulation, inplace=True)
cost = cost.stack(cost.columns.names)
replacement = concat((replacement, cost.rename('Cost')), axis=1)
consumption_after.columns.rename(rename_insulation, inplace=True)
consumption_after = consumption_after.stack(consumption_after.columns.names)
replacement = concat((replacement, consumption_after.rename('consumption_after')), axis=1)
certificate_after.columns.rename(rename_insulation, inplace=True)
certificate_after = certificate_after.stack(certificate_after.columns.names)
replacement = concat((replacement, certificate_after.rename('certificate_after')), axis=1)
replacement.set_index(['certificate_before', 'certificate_after'], append=True, inplace=True)
replacement['consumption_saving'] = replacement['consumption_before'] - replacement['consumption_after']
# weighted cost of the transition groupby certificate_before and certificate_after
cost_transition = replacement.groupby(['certificate_before', 'certificate_after']).apply(
lambda x: (x['replacement'] * x['Cost']).sum() / x['replacement'].sum())
cost_transition = cost_transition.unstack('certificate_after')
replacement_transition = replacement.groupby(['certificate_before', 'certificate_after'])['replacement'].sum()
replacement_transition = replacement_transition.unstack('certificate_after')
if self.path_ini:
replacement.to_csv(os.path.join(self.path_ini, 'replacement.csv'))
cost_transition.to_csv(os.path.join(self.path_ini, 'cost_transition.csv'))
replacement_transition.to_csv(os.path.join(self.path_ini, 'replacement_transition.csv'))
# consumption saving
if self.consumption_before_retrofit is not None:
consumption_before_retrofit = self.consumption_before_retrofit
consumption_after_retrofit = self.store_consumption(prices, emission, bill_rebate=bill_rebate)
temp = {'{} saving (TWh/year)'.format(k.split(' (TWh)')[0]): consumption_before_retrofit[k] -
consumption_after_retrofit[k]
for k in consumption_before_retrofit.keys() if 'TWh' in k}
output.update(temp)
temp = {'{} saving (MtCO2/year)'.format(k.split(' (MtCO2)')[0]): consumption_before_retrofit[k] -
consumption_after_retrofit[k]
for k in consumption_before_retrofit.keys() if 'MtCO2' in k}
output.update(temp)
consumption_saved_insulation = (
self._replaced_by * self._renovation_store['consumption_saved_households']).groupby(
levels).sum()
temp = self.add_level(self._replaced_by.fillna(0), self._stock_ref, 'Income tenant')
consumption_saved_actual_insulation = (
temp * reindex_mi(self._renovation_store['consumption_saved_actual_households'],
temp.index)).groupby(levels + ['Income tenant']).sum()
consumption_saved_no_rebound_insulation = (
temp * reindex_mi(self._renovation_store['consumption_saved_no_rebound_households'],
temp.index)).groupby(levels + ['Income tenant']).sum()
rebound_insulation = (consumption_saved_no_rebound_insulation - consumption_saved_actual_insulation).sum(
axis=1)
rebound_insulation = rebound_insulation.groupby(self.to_energy(rebound_insulation)).sum()
output.update({'Consumption standard saving insulation (TWh/year)': consumption_saved_insulation.sum().sum() / 10 ** 9})
_consumption_saved_actual_insulation = self.apply_calibration(consumption_saved_actual_insulation.sum(axis=1))
output.update({'Consumption saving insulation (TWh/year)': _consumption_saved_actual_insulation.sum() / 10 ** 9})
_consumption_saved_no_rebound_insulation = self.apply_calibration(consumption_saved_no_rebound_insulation.sum(axis=1))
output.update({'Consumption saving no rebound insulation (TWh/year)': _consumption_saved_no_rebound_insulation.sum() / 10 ** 9})
output.update({'Rebound insulation (TWh/year)': rebound_insulation.sum() / 10 ** 9})
output['Realization rate (% standard)'] = output['Consumption saving insulation (TWh/year)'] / output[
'Consumption standard saving insulation (TWh/year)']
output['Rebound insulation (% performance gap)'] = output['Rebound insulation (TWh/year)'] / (output[
'Consumption standard saving insulation (TWh/year)'] - output['Consumption saving insulation (TWh/year)'])
temp = consumption_saved_insulation.sum(axis=1)
output.update(
{'Emission standard saving insulation (MtCO2/year)': self.to_emission(temp, emission).sum() / 10 ** 12})
temp = _consumption_saved_actual_insulation.copy()
output.update(
{'Emission saving insulation (MtCO2/year)': self.to_emission(temp, emission).sum() / 10 ** 12})
consumption = self.consumption_heating_store(self._renovation_store['consumption_saved_households'].index,
full_output=False, level_heater='Heating system final')
consumption = reindex_mi(consumption, self._renovation_store['consumption_saved_households'].index)
consumption *= reindex_mi(self._surface, consumption.index)
consumption_saved = (self._renovation_store['consumption_saved_households'].T / consumption).T
assert (consumption_saved <= 1).all().all(), 'Percent issue'
consumption_saved_mean = (consumption_saved * self._replaced_by).sum().sum() / self._replaced_by.sum().sum()
output.update({'Consumption standard saving insulation (%)': consumption_saved_mean})
consumption_saved_decision = (consumption_saved * self._replaced_by).sum(
axis=1).groupby(['Housing type', 'Occupancy status']).sum() / self._replaced_by.sum(axis=1).groupby(
['Housing type', 'Occupancy status']).sum()
temp = consumption_saved_decision
temp.index = temp.index.map(lambda x: 'Consumption standard saving {} - {} (%)'.format(x[0], x[1]))
output.update(temp.T)
output.update(
{'Consumption standard saving heater (TWh/year)': self._heater_store['consumption_saved'] / 10 ** 9})
output.update({'Consumption saving heater (TWh/year)': self._heater_store[
'consumption_saved_actual'].sum() / 10 ** 9})
output.update({'Consumption saving no rebound heater (TWh/year)': self._heater_store[
'consumption_saved_no_rebound'].sum() / 10 ** 9})
output.update(
{'Emission saving heater (MtCO2/year)': self.to_emission(self._heater_store['consumption_saved_actual'], emission).sum() / 10 ** 12})
rebound_heater = self._heater_store['rebound']
output['Rebound heater (TWh/year)'] = rebound_heater.sum() / 10 ** 9
rebound_ee = rebound_insulation + rebound_heater
output['Rebound EE (TWh/year)'] = rebound_ee.sum() / 10 ** 9
thermal_comfort = rebound_ee * prices
rebound_insulation.index = rebound_insulation.index.map(
lambda x: 'Rebound insulation {} (TWh/year)'.format(x))
output.update(rebound_insulation.T / 10 ** 9)
thermal_comfort.index = thermal_comfort.index.map(
lambda x: 'Thermal comfort {} (Billion euro/year)'.format(x))
output.update({'Thermal comfort EE (Billion euro)': thermal_comfort.sum() / 10 ** 9})
output.update(thermal_comfort.T / 10 ** 9)
rebound_heater.index = rebound_heater.index.map(lambda x: 'Rebound heater {} (TWh/year)'.format(x))
output.update(rebound_heater.T / 10 ** 9)
consumption_saved_price_constant = _consumption_saved_actual_insulation + self._heater_store['consumption_saved_actual']
output['Consumption saving prices constant (TWh/year)'] = consumption_saved_price_constant.sum() / 10 ** 9
output['Consumption saving prices effect (TWh/year)'] = round(
output['Consumption saving (TWh/year)'] - output['Consumption saving prices constant (TWh/year)'], 3)
if self.year - 1 in self.store_over_years.keys():
consumption_saving = self.store_over_years[self.year - 1]['Consumption (TWh)'] - output['Consumption (TWh)']
output['Consumption saving total (TWh/year)'] = consumption_saving
output['Consumption saving total (%/year)'] = consumption_saving / self.store_over_years[self.year - 1]['Consumption (TWh)']
output['Consumption saving natural replacement (TWh/year)'] = consumption_saving - output['Consumption saving (TWh/year)']
emission_saving = self.store_over_years[self.year - 1]['Emission (MtCO2)'] - output['Emission (MtCO2)']
output['Emission saving total (TWh/year)'] = emission_saving
output['Emission saving total (%/year)'] = emission_saving / self.store_over_years[self.year - 1]['Emission (MtCO2)']
output['Emission saving natural replacement (MtCO2/year)'] = emission_saving - output['Emission saving (MtCO2/year)']
consumption_saving_prices = Series({i: (output['Consumption {} saving (TWh/year)'.format(i)] * 10**9 - consumption_saved_price_constant.loc[i]) for i in consumption_saved_price_constant.keys()})
output['Emission saving prices (MtCO2/year)'] = (consumption_saving_prices * emission).sum() / 10**12
output['Emission saving carbon content (MtCO2/year)'] = output['Emission saving (MtCO2/year)'] - output['Emission saving prices (MtCO2/year)'] - output['Emission saving heater (MtCO2/year)'] - output['Emission saving insulation (MtCO2/year)']
prices_effect = pd.Series({i: output['Consumption {} saving (TWh/year)'.format(i)] -
consumption_saved_price_constant.loc[i] / 10 ** 9 for i in
consumption_saved_price_constant.index})
thermal_loss = prices_effect * prices
output.update({'Thermal loss prices (Billion euro)': round(thermal_loss.sum(), 3)})
consumption_saving_prices = prices_effect * prices_wt
output.update({'Consumption saving prices (Billion euro)': round(consumption_saving_prices.sum(), 3)})
# retrofit and renovation
renovation = replaced_by_grouped.sum().sum()
output['Retrofit (Thousand households)'] = (renovation + self._only_heater.sum()) / 10 ** 3 / step
output['Renovation (Thousand households)'] = renovation / 10 ** 3 / step
output['Renovation obligation (Thousand households)'] = 0
if self._flow_obligation:
temp = pd.Series(self._flow_obligation)
temp.index = temp.index.map(lambda x: 'Renovation {} (Thousand households)'.format(x))
output.update(temp.T / 10 ** 3)
output['Renovation obligation (Thousand households)'] = temp.sum() / 10**3
output['Renovation endogenous (Thousand households)'] = output['Renovation (Thousand households)'] - output['Renovation obligation (Thousand households)']
if True in self._replaced_by.index.get_level_values('Heater replacement'):
temp = self._replaced_by.xs(True, level='Heater replacement').sum().sum()
output['Renovation with heater replacement (Thousand households)'] = temp / 10 ** 3 / step
output['Switch heater only (Thousand households)'] = self._only_heater.sum() / 10 ** 3 / step
temp = {i: 0 for i in range(1, 6)}
for n, g in NB_MEASURES.items():
if False in self._replaced_by.index.get_level_values('Heater replacement'):
temp[n] += self._replaced_by.loc[:, g].xs(False, level='Heater replacement').sum().sum()
temp[n + 1] += self._replaced_by.loc[:, g].xs(True, level='Heater replacement').sum().sum()
if self._only_heater is not None:
temp[1] += self._only_heater.sum()
nb_measures = Series(temp)
output['Replacement total (Thousand)'] = (nb_measures * nb_measures.index).sum() / 10 ** 3 / step
output['Replacement total (Thousand renovating)'] = output['Replacement total (Thousand)'] - output[
'Switch heater only (Thousand households)']
nb_measures.index = nb_measures.index.map(lambda x: 'Retrofit measures {} (Thousand households)'.format(x))
output.update(nb_measures.T / 10 ** 3)
heater_no_carbon = ['Electricity-Heat pump water', 'Electricity-Heat pump air',
'Wood fuel-Performance boiler', 'Wood fuel-Standard boiler']
def condition_decarbonizing(x):
return x.index.get_level_values('Heating system final').isin(heater_no_carbon) & ~x.index.get_level_values('Heating system').isin(heater_no_carbon)
output['Switch decarbonize (Thousand households)'] = self._only_heater[condition_decarbonizing(self._only_heater)].sum() / 10**3
temp = self._replaced_by.sum(axis=1)
condition = condition_decarbonizing(temp)
output['Insulation (Thousand households)'] = temp[~condition].sum() / 10**3
output['Insulation and switch decarbonize (Thousand households)'] = temp[condition].sum() / 10**3
output['Decarbonize measures (Thousand households)'] = output['Switch decarbonize (Thousand households)'] + output['Insulation (Thousand households)'] + output['Insulation and switch decarbonize (Thousand households)']
temp = self._replaced_by.groupby(['Housing type', 'Occupancy status']).sum()
temp = (temp.sum(axis=1) / self._stock_ref.groupby(temp.index.names).sum()).dropna()
temp.index = temp.index.map(lambda x: 'Rate {} - {} (%)'.format(x[0], x[1]))
output.update(temp.T)
temp = self._replaced_by.groupby(['Occupancy status', 'Housing type', 'Income owner']).sum()
temp = (temp.sum(axis=1) / self._stock_ref.groupby(temp.index.names).sum()).dropna()
temp = temp.xs('Owner-occupied', level='Occupancy status').xs('Single-family', level='Housing type')
temp.index = temp.index.map(lambda x: 'Rate Single-family - Owner-occupied {} (%)'.format(x))
output.update(temp.T)
_, _, certificate = self.consumption_heating_store(self._stock_ref.index)
temp = concat((self._replaced_by, reindex_mi(certificate.rename('Performance'), self._replaced_by.index)), axis=1)
temp = temp.set_index('Performance', append=True).set_axis(self._replaced_by.columns, axis=1)
s = concat((self._stock_ref, reindex_mi(certificate.rename('Performance'), self._stock_ref.index)), axis=1)
s = s.set_index('Performance', append=True).squeeze()
temp = temp.groupby(['Occupancy status', 'Housing type', 'Performance']).sum()
temp = (temp.sum(axis=1) / s.groupby(temp.index.names).sum()).dropna()
temp = temp.xs('Owner-occupied', level='Occupancy status').xs('Single-family', level='Housing type')
temp.index = temp.index.map(lambda x: 'Rate Single-family - Owner-occupied {} (%)'.format(x))
output.update(temp.T)
epc_upgrade = self._heater_store['epc_upgrade'].stack()
temp = {}
for i in unique(epc_upgrade):
temp.update({i: ((epc_upgrade == i) * self._only_heater).sum()})
self._heater_store['epc_upgrade'] = Series(temp).sort_index()
if self._condition_store is not None:
epc_upgrade_all = self._condition_store['epc_upgrade_all']
temp = {}
for i in unique(epc_upgrade_all.values.ravel('K')):
temp.update({i: ((epc_upgrade_all == i) * self._replaced_by).sum(axis=1)})
epc_upgrade_all = DataFrame(temp).groupby(levels).sum()
temp = epc_upgrade_all.sum().squeeze().sort_index()
temp = temp[temp.index.dropna()]
o = {}
for i in temp.index.union(self._heater_store['epc_upgrade'].index):
if i > 0:
t_renovation = 0
if i in temp.index:
t_renovation = temp.loc[i]
# o['Renovation {} EPC (Thousand households)'.format(i)] = t_renovation / 10 ** 3
t_heater = 0
if i in self._heater_store['epc_upgrade'].index:
t_heater = self._heater_store['epc_upgrade'].loc[i]
o['Retrofit {} EPC (Thousand households)'.format(i)] = (t_renovation + t_heater) / 10 ** 3 / step
o = Series(o).sort_index(ascending=False)
output['Retrofit at least 1 EPC (Thousand households)'] = sum(
[o['Retrofit {} EPC (Thousand households)'.format(i)] for i in temp.index.unique() if i >= 1])
output['Retrofit at least 2 EPC (Thousand households)'] = sum(
[o['Retrofit {} EPC (Thousand households)'.format(i)] for i in temp.index.unique() if i >= 2])
output.update(o.T)
for condition in [c for c in self._condition_store.keys() if c not in self._list_condition_subsidies]:
temp = (self._replaced_by * self._condition_store[condition]).sum().sum()
output[
'{} (Thousand households)'.format(condition.capitalize().replace('_', ' '))] = temp / 10 ** 3 / step
# switch heater
temp = self._heater_store['replacement'].sum()
output['Switch heater (Thousand households)'] = temp.sum() / 10 ** 3 / step
t = self._heater_store['flow_premature_replacement']
if isinstance(t, Series):
t = t.sum()
output['Switch premature heater (Thousand households)'] = t / 10 ** 3 / step
output['Switch Heat pump (Thousand households)'] = temp[[i for i in self._resources_data['index'][
'Heat pumps'] if i in temp.index]].sum() / 10 ** 3 / step
temp.index = temp.index.map(lambda x: 'Switch {} (Thousand households)'.format(x))
output.update((temp / 10 ** 3 / step).T)
# capture heating system transition
temp = self._heater_store['replacement'].stack('Heating system final').squeeze()
temp = temp.reset_index(['Heating system', 'Heating system final'])
temp['Heating system'] = temp['Heating system'].replace(self._resources_data['heating2heater'])
temp['Heating system final'] = temp['Heating system final'].replace(self._resources_data['heating2heater'])
temp = temp.set_index(['Heating system', 'Heating system final'], append=True).squeeze()
temp = temp.groupby(['Heating system', 'Heating system final']).sum()
temp = temp[temp > 0]
temp.index = temp.index.map(lambda x: 'Switch from {} to {} (Thousand households)'.format(x[0], x[1]))
output.update((temp / 10 ** 3 / step).T)
# insulation - who is renovating ?
temp = replaced_by_grouped.sum(axis=1)
t = temp.groupby(['Housing type']).sum()
t.index = t.index.map(lambda x: 'Renovation {} (Thousand households)'.format(x))
output.update((t / 10 ** 3 / step).T)
t = temp.groupby(['Housing type', 'Occupancy status']).sum()
t.index = t.index.map(lambda x: 'Renovation {} - {} (Thousand households)'.format(x[0], x[1]))
output.update((t / 10 ** 3 / step).T)
# insulation - what is renovated ?
o = {}
for i in ['Wall', 'Floor', 'Roof', 'Windows']:
temp = replaced_by_grouped.xs(True, level=i, axis=1).sum(axis=1)
o['Replacement {} (Thousand households)'.format(i)] = temp.sum() / 10 ** 3 / step
t = temp * reindex_mi(self._surface * self._ratio_surface.loc[:, i], temp.index)
o['Replacement {} (Million m2)'.format(i)] = t.sum() / 10 ** 6 / step
cost = self._renovation_store['cost_component'].loc[:, i]
t = reindex_mi(cost, temp.index) * temp
surface = reindex_mi(inputs['surface'].loc[:, self.year], t.index)
o['Investment {} (Billion euro)'.format(i)] = (t * surface).sum() / 10 ** 9 / step
surface = reindex_mi(inputs['surface'].loc[:, self.year], temp.index)
o['Embodied energy {} (TWh PE)'.format(i)] = (temp * surface *
inputs['embodied_energy_renovation'][
i]).sum() / 10 ** 9 / step
o['Carbon footprint {} (MtCO2)'.format(i)] = (temp * surface *
inputs['carbon_footprint_renovation'][
i]).sum() / 10 ** 9 / step
output['Replacement insulation (Thousand)'] = sum(
[o['Replacement {} (Thousand households)'.format(i)] for i in ['Wall', 'Floor', 'Roof', 'Windows']])
output['Replacement insulation average (/household)'] = output['Replacement insulation (Thousand)'] / \
output['Renovation (Thousand households)']
o = Series(o).sort_index(ascending=False)
output.update(o.T)
# economic output global
temp = self._heater_store['cost'].sum()
output['Investment heater (Billion euro)'] = temp.sum() / 10 ** 9 / step
temp.index = temp.index.map(lambda x: 'Investment {} (Billion euro)'.format(x))
output.update(temp.T / 10 ** 9 / step)
investment_heater = self._heater_store['cost'].sum(axis=1)
temp = self._heater_store['hidden_cost_agg'].sum() / self._heater_store['replacement'].sum()
temp.index = temp.index.map(lambda x: 'Hidden cost average {} (euro)'.format(x))
output.update(temp.T)
output['Hidden cost heater (Billion euro)'] = self._heater_store['hidden_cost_agg'].sum().sum() / 10 ** 9 / step
temp = self._heater_store['hidden_cost_agg'].groupby('Housing type').sum() / 10 ** 9 / step
temp = temp.stack().squeeze()
temp.index = temp.index.map(lambda x: 'Hidden cost {} - {} (Billion euro)'.format(x[0], x[1]))
output.update(temp.T)
output['Financing heater (Billion euro)'] = self._heater_store[
'cost_financing'].sum().sum() / 10 ** 9 / step
investment_insulation_full = (self._replaced_by * self._renovation_store['cost_households']).groupby(levels).sum()
investment_insulation = investment_insulation_full.sum(axis=1)
output['Investment insulation (Billion euro)'] = investment_insulation.sum() / 10 ** 9 / step
temp = (self._replaced_by * self._renovation_store['cost_financing_households']).sum().sum()
output['Financing insulation (Billion euro)'] = temp.sum().sum() / 10 ** 9 / step
annuities = calculate_annuities(investment_insulation_full, lifetime=lifetime_insulation, discount_rate=social_discount_rate)
output['Annuities insulation (Billion euro/year)'] = annuities.sum().sum() / 10 ** 9 / step
output['Efficiency insulation (euro/kWh standard)'] = output['Annuities insulation (Billion euro/year)'] / \
output['Consumption standard saving insulation (TWh/year)']
output['Efficiency insulation (euro/kWh)'] = output['Annuities insulation (Billion euro/year)'] / output[
'Consumption saving insulation (TWh/year)']
output['Efficiency insulation (euro/tCO2 standard)'] = output['Annuities insulation (Billion euro/year)'] * 10 ** 3 / \
output['Emission standard saving insulation (MtCO2/year)']
annuities_heater = calculate_annuities(investment_heater, lifetime=20, discount_rate=social_discount_rate)
output['Annuities heater (Billion euro/year)'] = annuities_heater.sum().sum() / 10 ** 9 / step
output['Efficiency heater (euro/kWh standard)'] = output['Annuities heater (Billion euro/year)'] / output[
'Consumption standard saving heater (TWh/year)']
output['Efficiency heater (euro/kWh)'] = output['Annuities heater (Billion euro/year)'] / output[
'Consumption saving heater (TWh/year)']
index = investment_heater.index.union(investment_insulation.index)
investment_total = investment_heater.reindex(index, fill_value=0) + investment_insulation.reindex(index,
fill_value=0)
output['Investment total (Billion euro)'] = investment_total.sum() / 10 ** 9 / step
output['Financing total (Billion euro)'] = output['Financing insulation (Billion euro)'] + output[
'Financing heater (Billion euro)']
# hidden cost
names = self._renovation_store['hidden_cost_agg'].columns.names
temp = self._replaced_by.sum()
temp = temp.reindex(self._renovation_store['hidden_cost_agg'].columns)
temp.iloc[0] = self._renovation_store['no_renovation']
temp = self._renovation_store['hidden_cost_agg'].sum() / temp
temp.index = temp.index.map(lambda row: ", ".join([name for name, value in zip(names, row) if value]))
temp.index = temp.index.map(lambda x: 'Hidden cost average {} (euro)'.format(x))
output.update(temp.T)
"""temp = self._renovation_store['hidden_cost_agg'] / self._replaced_by
temp = temp / reindex_mi(self._renovation_store['cost_households'], self._replaced_by.index)"""
hidden_cost = (self._renovation_store['hidden_cost_agg']).groupby(levels).sum()
output['Hidden cost insulation (Billion euro)'] = hidden_cost.sum().sum() / 10 ** 9 / step
# financing - how households finance renovation - state / debt / saving ?
subsidies_insulation = (self._replaced_by * self._renovation_store['subsidies_households']).groupby(levels).sum()
to_pay = investment_insulation_full - subsidies_insulation
# cost by income levels
temp = self.add_level(to_pay, self._stock_ref, 'Income tenant').sum(axis=1)
temp = temp.groupby('Income tenant').sum()
temp.index = temp.index.map(lambda x: 'To pay insulation {} (Billion euro/year)'.format(x))
output.update(temp.T / 10 ** 9 / step)
annuities_insulation_hh = calculate_annuities(to_pay, lifetime=duration_financing, discount_rate=discount_financing)
output['Annuities insulation households (Billion euro/year)'] = annuities_insulation_hh.sum().sum() / 10 ** 9 / step
annuities_cost_insulation_hh = calculate_annuities(investment_insulation_full, lifetime=duration_financing,
discount_rate=discount_financing)
del investment_insulation_full
annuities_subsidies_insulation_hh = calculate_annuities(subsidies_insulation, lifetime=duration_financing,
discount_rate=discount_financing)
subsidies_insulation = subsidies_insulation.sum(axis=1)
output['Subsidies insulation (Billion euro)'] = subsidies_insulation.sum() / 10 ** 9 / step
if output['Subsidies insulation (Billion euro)'] != 0:
output['Lever insulation (%)'] = output['Investment insulation (Billion euro)'] / output[
'Subsidies insulation (Billion euro)']
annuities_sub = calculate_annuities(output['Subsidies insulation (Billion euro)'], lifetime=lifetime_insulation, discount_rate=social_discount_rate)
output['Efficiency subsidies insulation (euro/kWh standard)'] = annuities_sub / output['Consumption standard saving insulation (TWh/year)']
subsidies_loan_insulation = (self._replaced_by * self._renovation_store['subsidies_loan_households']).groupby(levels).sum()
subsidies_loan_insulation = subsidies_loan_insulation.sum(axis=1)
output['Subsidies loan insulation (Billion euro)'] = subsidies_loan_insulation.sum() / 10 ** 9 / step
debt = (self._replaced_by * self._renovation_store['debt_households']).groupby(levels).sum().sum(
axis=1).groupby(levels_owner).sum()
del self._renovation_store['debt_households']
output['Debt insulation (Billion euro)'] = debt.sum() / 10 ** 9 / step
saving = (self._replaced_by * self._renovation_store['saving_households']).groupby(
levels).sum().sum(axis=1).groupby(levels_owner).sum()
del self._renovation_store['saving_households']
output['Saving insulation (Billion euro)'] = saving.sum() / 10 ** 9 / step
# financing - how households finance new heating system
subsidies_heater = self._heater_store['subsidies'].sum(axis=1)
to_pay = investment_heater - subsidies_heater
temp = self.add_level(to_pay, self._stock_ref, 'Income tenant')
temp = temp.groupby('Income tenant').sum()
temp.index = temp.index.map(lambda x: 'To pay heater {} (Billion euro/year)'.format(x))
output.update(temp.T / 10 ** 9 / step)
temp = [output['To pay heater {} (Billion euro/year)'.format(i)] + output['To pay insulation {} (Billion euro/year)'.format(i)] +
output['Energy expenditures {} (Billion euro)'.format(i)] for i in self._resources_data['index']['Income tenant']]
temp = Series(temp, index=self._resources_data['index']['Income tenant'])
t = self.stock.groupby('Income tenant').sum() / 1e6
t = temp / t
temp.index = temp.index.map(lambda x: 'Total expenditure {} (Billion euro)'.format(x))
t.index = t.index.map(lambda x: 'Total expenditure {} (Thousand euro/hh)'.format(x))
output.update(temp.T / step)
output.update(t.T)
# subsidies_heater_hh = calculate_annuities(subsidies_heater, lifetime=10, discount_rate=0.05)
annuities_heater_hh = calculate_annuities(to_pay, lifetime=duration_financing, discount_rate=discount_financing)
output['Annuities heater households (Billion euro/year)'] = annuities_heater_hh.sum().sum() / 10 ** 9 / step
annuities_cost_heater_hh = calculate_annuities(investment_heater, lifetime=duration_financing,
discount_rate=discount_financing)
annuities_subsidies_heater_hh = calculate_annuities(subsidies_heater, lifetime=duration_financing,
discount_rate=discount_financing)
output['Subsidies heater (Billion euro)'] = subsidies_heater.sum() / 10 ** 9 / step
subsidies_loan_heater = self._heater_store['subsidies_loan'].sum(axis=1)
output['Subsidies loan heater (Billion euro)'] = subsidies_loan_heater.sum() / 10 ** 9 / step
temp = self._heater_store['debt'].loc[self._resources_data['index']['Income owner']]
output['Debt heater (Billion euro)'] = temp.sum() / 10 ** 9 / step
temp = self._heater_store['saving'].loc[self._resources_data['index']['Income owner']]
output['Saving heater (Billion euro)'] = temp.sum() / 10 ** 9 / step
index = subsidies_heater.index.union(subsidies_insulation.index)
subsidies_total = subsidies_heater.reindex(index, fill_value=0) + subsidies_insulation.reindex(index, fill_value=0)
output['Subsidies total (Billion euro)'] = subsidies_total.sum() / 10 ** 9 / step
index = subsidies_loan_heater.index.union(subsidies_loan_insulation.index)
subsidies_loan_total = subsidies_loan_heater.reindex(index, fill_value=0) + subsidies_loan_insulation.reindex(index, fill_value=0)
output['Subsidies loan total (Billion euro)'] = subsidies_loan_total.sum() / 10 ** 9 / step
output['Debt total (Billion euro)'] = output['Debt heater (Billion euro)'] + output[
'Debt insulation (Billion euro)']
output['Saving total (Billion euro)'] = output['Saving heater (Billion euro)'] + output[
'Saving insulation (Billion euro)']
# macro-average
output['Investment total (Thousand euro/household)'] = output['Investment total (Billion euro)'] * 10 ** 3 / \
output['Retrofit (Thousand households)']
output['Investment insulation (Thousand euro/household)'] = 0
if output['Renovation (Thousand households)'] != 0:
output['Investment insulation (Thousand euro/household)'] = output[
'Investment insulation (Billion euro)'] * 10 ** 3 / \
output['Renovation (Thousand households)']
output['Subsidies insulation (Thousand euro/household)'] = output[
'Subsidies insulation (Billion euro)'] * 10 ** 3 / \
output['Renovation (Thousand households)']
output['Saving insulation (Thousand euro/household)'] = output[
'Saving insulation (Billion euro)'] * 10 ** 3 / \
output['Renovation (Thousand households)']
output['Debt insulation (Thousand euro/household)'] = output[
'Debt insulation (Billion euro)'] * 10 ** 3 / \
output['Renovation (Thousand households)']
# subsidies - description
temp = subsidies_total.groupby('Income owner').sum() / investment_total.groupby('Income owner').sum()
temp = temp.loc[self._resources_data['index']['Income owner']]
temp.index = temp.index.map(lambda x: 'Share subsidies {} (%)'.format(x))
output.update(temp.T)
levels_owner = debt.index.names
if self._replaced_by.sum().sum().round(0) > 0:
lvls = ['Housing type', 'Occupancy status', 'Income tenant']
# annuities include financing cost
annuities_insulation_rented = annuities_insulation_hh[annuities_insulation_hh.index.get_level_values('Occupancy status').isin(['Privately rented', 'Social-housing'])]
annuities_insulation_rented = annuities_insulation_rented.groupby(['Housing type', 'Income owner']).sum().sum(axis=1)
annuities_insulation_year = self.add_level(annuities_insulation_hh, self._stock_ref, 'Income tenant')
annuities_insulation_year = annuities_insulation_year.groupby(lvls).sum().sum(axis=1)
temp = annuities_insulation_year.copy()
temp.index = temp.index.map(lambda x: 'Annuities insulation {} - {} - {}'.format(x[0], x[1], x[2]))
output.update(temp.T)
temp = self.add_level(annuities_cost_insulation_hh, self._stock_ref, 'Income tenant')
temp = temp.groupby(lvls).sum().sum(axis=1)
temp.index = temp.index.map(lambda x: 'Annuities cost insulation {} - {} - {}'.format(x[0], x[1], x[2]))
output.update(temp.T)
temp = self.add_level(annuities_subsidies_insulation_hh, self._stock_ref, 'Income tenant')
temp = temp.groupby(lvls).sum().sum(axis=1)
temp.index = temp.index.map(lambda x: 'Annuities subsidies insulation {} - {} - {}'.format(x[0], x[1], x[2]))
output.update(temp.T)
annuities_heater_rented = annuities_heater_hh[annuities_heater_hh.index.get_level_values('Occupancy status').isin(['Privately rented', 'Social-housing'])]
annuities_heater_rented = annuities_heater_rented.groupby(['Housing type', 'Income owner']).sum()
annuities_heater_year = self.add_level(annuities_heater_hh, self._stock_ref, 'Income tenant')
annuities_heater_year = annuities_heater_year.groupby(lvls).sum()
temp = annuities_heater_year.copy()
temp.index = temp.index.map(lambda x: 'Annuities heater {} - {} - {}'.format(x[0], x[1], x[2]))
output.update(temp.T)
temp = self.add_level(annuities_cost_heater_hh, self._stock_ref, 'Income tenant')
temp = temp.groupby(lvls).sum()
temp.index = temp.index.map(lambda x: 'Annuities cost heater {} - {} - {}'.format(x[0], x[1], x[2]))
output.update(temp.T)
temp = self.add_level(annuities_subsidies_heater_hh, self._stock_ref, 'Income tenant')
temp = temp.groupby(lvls).sum()
temp.index = temp.index.map(lambda x: 'Annuities subsidies heater {} - {} - {}'.format(x[0], x[1], x[2]))
output.update(temp.T)
annuities_year = annuities_insulation_year + annuities_heater_year
annuities_rented_year = annuities_insulation_rented + annuities_heater_rented
# annuities_rented_year add Landlord as occupancy status to account for cost
index_arrays = [
annuities_rented_year.index.get_level_values(0), # Housing type
annuities_rented_year.index.get_level_values(1), # Income owner
['Landlord'] * len(annuities_rented_year) # Occupancy status
]
new_index = pd.MultiIndex.from_arrays(index_arrays,
names=['Housing type', 'Income tenant', 'Occupancy status'])
annuities_rented_year.index = new_index
annuities_rented_year = annuities_rented_year.reorder_levels(['Housing type', 'Occupancy status', 'Income tenant'])
annuities_year = concat((annuities_year, annuities_rented_year), axis=0)
temp = annuities_year.copy()
temp.index = temp.index.map(lambda x: 'Annuities {} - {} - {}'.format(x[0], x[1], x[2]))
output.update(temp.T)
"""duration = 10
years = [y for y in self.expenditure_store.keys() if y > self.year - duration]
annuities_cumulated = sum([self.expenditure_store[y]['annuities'] for y in years])
annuities_cumulated += annuities_year
"""
consumption_std = reindex_mi(self.consumption_heating(full_output=False), self.stock.index)
consumption_std *= reindex_mi(self._surface, self.stock.index)
energy_exp_std = self.energy_bill(prices, consumption_std, bill_rebate=0)
energy_exp_std *= self.stock
energy_exp_std = energy_exp_std.groupby(lvls).sum()
temp = energy_exp_std.copy()
temp.index = temp.index.map(
lambda x: 'Energy expenditures standard {} - {} - {} (euro)'.format(x[0], x[1], x[2]))
output.update(temp.T)
consumption = self.consumption_actual(prices, bill_rebate=bill_rebate)
energy_exp = self.energy_bill(prices, consumption, bill_rebate=bill_rebate)
energy_exp *= self.stock
energy_exp = energy_exp.groupby(lvls).sum()
temp = energy_exp.copy()
temp.index = temp.index.map(
lambda x: 'Energy expenditures {} - {} - {} (euro)'.format(x[0], x[1], x[2]))
output.update(temp.T)
consumption *= self.stock
temp = consumption.groupby(lvls).sum().copy()
temp.index = temp.index.map(
lambda x: 'Consumption {} - {} - {} (kWh)'.format(x[0], x[1], x[2]))
output.update(temp.T)
consumption = self.consumption_actual(prices, bill_rebate=bill_rebate)
consumption *= self.stock
emission = self.to_emission(consumption, inputs['carbon_emission'].loc[self.year, :])
emission = emission.groupby(lvls).sum()
temp = emission.copy()
temp.index = temp.index.map(
lambda x: 'Emission {} - {} - {} (gCO2)'.format(x[0], x[1], x[2]))
output.update(temp.T)
s = self.stock.groupby(lvls).sum()
temp = s.copy()
temp.index = temp.index.map(lambda x: 'Stock {} - {} - {}'.format(x[0], x[1], x[2]))
output.update(temp.T)
i = (reindex_mi(self._income_tenant, s.index) * s)
temp = i.copy()
temp.index = temp.index.map(lambda x: 'Income {} - {} - {} (euro)'.format(x[0], x[1], x[2]))
output.update(temp.T)
# ------------------------------------------------------------------------------------------------------
# following is not used most of it are just for testing
owner = replaced_by_grouped.sum(axis=1).groupby(levels_owner).sum()
temp = annuities_insulation_hh.sum(axis=1).groupby(levels_owner).sum() / owner
temp.index = temp.index.map(
lambda x: 'Annuities insulation {} - {} (euro/year.household)'.format(x[0], x[1]))
output.update(temp.T)
temp = debt / owner
temp.index = temp.index.map(lambda x: 'Debt insulation {} - {} (euro/household)'.format(x[0], x[1]))
output.update(temp.T)
temp = saving / owner
temp.index = temp.index.map(lambda x: 'Saving insulation {} - {} (euro/household)'.format(x[0], x[1]))
output.update(temp.T)
# ------------------------------------------------------------------------------------------------------
# economic private impact - distributive indicator - are investment profitable ?
if False:
temp = annuities_insulation_hh.sum(axis=1).xs('Privately rented', level='Occupancy status', drop_level=False)
temp = self.add_level(temp, self._stock_ref, 'Income tenant')
owner_tenant = replaced_by_grouped.sum(axis=1)
owner_tenant = self.add_level(owner_tenant, self._stock_ref, 'Income tenant')
if 'Privately rented' in owner_tenant.index.get_level_values('Occupancy status'):
owner_private = owner_tenant.xs('Privately rented', level='Occupancy status', drop_level=False)
temp = temp.groupby('Income tenant').sum() / owner_private.groupby('Income tenant').sum()
temp = temp.loc[self._resources_data['index']['Income tenant']]
temp.index = temp.index.map(lambda x: 'Rent {} (euro/year.household)'.format(x))
output.update(temp.T)
temp = consumption_saved_insulation.sum(axis=1)
del consumption_saved_insulation
temp = self.add_level(temp, self._stock_ref, 'Income tenant')
bill_saving = self.to_emission(temp, prices)
temp = bill_saving.groupby(['Occupancy status', 'Income tenant']).sum() / owner_tenant.groupby(
['Occupancy status', 'Income tenant']).sum()
temp.index = temp.index.map(
lambda x: 'Bill saving standard {} - {} (euro/year.household)'.format(x[0], x[1]))
output.update(temp.T)
temp = consumption_saved_actual_insulation.sum(axis=1)
del consumption_saved_actual_insulation
bill_saving = self.to_emission(temp, prices)
temp = bill_saving.groupby(['Occupancy status', 'Income tenant']).sum() / owner_tenant.groupby(
['Occupancy status', 'Income tenant']).sum()
temp.index = temp.index.map(lambda x: 'Bill saving {} - {} (euro/year.household)'.format(x[0], x[1]))
output.update(temp.T)
# private balance
temp = [- output['Annuities insulation Owner-occupied - {} (euro/year.household)'.format(i)] + output[
'Bill saving Owner-occupied - {} (euro/year.household)'.format(i)] for i in
self._resources_data['index']['Income owner']]
temp = Series(temp, index=self._resources_data['index']['Income owner'])
temp.index = temp.index.map(lambda x: 'Balance Owner-occupied - {} (euro/year.household)'.format(x))
output.update(temp.T)
temp = [- output['Rent {} (euro/year.household)'.format(i)] + output[
'Bill saving Privately rented - {} (euro/year.household)'.format(i)] for i in
self._resources_data['index']['Income tenant']]
temp = Series(temp, index=self._resources_data['index']['Income tenant'])
temp.index = temp.index.map(lambda x: 'Balance Tenant private - {} (euro/year.household)'.format(x))
output.update(temp.T)
# private balance standard
temp = [- output['Annuities insulation Owner-occupied - {} (euro/year.household)'.format(i)] + output[
'Bill saving standard Owner-occupied - {} (euro/year.household)'.format(i)] for i in
self._resources_data['index']['Income owner']]
temp = Series(temp, index=self._resources_data['index']['Income owner'])
temp.index = temp.index.map(
lambda x: 'Balance standard Owner-occupied - {} (euro/year.household)'.format(x))
output.update(temp.T)
temp = [- output['Rent {} (euro/year.household)'.format(i)] + output[
'Bill saving standard Privately rented - {} (euro/year.household)'.format(i)] for i in
self._resources_data['index']['Income tenant']]
temp = Series(temp, index=self._resources_data['index']['Income tenant'])
temp.index = temp.index.map(
lambda x: 'Balance standard Tenant private - {} (euro/year.household)'.format(x))
output.update(temp.T)
# economic state impact
output['VAT heater (Billion euro)'] = self._heater_store['vat'] / 10 ** 9 / step
temp = (self._replaced_by * self._renovation_store['vat_households']).sum().sum()
output['VAT insulation (Billion euro)'] = temp / 10 ** 9 / step
output['VAT (Billion euro)'] = output['VAT heater (Billion euro)'] + output['VAT insulation (Billion euro)']
output['Investment heater WT (Billion euro)'] = output['Investment heater (Billion euro)'] - output[
'VAT heater (Billion euro)']
output['Investment insulation WT (Billion euro)'] = output['Investment insulation (Billion euro)'] - output[
'VAT insulation (Billion euro)']
output['Investment total WT (Billion euro)'] = output['Investment total (Billion euro)'] - output[
'VAT (Billion euro)']
output['Investment total WT / households (Thousand euro)'] = output['Investment total WT (Billion euro)'] * 10 ** 6 / (
output['Retrofit (Thousand households)'] * 10 ** 3)
amount = output['Subsidies total (Billion euro)'] + output['Subsidies loan total (Billion euro)']
temp = calculate_annuities(amount, lifetime=duration_financing, discount_rate=0.01) * duration_financing - amount
output['Financing all (Billion euro)'] = temp + output['Financing total (Billion euro)']
# co-benefit
if 'Embodied energy Wall (TWh PE)' in output.keys():
output['Embodied energy renovation (TWh PE)'] = output['Embodied energy Wall (TWh PE)'] + output[
'Embodied energy Floor (TWh PE)'] + output['Embodied energy Roof (TWh PE)'] + output[
'Embodied energy Windows (TWh PE)']
output['Embodied energy construction (TWh PE)'] = inputs['Embodied energy construction (TWh PE)'].loc[
self.year]
output['Embodied energy (TWh PE)'] = output['Embodied energy renovation (TWh PE)'] + output[
'Embodied energy construction (TWh PE)']
output['Carbon footprint renovation (MtCO2)'] = output['Carbon footprint Wall (MtCO2)'] + output[
'Carbon footprint Floor (MtCO2)'] + output['Carbon footprint Roof (MtCO2)'] + output[
'Carbon footprint Windows (MtCO2)']
output['Carbon footprint construction (MtCO2)'] = inputs['Carbon footprint construction (MtCO2)'].loc[
self.year]
output['Carbon footprint (MtCO2)'] = output['Carbon footprint renovation (MtCO2)'] + output[
'Carbon footprint construction (MtCO2)']
output['Carbon value indirect renovation (Billion euro)'] = output['Carbon footprint renovation (MtCO2)'] * carbon_value / 10 ** 3 / step
output['Carbon value indirect (Billion euro)'] = output['Carbon footprint (MtCO2)'] * carbon_value / 10 ** 3 / step
output['Bill rebate (euro/(household.year))'] = bill_rebate
consumption = self.consumption_actual(prices, bill_rebate=bill_rebate)
temp = (self.energy_bill(prices, consumption, bill_rebate=0) - self.energy_bill(prices, consumption, bill_rebate=bill_rebate))
temp *= self.stock
output['Bill rebate (Billion euro)'] = temp.sum() / 1e9
output['Income state (Billion euro)'] = output['VAT (Billion euro)'] + output['Taxes expenditure (Billion euro)']
output['Expenditure state (Billion euro)'] = output['Subsidies total (Billion euro)'] + output['Subsidies loan total (Billion euro)'] + output['Health expenditure (Billion euro)'] + output['Bill rebate (Billion euro)']
output['Balance state (Billion euro)'] = output['Income state (Billion euro)'] - output['Expenditure state (Billion euro)']
if self._balance_state_ini is None:
self._balance_state_ini = output['Balance state (Billion euro)']
# subsidies - details: policies amount and number of beneficiaries
subsidies_details_renovation, replacement_eligible_renovation, subsidies_average_renovation, cost_average_renovation = {}, {}, {}, {}
energy_saved_renovation = {}
for key, sub in self._renovation_store['subsidies_details_households'].items():
subsidies_details_renovation[key] = (
self._replaced_by * reindex_mi(sub, self._replaced_by.index)).groupby(levels).sum()
if key in self._renovation_store['eligible'].keys():
eligible = self._renovation_store['eligible'][key]
else:
eligible = sub.copy()
eligible[eligible > 0] = 1
eligible = eligible.any(axis=1)
replacement_eligible = self._replaced_by.fillna(0).sum(axis=1) * eligible
replacement_eligible_renovation[key] = replacement_eligible.groupby('Housing type').sum()
if eligible.sum().sum() == 0:
subsidies_average_renovation[key] = 0
cost_average_renovation[key] = 0
else:
subsidies_average_renovation[key] = sub.sum().sum() / replacement_eligible.sum()
cost = reindex_mi((self._renovation_store['cost_households'] - self._renovation_store['vat_households']), replacement_eligible.index)
cost = ((cost * self._replaced_by.fillna(0)).T * eligible).T
cost_average_renovation[key] = cost.sum().sum() / replacement_eligible.sum()
energy_saved = reindex_mi(self._renovation_store['consumption_saved_households'], replacement_eligible.index)
energy_saved = ((energy_saved * self._replaced_by.fillna(0)).T * eligible).T
energy_saved_renovation[key] = energy_saved.sum().sum() / replacement_eligible.sum()
del self._renovation_store['subsidies_details_households']
gc.collect()
subsidies, replacement_eligible, sub_count, cost_average, energy_average = None, None, None, None, None
for gest, subsidies_details in {'heater': self._heater_store['subsidies_details'],
'insulation': subsidies_details_renovation}.items():
if gest == 'heater':
sub_count = DataFrame(self._heater_store['replacement_eligible'], dtype=float)
cost_average = Series(self._heater_store['cost_average'], dtype=float)
elif gest == 'insulation':
sub_count = DataFrame(replacement_eligible_renovation, dtype=float)
cost_average = Series(cost_average_renovation, dtype=float)
energy_average = Series(energy_saved_renovation, dtype=float)
subsidies_details = Series({k: i.sum().sum() for k, i in subsidies_details.items()}, dtype='float64')
for i in subsidies_details.index:
# use subsidies in post-treatment
if '{} {}'.format(i, gest) in inputs['use_subsidies'].index:
use_subsidies = inputs['use_subsidies'].loc['{} {}'.format(i, gest)]
subsidies_details[i] *= use_subsidies
sub_count[i] *= use_subsidies
temp = sub_count[i].copy()
temp.index = temp.index.map(
lambda x: '{} {} {} (Thousand households)'.format(i.capitalize().replace('_', ' '), gest, x))
output.update(temp.T / 10 ** 3 / step)
output.update({'{} {} (Thousand households)'.format(i.capitalize().replace('_', ' '), gest):
sub_count[i].sum() / 1e3 / step})
output['Average cost {} {} (euro)'.format(i.capitalize().replace('_', ' '), gest)] = cost_average.loc[i]
if gest == 'insulation' and i in energy_average.keys():
output['Average energy std saved {} {} (MWh)'.format(i.capitalize().replace('_', ' '), gest)] = \
energy_average.loc[i] / 1e3
output['{} {} (Billion euro)'.format(i.capitalize().replace('_', ' '), gest)] = \
subsidies_details.loc[i] / 10 ** 9 / step
if subsidies is None:
subsidies = subsidies_details.copy()
replacement_eligible = sub_count.copy()
cost = (cost_average * sub_count.sum()).copy()
else:
subsidies = concat((subsidies, subsidies_details), axis=0)
replacement_eligible = concat((replacement_eligible, sub_count), axis=0)
cost = concat((cost, cost_average * sub_count.sum()), axis=0)
if subsidies is not None:
subsidies = subsidies.groupby(subsidies.index).sum()
replacement_eligible = replacement_eligible.groupby(replacement_eligible.index).sum()
cost = cost.groupby(cost.index).sum() / replacement_eligible.sum()
for i in subsidies.index:
temp = replacement_eligible[i]
output['{} (Thousand households)'.format(
i.capitalize().replace('_', ' '))] = temp.sum() / 10 ** 3 / step
temp.index = temp.index.map(
lambda x: '{} {} (Thousand households)'.format(i.capitalize().replace('_', ' '), x))
output.update(temp.T / 10 ** 3)
output['{} (Billion euro)'.format(i.capitalize().replace('_', ' '))] = subsidies.loc[i] / 10 ** 9 / step
output['Average cost {} (euro)'.format(i.capitalize().replace('_', ' '))] = cost.loc[i]
t = self._heater_store['replacement'].groupby('Housing type').sum().loc['Multi-family']
t.index = t.index.map(lambda x: 'Switch Multi-family {} (Thousand households)'.format(x))
output.update((t / 10 ** 3 / step).T)
t = self._heater_store['replacement'].groupby('Housing type').sum().loc['Single-family']
t.index = t.index.map(lambda x: 'Switch Single-family {} (Thousand households)'.format(x))
output.update((t / 10 ** 3 / step).T)
temp = investment_total.groupby(['Housing type', 'Occupancy status']).sum()
temp.index = temp.index.map(lambda x: 'Investment total {} - {} (Billion euro)'.format(x[0], x[1]))
output.update(temp.T / 10 ** 9 / step)
temp = subsidies_total.groupby(['Housing type', 'Occupancy status']).sum()
temp.index = temp.index.map(lambda x: 'Subsidies total {} - {} (Million euro)'.format(x[0], x[1]))
output.update(temp.T / 10 ** 6 / step)
temp = subsidies_total.groupby(['Housing type', 'Occupancy status', 'Income owner']).sum()
temp.index = temp.index.map(lambda x: 'Subsidies total {} - {} - {} (Million euro)'.format(x[0], x[1], x[2]))
output.update(temp.T / 10 ** 6 / step)
"""
self.store_over_years[self.year].update(
{'Annuities heater (Billion euro/year)': output['Annuities heater (Billion euro/year)'],
'Annuities insulation (Billion euro/year)': output['Annuities insulation (Billion euro/year)'],
})
years = [y for y in self.store_over_years.keys() if y > self.year - 20 and 'Annuities heater (Billion euro/year)' in self.store_over_years[y].keys()]
annuities_heater_cumulated = sum([self.store_over_years[y]['Annuities heater (Billion euro/year)'] for y in years])
# TODO: Cumulated 20 years ?
years = [y for y in self.store_over_years.keys() if y > self.year - 20 and 'Annuities insulation (Billion euro/year)' in self.store_over_years[y].keys()]
annuities_insulation_cumulated = sum(
[self.store_over_years[y]['Annuities insulation (Billion euro/year)'] for y in years])
"""
# Opportunity cost of public spending
output['COFP (Billion euro)'] = 0
balance = self._balance_state_ini - output['Balance state (Billion euro)']
# if > 0 it's a cost because income net of expenditures decrease compared to initial state
temp = balance * coefficient_public_funding
output['COFP (Billion euro)'] = temp
# running cost
output['Cost energy (Billion euro)'] = output['Energy expenditures wt (Billion euro)']
output['Cost emission (Billion euro)'] = (output['Emission (MtCO2)'] * carbon_value) / 10**3
output['Cost heath (Billion euro)'] = output['Health cost (Billion euro)']
output['Loss thermal comfort (Billion euro)'] = - output['Thermal comfort EE (Billion euro)'] + output['Thermal loss prices (Billion euro)']
"""
output['Cost heater (Billion euro)'] = annuities_heater_cumulated
output['Cost insulation (Billion euro)'] = annuities_insulation_cumulated
variables = ['Cost energy (Billion euro)', 'Cost emission (Billion euro)', 'Cost heath (Billion euro)',
'Loss thermal comfort (Billion euro)', 'Cost heater (Billion euro)',
'Cost insulation (Billion euro)']
output['Running cost (Billion euro)'] = sum([output[i] for i in variables])
"""
consumption_saving_ee = (_consumption_saved_actual_insulation + self._heater_store['consumption_saved_actual'])
output['CBA Consumption saving (TWh)'] = output['Consumption saving (TWh/year)']
output['CBA Consumption saving insulation (TWh)'] = _consumption_saved_actual_insulation.sum() / 10**9
output['CBA Consumption saving switch fuel (TWh)'] = self._heater_store['consumption_saved_actual'].sum() / 10**9
output['CBA Consumption saving EE (TWh)'] = consumption_saving_ee.sum() / 10**9
output['CBA Consumption saving prices (TWh)'] = output['Consumption saving prices effect (TWh/year)']
output['CBA Rebound EE (TWh)'] = output['Rebound EE (TWh/year)']
# cost-benefits analysis compared to previous year
energy_wt_value = calculate_average(inputs['energy_prices_wt'].loc[self.year:], lifetime=lifetime_insulation,
discount_rate=social_discount_rate)
carbon_value_mean = calculate_average(inputs['carbon_value'].loc[self.year:], lifetime=lifetime_insulation,
discount_rate=social_discount_rate)
consumption_saving_ee = (consumption_saving_ee * energy_wt_value).sum() / 10**9
output['CBA Consumption saving EE (Billion euro)'] = consumption_saving_ee
output['CBA Consumption saving prices (Billion euro)'] = output['Consumption saving prices (Billion euro)']
output['CBA Thermal comfort EE (Billion euro)'] = output['Thermal comfort EE (Billion euro)']
output['CBA Emission direct (Billion euro)'] = (output['Emission saving (MtCO2/year)'] * carbon_value_mean) / 10**3
output['CBA Health cost (Billion euro)'] = self.store_over_years[self.year - 1]['Health cost (Billion euro)'] - self.store_over_years[self.year]['Health cost (Billion euro)']
output['CBA benefits (Billion euro)'] = output['CBA Consumption saving EE (Billion euro)'] + \
output['CBA Consumption saving prices (Billion euro)'] + \
output['CBA Thermal comfort EE (Billion euro)'] + \
output['CBA Emission direct (Billion euro)'] + \
output['CBA Health cost (Billion euro)']
# cost
output['CBA Thermal loss prices (Billion euro)'] = - output['Thermal loss prices (Billion euro)']
output['CBA Annuities insulation (Billion euro)'] = - output['Annuities insulation (Billion euro/year)']
output['CBA Annuities heater (Billion euro)'] = - output['Annuities heater (Billion euro/year)']
temp = calculate_annuities(output['Carbon value indirect renovation (Billion euro)'],
lifetime=lifetime_insulation,
discount_rate=social_discount_rate)
output['CBA Carbon Emission indirect (Billion euro)'] = - temp
output['CBA COFP (Billion euro)'] = - output['COFP (Billion euro)']
output['CBA cost (Billion euro)'] = output['CBA Annuities heater (Billion euro)'] + \
output['CBA Annuities insulation (Billion euro)'] + \
output['CBA Carbon Emission indirect (Billion euro)'] + \
output['CBA Thermal loss prices (Billion euro)'] + output['CBA COFP (Billion euro)']
output['Cost-benefits analysis (Billion euro)'] = output['CBA benefits (Billion euro)'] + output['CBA cost (Billion euro)']
output = Series(output).rename(self.year)
stock = stock.rename(self.year)
return stock, output
[docs] def parse_output_run_cba(self, prices, inputs, step=1, taxes=None, bill_rebate=0):
output = dict()
# emission
emission = inputs['carbon_emission'].loc[self.year, :]
consumption_energy = self.consumption_agg(prices=prices, freq='year', climate=None, standard=False, agg='energy',
bill_rebate=bill_rebate)
temp = consumption_energy * emission
output['Emission (MtCO2)'] = temp.sum() / 10 ** 3
# investment
investment_cost = (self._replaced_by * self._renovation_store['cost_households']).sum().sum()
investment_cost = investment_cost.sum() / 10 ** 9 / step
vat = (self._replaced_by * self._renovation_store['vat_households']).sum().sum()
output['VAT insulation (Billion euro)'] = vat / 10 ** 9 / step
output['Investment insulation WT (Billion euro)'] = investment_cost - output['VAT insulation (Billion euro)']
# vat
investment_heater = self._heater_store['cost'].sum().sum() / 10 ** 9 / step
output['VAT heater (Billion euro)'] = self._heater_store['vat'] / 10 ** 9 / step
output['Investment heater WT (Billion euro)'] = investment_heater - output['VAT heater (Billion euro)']
output['Health cost (Billion euro)'] = self.health_cost(inputs['health_cost_dpe'], inputs['health_cost_income'], prices)
output['VAT (Billion euro)'] = output['VAT insulation (Billion euro)'] + output['VAT heater (Billion euro)']
output['Health expenditure (Billion euro)'] = 0 # temp['Health expenditure (Billion euro)']
output['Subsidies heater (Billion euro)'] = self._heater_store['subsidies'].sum().sum() / 10 ** 9 / step
output['Subsidies insulation (Billion euro)'] = (self._replaced_by * self._renovation_store[
'subsidies_households']).sum().sum() / 10 ** 9
output['Subsidies (Billion euro)'] = output['Subsidies heater (Billion euro)'] + output['Subsidies insulation (Billion euro)']
if taxes is not None:
consumption_energy = self.consumption_agg(prices=prices, freq='year', climate=None, standard=False,
agg='energy', bill_rebate=bill_rebate)
taxes_expenditures = dict()
total_taxes = Series(0, index=prices.index)
for tax in taxes:
if self.year in tax.value.index:
if tax.name not in self.taxes_list:
self.taxes_list += [tax.name]
amount = tax.value.loc[self.year, :] * consumption_energy
taxes_expenditures[tax.name] = amount
total_taxes += amount
output['Taxes expenditure (Billion euro)'] = DataFrame(taxes_expenditures).sum().sum() / step
output['Income state (Billion euro)'] = output['VAT (Billion euro)'] + output['Taxes expenditure (Billion euro)']
output['Expenditure state (Billion euro)'] = output['Subsidies (Billion euro)'] + output['Health expenditure (Billion euro)']
output['Balance state (Billion euro)'] = output['Income state (Billion euro)'] - output['Expenditure state (Billion euro)']
if False:
levels = [i for i in self._replaced_by.index.names if
i not in ['Heater replacement', 'Heating system final']]
temp = self.add_level(self._replaced_by.fillna(0), self._stock_ref, 'Income tenant')
consumption_saved_actual_insulation = (
temp * reindex_mi(self._renovation_store['consumption_saved_actual_households'],
temp.index)).groupby(levels + ['Income tenant']).sum()
consumption_saved_no_rebound_insulation = (
temp * reindex_mi(self._renovation_store['consumption_saved_no_rebound_households'],
temp.index)).groupby(levels + ['Income tenant']).sum()
rebound_insulation = (consumption_saved_no_rebound_insulation - consumption_saved_actual_insulation).sum(
axis=1)
rebound_insulation = rebound_insulation.groupby(self.to_energy(rebound_insulation)).sum()
rebound_heater = self._heater_store['rebound']
thermal_comfort = (rebound_insulation + rebound_heater) * prices
output.update({'Thermal comfort EE (Billion euro)': thermal_comfort.sum() / 10 ** 9})
"""_consumption_saved_actual_insulation = self.apply_calibration(
consumption_saved_actual_insulation.sum(axis=1))
consumption_saved_price_constant = _consumption_saved_actual_insulation + self._heater_store[
'consumption_saved_actual']
output['Consumption saving prices constant (TWh/year)'] = consumption_saved_price_constant.sum() / 10 ** 9
output['Consumption saving prices effect (TWh/year)'] = round(
output['Consumption saving (TWh/year)'] - output['Consumption saving prices constant (TWh/year)'], 3)
prices_effect = pd.Series({i: output['Consumption {} saving (TWh/year)'.format(i)] -
consumption_saved_price_constant.loc[i] / 10 ** 9 for i in
consumption_saved_price_constant.index})
thermal_loss = prices_effect * prices
output.update({'Thermal loss prices (Billion euro)': round(thermal_loss.sum(), 3)})"""
output = Series(output).rename(self.year)
return output
[docs] def parse_output_consumption(self, prices, bill_rebate=0):
output = self.consumption_agg(prices=prices, freq='year', climate=None, standard=False, agg='energy',
bill_rebate=bill_rebate)
output.index = output.index.map(lambda x: 'Consumption {} (TWh)'.format(x))
temp = prices.T
temp.index = temp.index.map(lambda x: 'Prices {} (euro/kWh)'.format(x))
output = pd.concat((output, temp), axis=0).rename(self.year)
return output
[docs] def apply_scale(self, scale, gest='insulation'):
def calculate_indicators_insulation():
self.hidden_cost_insulation = - self.constant_insulation_intensive / abs(
self.preferences_insulation['cost']) * 1000
hidden_cost_renovation = - self.constant_insulation_extensive / abs(
self.preferences_insulation['cost']) * 1000
# estimate of friction. First landlord-tenant dilemma, then multi-family friction.
if self.no_friction is False:
hidden_cost = hidden_cost_renovation.loc[('Single-family', 'Owner-occupied')]
temp = hidden_cost_renovation.xs('Owner-occupied', level='Occupancy status', drop_level=True).copy()
landlord_dilemma = hidden_cost_renovation - temp
multi_family_friction = hidden_cost_renovation - (hidden_cost + landlord_dilemma)
# test = hidden_cost + landlord_dilemma + multi_family_friction
market_failures = Series(0, index=hidden_cost_renovation.index)
market_failures += landlord_dilemma.reindex(market_failures.index).fillna(0)
market_failures += multi_family_friction.reindex(market_failures.index).fillna(0)
hidden_cost = hidden_cost_renovation - market_failures
hidden_cost_renovation = concat([hidden_cost] * len(self.hidden_cost_insulation), axis=1, keys=self.hidden_cost_insulation.index)
hidden_cost_renovation = hidden_cost_renovation + self.hidden_cost_insulation
self.hidden_cost_renovation = hidden_cost_renovation
self.hidden_cost = hidden_cost
self.landlord_dilemma = landlord_dilemma
self.multifamily_friction = multi_family_friction
self.constant_insulation = - self.hidden_cost_renovation / 1000 * abs(self.preferences_insulation['cost'])
self.utility_hidden_cost = - self.hidden_cost / 1000 * abs(self.preferences_insulation['cost'])
else:
constant_insulation = concat([self.constant_insulation_extensive] * len(self.constant_insulation_intensive), axis=1, keys=self.constant_insulation_intensive.index)
constant_insulation = constant_insulation + self.constant_insulation_intensive
self.constant_insulation = constant_insulation.copy()
if gest == 'insulation':
self.scale_insulation *= scale
self.preferences_insulation['subsidy'] *= scale
self.preferences_insulation['cost'] *= scale
self.preferences_insulation['bill_saved'] *= scale
calculate_indicators_insulation()
elif gest == 'heater':
self.scale_heater *= scale
self.preferences_heater['subsidy'] *= scale
self.preferences_heater['cost'] *= scale
self.preferences_heater['bill_saved'] *= scale
self.preferences_heater['status_quo_bias'] *= scale
self.hidden_cost_heater = - self.constant_heater / abs(self.preferences_heater['cost']) * 1000
[docs] def calibration_exogenous(self, coefficient_global=None, coefficient_backup=None, constant_heater=None,
scale_heater=None, constant_insulation_intensive=None, constant_insulation_extensive=None,
scale_insulation=None, energy_prices=None, rational_hidden_cost=None,
number_firms_insulation=None, number_firms_heater=None, hi_threshold=None):
"""Function calibrating buildings object with exogenous data.
Parameters
----------
coefficient_global: float
coefficient_backup: Series
constant_heater: Series
scale_heater: float
constant_insulation_intensive: Series
constant_insulation_extensive: Series
scale_insulation: float
energy_prices: Series
Energy prices for year y. Index are energy carriers {'Electricity', 'Natural gas', 'Oil fuel', 'Wood fuel'}.
rational_hidden_cost:
"""
# calibration energy consumption first year
if (coefficient_global is None) and (energy_prices is not None):
self.calibration_consumption(energy_prices.loc[self.first_year, :], None)
else:
self.coefficient_global = coefficient_global
self.coefficient_backup = coefficient_backup
if rational_hidden_cost is not None:
self.rational_hidden_cost = rational_hidden_cost
else:
if constant_heater is not None:
self.constant_heater = constant_heater
if constant_insulation_intensive is not None:
self.constant_insulation_intensive = constant_insulation_intensive.dropna()
if constant_insulation_extensive is not None:
self.constant_insulation_extensive = constant_insulation_extensive.dropna()
if scale_heater is not None:
self.apply_scale(scale_heater, gest='heater')
if scale_insulation is not None:
self.apply_scale(scale_insulation, gest='insulation')
self.hi_threshold = hi_threshold
self.number_firms_insulation, self.number_firms_heater = number_firms_insulation, number_firms_heater
[docs] def remove_calibration(self):
self.coefficient_global = None
self.coefficient_backup = None
self.preferences_insulation['subsidy'] /= self.scale_insulation
self.preferences_insulation['cost'] /= self.scale_insulation
self.preferences_insulation['bill_saved'] /= self.scale_insulation
self.constant_heater = None
self.constant_insulation_intensive = None
self.constant_insulation_extensive = None
self.scale_insulation = None
self.rational_hidden_cost = None
[docs] def flow_demolition(self, demolition_rate, step=1):
"""Demolition of E, F and G buildings based on their share in the mobile stock.
Returns
-------
Series
"""
self.logger.info('Demolition')
if isinstance(demolition_rate, Series):
demolition_rate = demolition_rate.loc[self.year]
demolition_total = (demolition_rate * self.stock_mobile).sum() * step
stock_demolition = self.stock_mobile[self.certificate.isin(self._target_demolition)]
if stock_demolition.sum() < demolition_total:
self._target_demolition = ['G', 'F', 'E', 'D']
stock_demolition = self.stock_mobile[self.certificate.isin(self._target_demolition)]
if stock_demolition.sum() < demolition_total:
self._target_demolition = ['G', 'F', 'E', 'D', 'C']
stock_demolition = self.stock_mobile[self.certificate.isin(self._target_demolition)]
stock_demolition = stock_demolition / stock_demolition.sum()
flow_demolition = (stock_demolition * demolition_total).dropna()
return flow_demolition.reorder_levels(self.stock.index.names)
[docs] def health_cost(self, health_cost_dpe, health_cost_income, prices, stock=None, method_health_cost=None):
if method_health_cost is None:
method_health_cost = self.method_health_cost
if stock is None:
stock = self.stock
if method_health_cost == 'epc':
_, certificate, _ = self.consumption_heating(method='3uses', full_output=True)
temp = concat((stock, reindex_mi(certificate, stock.index).rename('Performance')), axis=1)
temp.set_index('Performance', append=True, inplace=True)
temp = temp.squeeze()
return (temp * reindex_mi(health_cost_dpe, temp.index)).sum() / 10 ** 9
elif method_health_cost == 'heating_intensity':
heating_intensity = self.to_heating_intensity(stock.index, prices)
stock = concat((stock, heating_intensity), axis=1, keys=['Stock', 'Heating intensity'])
stock_health = stock.loc[stock['Heating intensity'] <= self.hi_threshold, 'Stock']
return (health_cost_income * stock_health).sum() / 10 ** 9
[docs] def calculate_renovation(self, prices, cost_insulation, cost_heater, carbon_content, health_cost=None,
carbon_value=50):
"""Calculate the economics renovation of the building stock.
Parameters
----------
prices: Series
Energy prices.
cost_insulation: Series
Cost of insulation measures.
cost_heater: Series
Cost of heater measures.
carbon_content: Series
Carbon content of energy carriers.
health_cost: Series
Health cost of energy carriers.
carbon_value: float
Carbon value.
"""
# select stock
stock = self.add_certificate(self.stock)
stock = stock[stock.index.get_level_values('Performance').astype(str) > 'B']
stock = stock.droplevel('Performance')
index = stock.index
# calculate consumption before renovation and heater replacement
consumption_before = self.consumption_heating_store(index, full_output=False)
consumption_before = reindex_mi(consumption_before, index)
temp = consumption_before * reindex_mi(self._surface, consumption_before.index)
heating_intensity_before = self._to_heating_intensity(temp.index, prices,
consumption=temp,
level_heater='Heating system')
consumption_before *= heating_intensity_before
c_content = carbon_content.reindex(self.to_energy(consumption_before)).set_axis(consumption_before.index)
emission_before = consumption_before * c_content / 1e6
# calculate consumption after renovation and heater replacement
s_no_switch = concat((stock, Series(stock.index.get_level_values('Heating system'), index=stock.index,
name='Heating system final')), axis=1).set_index('Heating system final', append=True).squeeze()
idx_fossil = ['Natural gas', 'Oil fuel']
s_switch = stock[self.to_energy(stock).isin(idx_fossil)]
s_switch = concat((s_switch, Series('Electricity-Heat pump water', index=s_switch.index,
name='Heating system final')), axis=1).set_index('Heating system final',
append=True).squeeze()
s = concat((s_no_switch, s_switch), axis=0, keys=[False, True], names=['Heater replacement'])
consumption_after, _, certificate_after = self.prepare_consumption(self._choice_insulation,
index=s.index,
level_heater='Heating system final',
full_output=True)
consumption_after = reindex_mi(consumption_after, s.index)
_, _, certificate_after_3uses = self.prepare_consumption(self._choice_insulation,
index=s.index,
level_heater='Heating system final',
full_output=True,
method_epc='3uses')
temp = (consumption_after.T * reindex_mi(self._surface, consumption_after.index)).T
heating_intensity_after = self._to_heating_intensity(temp.index, prices,
consumption=temp,
level_heater='Heating system final')
consumption_after *= heating_intensity_after
c_content = carbon_content.reindex(self.to_energy(consumption_after, level_heater='Heating system final')).set_axis(consumption_after.index)
emission_after = (consumption_after.T * c_content).T / 1e6
consumption_after = consumption_after.droplevel('Heating system final').unstack('Heater replacement')
emission_after = emission_after.droplevel('Heating system final').unstack('Heater replacement')
# calculate consumptionn emission savingand cost due to renovation and heater replacement
consumption_before = reindex_mi(consumption_before, consumption_after.index)
consumption_saved = (consumption_before - consumption_after.T).T
consumption_saved = (consumption_saved.T * reindex_mi(self._surface, consumption_saved.index)).T
emission_before = reindex_mi(emission_before, consumption_after.index)
emission_saved = (emission_before - emission_after.T).T
emission_saved = (emission_saved.T * reindex_mi(self._surface, emission_saved.index)).T
emission_saved_value = emission_saved * carbon_value
emission_saved *= 1e6
bill_before = self.energy_bill(prices, consumption_before * reindex_mi(self._surface, consumption_saved.index))
bill_after = self.energy_bill(prices, (consumption_after.T * reindex_mi(self._surface, consumption_saved.index)).T)
bill_saved = (bill_before - bill_after.T).T
certificate_after = reindex_mi(certificate_after, s.index)
certificate_after = certificate_after.droplevel('Heating system final').unstack('Heater replacement')
certificate_after_3uses = reindex_mi(certificate_after_3uses, s.index)
certificate_after_3uses = certificate_after_3uses.droplevel('Heating system final').unstack('Heater replacement')
cost = self.prepare_cost_insulation(cost_insulation * self.surface_insulation)
cost = cost.T.multiply(self._surface, level='Housing type').T
cost = concat((cost, cost + cost_heater.loc['Electricity-Heat pump water']), axis=1, keys=[False, True],
names=['Heater replacement'])
cost = cost.reorder_levels(consumption_saved.columns.names, axis=1)
cost = reindex_mi(cost, consumption_saved.index)
health_cost_saved = None
if health_cost is not None:
_, certificate_before, _ = self.consumption_heating(index=index, method='3uses', full_output=True)
df = concat((certificate_before, stock.loc[index]), keys=['Performance', 'Stock'], axis=1).dropna().set_index(
'Performance', append=True).squeeze()
health_cost_before = reindex_mi(health_cost, df.index).droplevel('Performance').fillna(0)
c = reindex_mi(certificate_after_3uses, index)
health_cost_after = {}
for i, j in c.items():
df = concat((j, stock.loc[index]), keys=['Performance', 'Stock'], axis=1).dropna().set_index(
'Performance', append=True).squeeze()
health_cost_after.update({i: reindex_mi(health_cost, df.index).droplevel('Performance')})
health_cost_after = DataFrame(health_cost_after).fillna(0)
health_cost_saved = (health_cost_before - health_cost_after.T).T
health_cost_saved.columns.names = certificate_after_3uses.columns.names
output = {
'consumption_saved': consumption_saved,
'emission_saved': emission_saved,
'emission_saved_value': emission_saved_value,
'cost': cost,
'bill_saved': bill_saved,
'certificate_after': certificate_after,
'health_cost_saved': health_cost_saved,
'stock': stock
}
return output
[docs] def select_renovation_option(self, consumption_saved, emission_saved, cost_investment, stock,
bill_saved, certificate_after, carbon_saved=None,
health_cost_saved=None, lifetime=30, discount_rate=0.05,
measures='deep_renovation',
include_energy=True, include_carbon=False, include_health=False,
include_failures=False, cost_failures=None,
include_failures_landlords=False, cost_failures_landlords=None,
include_failures_multi=False, cost_failures_multi=None,
include_credit_constraint=None, income=None, ratio_threshold=0.05,
duration_loan=10):
"""Select renovation option based on measures.
Parameters
----------
consumption_saved: Series
Energy consumption saved by renovation measures.
emission_saved: Series
Emission saved by renovation measures.
cost_investment: Series
Cost of insulation measures.
stock: Series
Stock of buildings.
certificate_after: Series
Energy performance certificate after renovation measures.
lifetime: int, default 30
Lifetime of insulation measures.
discount_rate: float, default 0.05
Discount rate.
measures: str, default 'deep_renovation'
Type of insulation measures.
carbon_saved: Series, default None
Carbon saved by renovation measures.
bill_saved: Series, default None
Bill saved by renovation measures.
health_cost_saved: Series, default None
Health cost saved by renovation measures.
include_energy: bool, default True
Include energy in the calculation of indicators.
include_carbon: bool, default False
Include carbon in the calculation of indicators.
include_health: bool, default False
Include health in the calculation of indicators.
"""
# calculate the discount factor
discount_factor = (1 - (1 + discount_rate) ** -lifetime) / discount_rate
if isinstance(discount_rate, Series):
discount_rate = reindex_mi(discount_rate, cost_investment.index)
discount_rate = concat([discount_rate] * cost_investment.shape[1], axis=1, keys=cost_investment.columns)
# format the consumption saved
cost_annualized = calculate_annuities(cost_investment, lifetime=lifetime, discount_rate=discount_rate)
consumption_saved = reindex_mi(consumption_saved, stock.index).dropna(how='all')
emission_saved = reindex_mi(emission_saved, stock.index).dropna(how='all')
bill_saved = reindex_mi(bill_saved, stock.index).dropna(how='all')
if include_health is True:
health_cost_saved = reindex_mi(health_cost_saved, stock.index).dropna(how='all')
index = consumption_saved.index
cost_annualized = reindex_mi(cost_annualized, index)
cost_investment = reindex_mi(cost_investment, index)
discount_factor = reindex_mi(discount_factor, index)
rslt_npv, rslt_roi = None, None
if measures in ['global_insulation', 'global_renovation']:
INSULATION = {'Wall': True, 'Floor': True, 'Roof': True, 'Windows': True}
insulation = deepcopy(INSULATION)
if measures == 'global_renovation':
insulation.update({'Heater replacement': True})
else:
insulation.update({'Heater replacement': False})
insulation = MultiIndex.from_frame(Series(insulation).to_frame().T)
consumption_saved_efficiency = consumption_saved.loc[:, insulation]
emission_saved_efficiency = emission_saved.loc[:, insulation]
cost_annualized = cost_annualized.loc[:, insulation]
if include_carbon is True:
carbon_saved = reindex_mi(carbon_saved, index)
carbon_saved = carbon_saved.loc[:, insulation]
cost_annualized -= carbon_saved
if include_health is True:
health_cost_saved = health_cost_saved.loc[:, insulation]
cost_annualized -= health_cost_saved
cost_efficiency = cost_annualized / consumption_saved_efficiency
cost_efficiency_carbon = cost_annualized / emission_saved_efficiency
consumption_saved_efficiency = (stock * consumption_saved_efficiency.T).T
consumption_saved_efficiency = consumption_saved_efficiency.stack(consumption_saved_efficiency.columns.names).squeeze()
emission_saved_efficiency = (stock * emission_saved_efficiency.T).T
emission_saved_efficiency = emission_saved_efficiency.stack(emission_saved_efficiency.columns.names).squeeze()
cost_efficiency = cost_efficiency.stack(cost_efficiency.columns.names).squeeze()
cost_efficiency_carbon = cost_efficiency_carbon.stack(cost_efficiency_carbon.columns.names).squeeze()
elif measures in ['deep_renovation', 'deep_insulation', 'heater_replacement']:
# prepare information
dict_df = {'consumption_saved': consumption_saved,
'emission_saved': emission_saved,
'cost_investment': cost_investment,
'bill_saved': bill_saved
}
cash_flow = bill_saved.copy()
if include_carbon is True:
carbon_saved = reindex_mi(carbon_saved, index)
cost_annualized = cost_annualized - carbon_saved
cash_flow += carbon_saved
dict_df.update({'carbon_saved': carbon_saved})
if include_health is True:
cost_annualized -= health_cost_saved
cash_flow += health_cost_saved
dict_df.update({'health_cost_saved': health_cost_saved})
dict_df.update({'cash_flow': cash_flow})
# filter renovation work
c_after = certificate_after.copy()
if measures == 'deep_insulation':
c_after.loc[:, c_after.columns.get_level_values('Heater replacement')] = float('nan')
c_after = c_after.dropna(how='all', axis=1)
c_after = c_after.reindex(stock.index)
c_after.dropna(how='all', inplace=True)
condition = self.select_deep_renovation(c_after)
condition = condition.replace(False, float('nan'))
# credit constraint
if include_credit_constraint is True and income is not None:
cost_year = cost_investment / duration_loan
income.rename_axis(['Income owner'], inplace=True)
income = reindex_mi(income, cost_year.index)
ratio = (cost_year.T / income).T
condition_ratio = (ratio < ratio_threshold).replace(False, float('nan'))
condition *= condition_ratio
# calculate cost-efficiency by anualizing investment cost
if include_failures is True and cost_failures is not None:
if isinstance(cost_failures, Series):
cost_failures_annualized = reindex_mi(cost_failures, cost_annualized.index).fillna(0)
cost_failures_annualized = concat([cost_failures_annualized] * discount_rate.shape[1], axis=1, keys=discount_rate.columns)
cost_failures_annualized = calculate_annuities(cost_failures_annualized, lifetime=lifetime, discount_rate=discount_rate)
cost_annualized = cost_annualized + cost_failures_annualized
if include_failures_landlords is True and cost_failures_landlords is not None:
if isinstance(cost_failures_landlords, Series):
cost_failures_landlords_annualized = reindex_mi(cost_failures_landlords, cash_flow.index).fillna(0)
cost_failures_landlords_annualized = concat([cost_failures_landlords_annualized] * discount_rate.shape[1], axis=1, keys=discount_rate.columns)
cost_failures_landlords_annualized = calculate_annuities(cost_failures_landlords_annualized, lifetime=lifetime, discount_rate=discount_rate)
cost_annualized = cost_annualized + cost_failures_landlords_annualized
if include_failures_multi is True and cost_failures_multi is not None:
if isinstance(cost_failures_multi, Series):
cost_failures_multi_annualized = reindex_mi(cost_failures_multi, cash_flow.index).fillna(0)
cost_failures_multi_annualized = concat([cost_failures_multi_annualized] * discount_rate.shape[1], axis=1, keys=discount_rate.columns)
cost_failures_multi_annualized = calculate_annuities(cost_failures_multi_annualized, lifetime=lifetime, discount_rate=discount_rate)
cost_annualized = cost_annualized + cost_failures_multi_annualized
if include_energy is True:
cost_net = cost_annualized - cash_flow
else:
cost_net = cost_annualized
# calculate cost-efficiency of renovation works
cost_efficiency = cost_net / consumption_saved
cost_efficiency *= reindex_mi(condition, cost_efficiency.index)
cost_efficiency = cost_efficiency.apply(pd.to_numeric)
cost_efficiency.dropna(how='all', inplace=True)
rslt_efficiency = self.find_best_option(cost_efficiency, dict_df=dict_df, func='min')
rslt_efficiency['stock'] = stock
rslt_efficiency['consumption_saved_efficiency'] = rslt_efficiency['stock'] * rslt_efficiency['consumption_saved']
# calculate cost-efficiency of avoiding emission of renovation works
cost_efficiency_carbon = cost_net / emission_saved
cost_efficiency_carbon *= reindex_mi(condition, cost_efficiency_carbon.index)
cost_efficiency_carbon = cost_efficiency_carbon.apply(pd.to_numeric)
cost_efficiency_carbon.dropna(how='all', inplace=True)
rslt_efficiency_carbon = self.find_best_option(cost_efficiency_carbon, dict_df=dict_df, func='min')
rslt_efficiency_carbon['stock'] = stock
rslt_efficiency_carbon['emission_saved_efficiency'] = rslt_efficiency_carbon['stock'] * rslt_efficiency['emission_saved']
# calculate net present value of renovation project
temp = deepcopy(dict_df)
if cost_failures is not None:
if isinstance(cost_failures, Series):
cost_failures = reindex_mi(cost_failures, cash_flow.index).fillna(0)
else:
raise NotImplemented
if include_failures is True:
cost_investment = (cost_investment.T + cost_failures).T
if cost_failures_landlords is not None:
if isinstance(cost_failures_landlords, Series):
cost_failures_landlords = reindex_mi(cost_failures_landlords, cash_flow.index).fillna(0)
else:
raise NotImplemented
if include_failures_landlords is True:
cost_investment = (cost_investment.T + cost_failures_landlords).T
if cost_failures_multi is not None:
if isinstance(cost_failures_multi, Series):
cost_failures_multi = reindex_mi(cost_failures_multi, cash_flow.index).fillna(0)
else:
raise NotImplemented
if include_failures_multi is True:
cost_investment = (cost_investment.T + cost_failures_multi).T
if include_energy is True:
if isinstance(discount_factor, Series):
npv = - cost_investment + (discount_factor * cash_flow.T).T
else:
npv = - cost_investment + discount_factor * cash_flow
else:
npv = - cost_investment
npv *= reindex_mi(condition, npv.index)
npv = npv.apply(pd.to_numeric)
npv.dropna(how='all', inplace=True)
rslt_npv = self.find_best_option(npv, dict_df=temp, func='max')
# add information in dict result
rslt_npv['cost_social'] = - rslt_npv['criteria']
rslt_npv['stock'] = stock
rslt_npv['consumption_saved_npv'] = stock * rslt_npv['consumption_saved']
rslt_npv['emission_saved_npv'] = stock * rslt_npv['emission_saved']
if cost_failures is not None:
rslt_npv['cost_failures'] = cost_failures
if cost_failures_landlords is not None:
rslt_npv['cost_failures_landlords'] = cost_failures_landlords
if cost_failures_multi is not None:
rslt_npv['cost_failures_multi'] = cost_failures_multi
# calculate roi
if include_energy is True:
roi = cost_investment / cash_flow
roi *= reindex_mi(condition, roi.index)
roi = roi.apply(pd.to_numeric)
roi.dropna(how='all', inplace=True)
rslt_roi = self.find_best_option(roi, dict_df=dict_df, func='min')
rslt_roi['stock'] = stock
rslt_roi['consumption_saved_roi'] = stock * rslt_roi['consumption_saved']
else:
raise NotImplemented
output = {
'rslt_efficiency': rslt_efficiency,
'rslt_efficiency_carbon': rslt_efficiency_carbon,
'rslt_npv': rslt_npv,
'rslt_roi': rslt_roi
}
return output
[docs] def make_static_analysis(self, cost_insulation, cost_heater, prices,
health_cost, carbon_content,
path_out=None, carbon_value=50, selected_options=None, sufix='',
social_discount_rate=0.05, private_discount_rate=0.05, lifetime=25,
make_plot=False, cost_failures=None, cost_failures_landlords=None, cost_failures_multi=None,
income=None, duration_loan=10):
def closest(df, value):
return (df - value).abs().sort_values().index[0]
# select only stock mobile and existing before the first year
if path_out is None:
path_out = self.path_ini
# get all economics information for all renovation options
output = self.calculate_renovation(prices, cost_insulation, cost_heater, carbon_content, health_cost=health_cost,
carbon_value=carbon_value)
options = get_json('project/input/resources_dir/scenario_static.json')
colors = get_json('project/input/resources_dir/colors_scenario_static.json')
for k, i in options.items():
if i['discount_rate'] == 'social_discount_rate':
i['discount_rate'] = social_discount_rate
if i['discount_rate'] == 'private_discount_rate':
i['discount_rate'] = private_discount_rate
if selected_options is None:
selected_options = ['Deep insulation, private',
'Deep renovation, social',
'Deep renovation, private']
options = {k: i for k, i in options.items() if
k in selected_options}
dict_rslt, dict_subsidies = {}, {}
for key, option in options.items():
# select options for the analysis and calculate main indicators
output_select = self.select_renovation_option(
output['consumption_saved'],
output['emission_saved'],
output['cost'],
output['stock'],
output['bill_saved'],
output['certificate_after'],
carbon_saved=output['emission_saved_value'],
health_cost_saved=output['health_cost_saved'],
lifetime=lifetime,
cost_failures=cost_failures,
cost_failures_landlords=cost_failures_landlords,
cost_failures_multi=cost_failures_multi,
income=income,
duration_loan=duration_loan,
**option)
# format the results to plot monotonic curve
temp = self.prepare_abatement_plot(
output_select['rslt_efficiency'],
output_select['rslt_efficiency_carbon'],
output_select['rslt_npv'],
output_select['rslt_roi'])
dict_rslt.update({key: temp})
if 'I-E-C-S' in key:
names = output['consumption_saved'].columns.names
replace_columns = {
'consumption_saved': "Économie d'énergie (kWh/an.logement)",
'emission_saved': "Économie d'émission (gCO2/an.logement)",
'cost_investment': "Coût d'investissement (€/logement)",
'bill_saved': "Économie de facture (€/(an.logement))",
'carbon_saved': "Économie de dommage carbone (€/(an.logement))",
'health_cost_saved': "Économie de coût de santé (€/(an.logement))",
'cash_flow': "Cash flow champs social (€/(an.logement))",
'cost_social': "Coût actualisé social (€/logement)",
'stock': "Stock de résidences principales (unité)",
'consumption_saved_npv': "Économie d'énergie (kWh/an)",
'emission_saved_npv': "Économie d'émission (gCO2/an)",
'cost_failures': "Coût des frictions en forme réduite (€/logement)"
}
output_select['rslt_npv'].rename(columns=replace_columns, inplace=True)
# Transforming the index levels into a single level
output_select['rslt_npv']['Measures'] = output_select['rslt_npv']['columns'].apply(lambda row: ", ".join([name for name, value in zip(names, row) if value]))
dict_subsidies.update({key: output_select['rslt_npv']})
dict_rslt = reverse_dict(dict_rslt)
# format the results based on initial value
c_before = self.consumption_heating_store(self._stock_ref.index, full_output=False)
c_before = reindex_mi(c_before, self._stock_ref.index) * self._stock_ref * reindex_mi(self._surface,
self._stock_ref.index)
heating_intensity_before = self._to_heating_intensity(c_before.index, prices)
c_before *= heating_intensity_before
c_content = carbon_content.reindex(self.to_energy(c_before)).set_axis(c_before.index)
e_before = (c_before * c_content).sum()
stock_ini = self._stock_ref.sum() / 1e6
# percentage of initial value
for key in dict_rslt.keys():
if key.split('_')[-1] == 'emission':
for k in dict_rslt[key].keys():
dict_rslt[key][k].index = dict_rslt[key][k].index * 1e9 / e_before
# dict_rslt[key][k].name = 'Emission saving (% / 2018)'
dict_rslt[key][k].index.rename('Emission saved (% / 2018)', inplace=True)
dict_rslt[key][k].index.rename('Emission évitée (/2018)', inplace=True)
elif key.split('_')[-1] == 'pop':
for k in dict_rslt[key].keys():
dict_rslt[key][k].index = dict_rslt[key][k].index / stock_ini
# dict_rslt[key][k].name = 'Stock de résidences principales (% / 2018)'
dict_rslt[key][k].index.rename('Stock de résidences principales (% / 2018)', inplace=True)
dict_rslt[key][k].index.rename('Stock de résidences principales (/2018)', inplace=True)
elif key.split('_')[-1] == 'energy':
for k in dict_rslt[key].keys():
dict_rslt[key][k].index = dict_rslt[key][k].index / (c_before.sum() / 10 ** 9)
# dict_rslt[key][k].name = 'Consumption saving (% / 2018)'
dict_rslt[key][k].index.rename('Consumption saved (% / 2018)', inplace=True)
dict_rslt[key][k].index.rename("Économie d'énergie (/2018)", inplace=True)
else:
raise
rslt_profitability = {}
list_indicators = ['npv_energy', 'npv_emission', 'npv_pop']
for indicator in list_indicators:
rslt_profitability.update({indicator: {}})
for k, i in dict_rslt[indicator].items():
rslt_profitability[indicator].update({k: closest(i, 0)})
# make plots
if make_plot is True:
temp = {k: i for k, i in dict_rslt['efficiency_energy'].items() if k.split(',')[0] in ['I', 'I-C', 'I-C-S']}
if temp != {}:
make_plots(temp, "Coût d'abattement énergie (€ par kWh cumac)",
save=os.path.join(path_out, 'cout_abattement_energie{}.png'.format(sufix)), ymax=0.3, ymin=0,
format_y=lambda y, _: '{:.2f}€/kWh'.format(y), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), order_legend=False, x_tick_interval=0.1,
xmin=0, xmax=1, ncol=1, export_csv=True)
temp = {k: i for k, i in dict_rslt['efficiency_pop'].items() if k.split(',')[0] in ['I', 'I-C', 'I-C-S']}
if temp != {}:
make_plots(temp, "Coût d'abattement énergie (€ par kWh cumac)",
save=os.path.join(path_out, 'cout_abattement_logement{}.png'.format(sufix)), ymax=0.3, ymin=0,
format_y=lambda y, _: '{:.2f}€/kWh'.format(y), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), order_legend=False, x_tick_interval=0.1,
xmin=0, xmax=1, ncol=1, export_csv=True)
efficiency_scenarios = ['I-E-S', 'I-E social discount', 'I-E', 'I-E-MF Bailleurs',
'I-E-MF Bailleurs + Collectifs', 'I-E-MF Contrainte crédit']
temp = {k: i for k, i in dict_rslt['efficiency_carbon_emission'].items() if k.split(',')[0] in efficiency_scenarios}
if temp != {}:
make_plots(temp, "Coût d'abattement émission (€ par tCO2 évitée)",
save=os.path.join(path_out, 'cout_abattement_emission{}.png'.format(sufix)), ymax=1000, ymin=0,
format_y=lambda y, _: '{:.0f}€/tCO2'.format(y), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), order_legend=efficiency_scenarios, x_tick_interval=0.1,
xmin=0, xmax=1, ncol=1, hlines=250, export_csv=True)
temp = {k: i for k, i in dict_rslt['efficiency_carbon_pop'].items() if k.split(',')[0] in efficiency_scenarios}
if temp != {}:
make_plots(temp, "Coût d'abattement émission (€ par tCO2 évitée)",
save=os.path.join(path_out, 'cout_abattement_logement{}.png'.format(sufix)), ymax=1000, ymin=0,
format_y=lambda y, _: '{:.0f}€/tCO2'.format(y), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), order_legend=efficiency_scenarios, x_tick_interval=0.1,
xmin=0, xmax=1, ncol=1, hlines=250, export_csv=True)
npv_scenarios = ['I-E-C-S', 'I-E-C', 'I-E social discount', 'I-E', 'I-E-MF Bailleurs',
'I-E-MF Bailleurs + Collectifs', 'I-E-MF Contrainte crédit']
temp = {k.split(',')[0]: i for k, i in dict_rslt['npv_energy'].items() if k.split(',')[0] in npv_scenarios}
if temp != {}:
make_plots(temp, 'Coût actualisé (Milliers €)',
save=os.path.join(path_out, 'van_energie{}.png'.format(sufix)),
format_y=lambda y, _: '{:.0f}k€'.format(y / 10 ** 3), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), hlines=0, order_legend=npv_scenarios, x_tick_interval=0.1,
ymax=40*1e3, ymin=-20*1e3, xmin=0, xmax=1, ncol=1, export_csv=True)
temp = {k: i for k, i in dict_rslt['npv_pop'].items() if k.split(',')[0] in npv_scenarios}
if temp != {}:
make_plots(temp, 'Coût actualisé (Milliers €)',
save=os.path.join(path_out, 'van_logement{}.png'.format(sufix)),
format_y=lambda y, _: '{:.0f}k€'.format(y / 10 ** 3), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), hlines=0, order_legend=False, x_tick_interval=0.1,
ymax=40*1e3, ymin=-20*1e3, xmin=0, xmax=1, ncol=1, export_csv=True)
temp = {k: i for k, i in dict_rslt['npv_emission'].items() if k.split(',')[0] in npv_scenarios}
if temp != {}:
make_plots(temp, 'Coût actualisé (Milliers €)',
save=os.path.join(path_out, 'van_emission{}.png'.format(sufix)),
format_y=lambda y, _: '{:.0f}k€'.format(y / 10 ** 3), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), hlines=0, order_legend=False, x_tick_interval=0.1,
ymax=40*1e3, ymin=-20*1e3, xmin=0, xmax=1, ncol=1, export_csv=True)
if False:
make_plots(dict_rslt['efficiency'], 'Cost efficiency (euro per kWh saved)',
save=os.path.join(path_out, 'abatement_curve.png'), ymax=1, ymin=-1,
format_y=lambda y, _: '{:.2f}'.format(y), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), order_legend=False)
make_plots(dict_rslt['efficiency'], 'Cost efficiency (euro per kWh saved)',
save=os.path.join(path_out, 'abatement_curve_zoom.png'), ymax=0.3, ymin=0,
format_y=lambda y, _: '{:.2f}'.format(y), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), order_legend=False)
make_plots(dict_rslt['efficiency_carbon'], 'Cost efficiency (euro per tCO2 saved)',
save=os.path.join(path_out, 'abatement_curve_carbon.png'), ymax=5000, ymin=-1000,
format_y=lambda y, _: '{:.0f}'.format(y), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), order_legend=False)
make_plots(dict_rslt['efficiency_carbon'], 'Cost efficiency (euro per tCO2 saved)',
save=os.path.join(path_out, 'abatement_curve_carbon_zoom.png'), ymax=500, ymin=0,
format_y=lambda y, _: '{:.0f}'.format(y), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), order_legend=False)
make_plots(dict_rslt['npv'], 'Net present cost (Thousand euro)',
save=os.path.join(path_out, 'npv_insulation_curve.png'),
format_y=lambda y, _: '{:.0f}'.format(y / 10 ** 3), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), ymin=None, hlines=0, order_legend=False)
make_plots(dict_rslt['roi'], 'Return Time on Investment (years)',
save=os.path.join(path_out, 'roi_insulation_curve.png'),
format_y=lambda y, _: '{:.0f}'.format(y), loc='left', left=1.25, colors=colors,
format_x=lambda x, _: '{:.0%}'.format(x), ymax=50, order_legend=False)
return rslt_profitability, dict_subsidies
[docs] @staticmethod
def prepare_abatement_plot(rslt_efficiency, rslt_efficiency_carbon, rslt_npv=None, rslt_roi=None):
def sort_cum(df, round=3):
df.sort_values(y.name, inplace=True)
df.dropna(inplace=True)
df[y.name] = df[y.name].round(round)
df = df.groupby([y.name]).agg({x.name: 'sum', y.name: 'first'})
df[x.name] = df[x.name].cumsum() / 10 ** 9
df = df.set_index(x.name)[y.name]
return df
output = {}
x = rslt_efficiency['consumption_saved_efficiency'].rename('Consumption saved (TWh)')
y = rslt_efficiency['criteria'].rename('Cost efficiency (euro/kWh)')
df_efficiency = concat((x, y), axis=1)
df_efficiency = sort_cum(df_efficiency, round=3)
output.update({'efficiency_energy': df_efficiency})
x = rslt_efficiency['stock'].rename('Stock (Million)')
y = rslt_efficiency['criteria'].rename('Cost efficiency (euro/kWh)')
df_efficiency_pop = concat((x, y), axis=1)
df_efficiency_pop = sort_cum(df_efficiency_pop, round=3)
df_efficiency_pop.index = df_efficiency_pop.index * 10 ** 3
output.update({'efficiency_pop': df_efficiency_pop})
x = rslt_efficiency_carbon['emission_saved_efficiency'].rename('Emission saved (MtCO2)')
y = rslt_efficiency_carbon['criteria'].rename('Cost efficiency (euro/tCO2)') * 10 ** 6
df_efficiency_carbon = concat((x, y), axis=1)
df_efficiency_carbon = sort_cum(df_efficiency_carbon, round=0)
output.update({'efficiency_carbon_emission': df_efficiency_carbon})
x = rslt_efficiency_carbon['stock'].rename('Stock (Million)')
y = rslt_efficiency_carbon['criteria'].rename('Cost efficiency (euro/tCO2)') * 10 ** 6
df_efficiency_carbon_pop = concat((x, y), axis=1)
df_efficiency_carbon_pop = sort_cum(df_efficiency_carbon_pop, round=0)
df_efficiency_carbon_pop.index = df_efficiency_carbon_pop.index * 10 ** 3
output.update({'efficiency_carbon_pop': df_efficiency_carbon_pop})
# sort by marginal cost
if rslt_npv is not None:
x = rslt_npv['consumption_saved_npv'].rename('Consumption saved (TWh)')
y = rslt_npv['cost_social'].rename('Net present cost (euro)')
df_npv = concat((x, y), axis=1)
df_npv.dropna(inplace=True)
df_npv = df_npv[df_npv['Consumption saved (TWh)'] > 0]
df_npv = sort_cum(df_npv, round=0)
output.update({'npv_energy': df_npv})
x = rslt_npv['emission_saved_npv'].rename('Emission saved (MtCO2)')
y = rslt_npv['cost_social'].rename('Net present cost (euro)')
df_npv = concat((x, y), axis=1)
df_npv.dropna(inplace=True)
df_npv = df_npv[df_npv['Emission saved (MtCO2)'] > 0]
df_npv = sort_cum(df_npv, round=0)
output.update({'npv_emission': df_npv})
x = rslt_npv['stock'].rename('Stock (Million)')
y = rslt_npv['cost_social'].rename('Net present cost(euro)')
df_npv = concat((x, y), axis=1)
df_npv = sort_cum(df_npv, round=0)
df_npv.index = df_npv.index * 10 ** 3
output.update({'npv_pop': df_npv})
if rslt_roi is not None:
x = rslt_roi['consumption_saved_roi'].rename('Consumption saved (TWh)')
y = rslt_roi['criteria'].rename('Return on Investment (ROI) (year)')
df_roi = concat((x, y), axis=1)
df_roi = sort_cum(df_roi, round=1)
output.update({'roi_energy': df_roi})
return output