Source code for project.read_input
# Copyright 2020-2021 Ecole Nationale des Ponts et Chaussées
#
# This file is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# Original author Lucas Vivier <vivier@centre-cired.fr>
import copy
import os
import pandas as pd
from pandas import Series, DataFrame, concat, MultiIndex, Index
from numpy.random import normal
from numpy.testing import assert_almost_equal
from project.utils import reindex_mi, get_pandas, make_plot, get_series, reverse_dict, select
from project.dynamic import stock_need, share_multi_family, share_type_built
from project.input.param import generic_input
from project.input.resources import resources_data
[docs]class PublicPolicy:
"""Public policy parent class.
Attributes
----------
name : str
Name of the policy.
start : int
Year policy starts.
end : int
Year policy ends.
value: float
policy : {'energy_taxes', 'subsidies'}
"""
def __init__(self, name, start, end, value, policy, gest=None, cap=None, target=None, cost_min=None, cost_max=None,
new=None, by='index', non_cumulative=None, frequency=None, intensive=None, min_performance=None,
bonus=False, social_housing=True, duration=None, recycling=None, proportional=None,
recycling_ini=None, year_stop=None, years_stop=None, public_cost=None, variable=True):
self.name = name
self.start = start
self.end = end
self.value = value
self.policy = policy
self.gest = gest
self.cap = cap
self.target = target
self.cost_max = cost_max
self.cost_min = cost_min
self.new = new
self.by = by
self.non_cumulative = non_cumulative
self.frequency = frequency
self.intensive = intensive
self.min_performance = min_performance
self.bonus = bonus
self.social_housing = social_housing
self.duration = duration
self.recycling = recycling
self.recycling_ini = recycling_ini
self.proportional = proportional
self.public_cost = public_cost
self.year_stop = year_stop
self.years_stop = years_stop
self.variable = variable
self.incentive = False
if policy in ['subsidy_ad_valorem', 'subsidy_target', 'subsidy_proportional', 'bonus', 'reduced_vat',
'zero_interest_loan']:
self.incentive = True
if (year_stop or years_stop) and self.incentive:
if not isinstance(value, dict):
self.value = {k: value for k in range(self.start, self.end)}
if year_stop:
if self.incentive:
if year_stop < end:
self.apply_year_stop(year_stop, self.value)
else:
self.end = min(year_stop, end)
if years_stop:
if self.incentive:
for y in years_stop:
if y < end:
self.apply_year_stop(y, self.value)
else:
self.end = min(years_stop[0], end)
def __repr__(self):
return self.name
def __str__(self):
return self.name
[docs] def cost_targeted(self, cost_insulation, target_subsidies=None):
"""
Gives the amount of the cost of a gesture for a segment over which the subvention applies.
If self.new, cost global is the amount loaned for gestures which are considered as 'global renovations',
and thus caped by the proper maximum zil amount taking the heater replacement into account.
Also, cost_no_global are the amount loaned for unique or bunch renovations actions.
Parameters
----------
cost_insulation: pd.DataFrame
Cost of an insulation gesture
target_subsidies: pd.DataFrame
Boolean values. If self.new it corresponds to the global renovations
Returns
-------
cost: pd.DataFrame
Each cell of the DataFrame corresponds to the cost after subventions of a specific gesture and segment
"""
cost = cost_insulation.copy()
if self.target is not None and target_subsidies is not None:
if isinstance(target_subsidies, Series):
target_subsidies = pd.concat([target_subsidies] * cost.shape[1], axis=1, keys=cost.columns)
cost = cost[target_subsidies.astype(bool)].fillna(0)
if self.cost_max is not None:
cost_max = reindex_mi(self.cost_max, cost.index)
cost_max = pd.concat([cost_max] * cost.shape[1], axis=1).set_axis(cost.columns, axis=1)
cost[cost > cost_max] = cost_max
if self.cost_min is not None:
cost_min = reindex_mi(self.cost_min, cost.index)
cost_min = pd.concat([cost_min] * cost.shape[1], axis=1).set_axis(
cost.columns, axis=1)
cost[cost < cost_min] = 0
return cost
[docs] def apply_year_stop(self, year_stop, value):
"""Put values of an incentive to 1e-4 to assess the impact of removing the incentive.
Rationale is to keep calculating the number of eligible households that renovate without the incentive.
Parameters
----------
year_stop
value
Returns
-------
"""
if self.policy == 'zero_interest_loan':
if not isinstance(self.cost_max, dict):
self.cost_max = {k: self.cost_max for k in range(self.start, self.end)}
self.cost_max[year_stop] = 1e-4
else:
if self.policy == 'reduced_vat':
val = 0.1 - 1e-4
else:
val = 1e-4
if isinstance(value[year_stop], (pd.Series, pd.DataFrame)):
temp = value[year_stop].mask(value[year_stop] > 0, val)
else:
temp = val
value[year_stop] = temp
self.value = value
[docs]def read_stock(config):
"""Read initial building stock.
Parameters
----------
config: dict
Returns
-------
pd.Series
MultiIndex Series with building stock attributes as levels.
"""
stock = get_pandas(config['building_stock'], lambda x: pd.read_csv(x, index_col=[0, 1, 2, 3, 4, 5, 6, 7, 8]).squeeze()).rename('Stock buildings')
stock_sum = stock.sum()
stock = stock.reset_index('Heating system')
# replace wood fuel multi-family by natural gas in building stock
idx = (stock['Heating system'] == 'Wood fuel-Standard boiler') & (stock.index.get_level_values('Housing type') == 'Multi-family')
stock.loc[idx, 'Heating system'] = 'Natural gas-Standard boiler'
idx = (stock['Heating system'] == 'Wood fuel-Performance boiler') & (stock.index.get_level_values('Housing type') == 'Multi-family')
stock.loc[idx, 'Heating system'] = 'Natural gas-Performance boiler'
assert_almost_equal(stock['Stock buildings'].sum(), stock_sum)
# specify heat-pump
repartition = 0.8
idx = stock['Heating system'] == 'Electricity-Heat pump'
hp_water = stock.loc[idx, :].copy()
hp_water['Stock buildings'] *= repartition
hp_water['Heating system'] = hp_water['Heating system'].str.replace('Electricity-Heat pump', 'Electricity-Heat pump water')
hp_air = stock.loc[idx, :].copy()
hp_air['Stock buildings'] *= (1 - repartition)
hp_air['Heating system'] = hp_air['Heating system'].str.replace('Electricity-Heat pump', 'Electricity-Heat pump air')
stock = stock.loc[~idx, :]
stock = pd.concat((stock, hp_water, hp_air), axis=0)
if config['simple'].get('collective_boiler'):
multi_family = stock.index.get_level_values('Housing type') == 'Multi-family'
oil_fuel = stock['Heating system'].isin(['Oil fuel-Performance boiler', 'Oil fuel-Standard boiler'])
stock.loc[multi_family & oil_fuel, 'Heating system'] = 'Oil fuel-Collective boiler'
repartition = 0.5
idx_gas = stock['Heating system'].isin(['Natural gas-Performance boiler', 'Natural gas-Standard boiler']) & multi_family
collective_gas = stock.loc[idx_gas, :].copy()
collective_gas['Stock buildings'] *= repartition
collective_gas['Heating system'] = collective_gas['Heating system'].str.replace('Natural gas-Performance boiler', 'Natural gas-Collective boiler')
collective_gas['Heating system'] = collective_gas['Heating system'].str.replace('Natural gas-Standard boiler', 'Natural gas-Collective boiler')
individual_gas = stock.loc[idx_gas, :].copy()
individual_gas['Stock buildings'] *= (1 - repartition)
stock = stock.loc[~idx_gas, :]
stock = pd.concat((stock, collective_gas, individual_gas), axis=0)
stock = stock.set_index('Heating system', append=True).squeeze()
stock = stock.groupby(stock.index.names).sum()
stock = pd.concat([stock], keys=[True], names=['Existing'])
idx_names = ['Existing', 'Occupancy status', 'Income owner', 'Income tenant', 'Housing type',
'Heating system', 'Wall', 'Floor', 'Roof', 'Windows']
stock = stock.reorder_levels(idx_names)
assert_almost_equal(stock.sum(), stock_sum)
return stock
[docs]def read_policies(config):
# TODO: replace names to clarify the definition of policies (index = households, columns = technologies).
# TODO: target should be a list with combination of simple condition.
def read_mpr(data):
l = list()
heater = get_series(data['heater']).unstack('Heating system final')
if 'Year' in heater.index.names:
heater = {y: heater.loc[heater.index.get_level_values('Year') == y, :].droplevel('Year') for y in
heater.index.get_level_values('Year').unique()}
elif data.get('growth_heater'):
growth_heater = get_pandas(data['growth_heater'], lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze())
heater = {k: i * heater for k, i in growth_heater.items()}
insulation = get_pandas(data['insulation'], lambda x: pd.read_csv(x, index_col=[0, 1]))
if data.get('growth_insulation'):
growth_insulation = get_pandas(data['growth_insulation'], lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze())
insulation = {k: i * insulation for k, i in growth_insulation.items()}
if data.get('deep_renovation'):
deep_renovation = get_pandas(data['deep_renovation'],
lambda x: pd.read_csv(x, index_col=[0]).squeeze())
l.append(PublicPolicy(data['name'], data['start'], data['end'], deep_renovation, 'subsidy_target',
gest='insulation', target='deep_renovation', year_stop=data.get('year_stop'),
years_stop=data.get('years_stop')))
if data['bonus']:
bonus_best = get_pandas(data['bonus'], lambda x: pd.read_csv(x, index_col=[0, 1]).squeeze())
bonus_worst = get_pandas(data['bonus'], lambda x: pd.read_csv(x, index_col=[0, 1]).squeeze())
l.append(PublicPolicy(data['name'], data['start'], data['end'], bonus_best, 'bonus', gest='insulation',
target='reach_best', year_stop=data.get('year_stop'),
years_stop=data.get('years_stop')))
l.append(PublicPolicy(data['name'], data['start'], data['end'], bonus_worst, 'bonus', gest='insulation',
target='out_worst', year_stop=data.get('year_stop'),
years_stop=data.get('years_stop')))
if data['heater'] is not None:
l.append(PublicPolicy(data['name'], data['start'], data['end'], heater, 'subsidy_target',
gest='heater',
year_stop=data.get('year_stop'),
years_stop=data.get('years_stop')))
if data['insulation'] is not None:
l.append(PublicPolicy(data['name'], data['start'], data['end'], insulation, 'subsidy_target',
gest='insulation',
target=data.get('target'), year_stop=data.get('year_stop'),
years_stop=data.get('years_stop')))
return l
def read_mpr_serenite(data):
"""Create MPR Serenite PublicPolicy instance.
MaPrimeRénov' Sérénité (formerly Habiter Mieux Sérénité) for major energy renovation work in your home.
To do so, your work must result in an energy gain of at least 35%.
The amount of the bonus varies according to the amount of your resources.
Parameters
----------
data
Returns
-------
list
"""
l = list()
value = get_series(data['insulation'])
cap = None
if data.get('cap') is not None:
cap = get_series(data['cap'])
if data.get('growth_insulation'):
growth_insulation = get_series(data['growth_insulation'])
value = {k: i * value for k, i in growth_insulation.items()}
cap = {k: i * cap for k, i in growth_insulation.items()}
l.append(PublicPolicy(data['name'], data['start'], data['end'], value, data['policy'],
target=data.get('target'), gest='insulation', non_cumulative=data.get('non_cumulative'),
cap=cap, year_stop=data.get('year_stop'), years_stop=data.get('years_stop')))
if data.get('bonus') is not None:
bonus = get_series(data['bonus'])
l.append(PublicPolicy(data['name'], data['start'], data['end'], bonus, 'bonus', gest='insulation',
year_stop=data.get('year_stop'),
years_stop=data.get('years_stop')))
return l
def read_cee(data):
l = list()
if isinstance(data['value'], str):
cee_value = get_series(data['value'])
elif isinstance(data['value'], (int, float)):
cee_value = pd.Series(data['value'], index=range(data['start'], data['end'] + 1))
else:
raise NotImplemented
if data['cumac_heater'] is not None:
cumac_heater = get_series(data['cumac_heater'])
cee_heater = cumac_heater * cee_value / 1000
cee_heater = cee_heater.unstack('Heating system final')
cee_heater = {y: cee_heater.loc[cee_heater.index.get_level_values('Year') == y, :].droplevel('Year') for y in
cee_heater.index.get_level_values('Year').unique()}
l.append(PublicPolicy('cee', data['start'], data['end'], cee_heater, 'subsidy_target', gest='heater',
social_housing=True, year_stop=data.get('year_stop'), years_stop=data.get('years_stop')))
bonus_heater = get_series(data['bonus_heater']['value']).unstack('Heating system final')
bonus_heater = bonus_heater.reindex(cee_heater[data['start']].columns, axis=1).fillna(0)
if 'Year' in bonus_heater.index.names:
bonus_heater = {y: bonus_heater.loc[bonus_heater.index.get_level_values('Year') == y, :].droplevel('Year') for y in
bonus_heater.index.get_level_values('Year').unique()}
end = min(data['bonus_heater']['end'], data['end'])
l.append(PublicPolicy('cee', data['bonus_heater']['start'], end, bonus_heater, 'bonus', gest='heater',
social_housing=True, year_stop=data.get('year_stop'),
years_stop=data.get('years_stop')))
if data['cumac_insulation'] is not None:
cumac_insulation = get_series(data['cumac_insulation'])
cee_insulation = cumac_insulation * cee_value / 1000
cee_insulation = cee_insulation.unstack('Insulation').rename_axis(None, axis=1)
cee_insulation = {y: cee_insulation.loc[cee_insulation.index.get_level_values('Year') == y, :].squeeze() for y
in cee_insulation.index.get_level_values('Year').unique()}
bonus_insulation = get_pandas(data['bonus_insulation']['value'], lambda x: pd.read_csv(x, index_col=[0]))
end = min(data['bonus_heater']['end'], data['end'])
l.append(PublicPolicy('cee', data['bonus_insulation']['start'], end, bonus_insulation, 'bonus',
gest='insulation', social_housing=True, year_stop=data.get('year_stop'),
years_stop=data.get('years_stop')))
l.append(PublicPolicy('cee', data['start'], data['end'], cee_insulation, 'subsidy_target',
gest='insulation', social_housing=True, year_stop=data.get('year_stop'),
years_stop=data.get('years_stop')))
coefficient_obligation = get_pandas(data['coefficient_obligation'], lambda x: pd.read_csv(x, index_col=[0])).rename_axis('Energy', axis=1)
cee_tax = (coefficient_obligation.T * cee_value).T / 1000
end = data['end']
if data.get('year_stop') is not None:
end = data['year_stop']
cee_tax = cee_tax.loc[data['start']:end - 1, :]
if data.get('years_stop') is not None:
if data['years_stop']:
cee_tax.loc[data['years_stop'], :] = 0
l.append(PublicPolicy('cee', data['start'], end, cee_tax, 'tax'))
return l
def read_cap(data):
l = list()
if 'insulation' in data.keys():
l.append(PublicPolicy('subsidies_cap', data['start'], data['end'], get_series(data['insulation']),
'subsidies_cap', gest='insulation', target=data.get('target_insulation')))
if 'heater' in data.keys():
l.append(PublicPolicy('subsidies_cap', data['start'], data['end'], get_series(data['heater']),
'subsidies_cap', gest='heater', target=data.get('target_heater')))
return l
def read_carbon_tax(data):
tax = get_series(data['tax'], header=None)
emission = get_pandas(data['emission'], lambda x: pd.read_csv(x, index_col=[0]).squeeze())
tax = (tax * emission.T).T.fillna(0) / 10 ** 6
tax = tax.loc[(tax != 0).any(axis=1)]
recycling = None
if data.get('recycling') is not None:
if isinstance(data['recycling'], str):
recycling = get_series(data['recycling'], header=0)
else:
recycling = data['recycling']
end = data['end']
if data.get('year_stop') is not None:
end = data['year_stop']
tax = tax.loc[data['start']:end - 1, :]
if data.get('years_stop') is not None:
if data['years_stop']:
tax.loc[data['years_stop'], :] = 0
return [PublicPolicy('carbon_tax', data['start'], end, tax, 'tax',
recycling=recycling, recycling_ini=data.get('recycling_ini'))]
def read_cite(data):
"""Creates the income tax credit PublicPolicy instance.
Oil fuel-Performant Boiler exempted.
Parameters
----------
data
Returns
-------
list
"""
l = list()
if data['heater'] is not None:
heater = get_series(data['heater']).unstack('Heating system final')
l.append(PublicPolicy('cite', data['start'], data['end'], heater, 'subsidy_ad_valorem', gest='heater',
cap=data['cap'], year_stop=data.get('year_stop'),
years_stop=data.get('years_stop'))
)
if data['insulation'] is not None:
insulation = get_pandas(data['insulation'], lambda x: pd.read_csv(x, index_col=[0]))
l.append(
PublicPolicy('cite', data['start'], data['end'], insulation, 'subsidy_ad_valorem', gest='insulation',
cap=data['cap'], target=data.get('target'), year_stop=data.get('year_stop'),
years_stop=data.get('years_stop'))
)
return l
def read_zil(data):
l = list()
if isinstance(data['gest'], str):
data['gest'] = [data['gest']]
for gest in data['gest']:
l.append(PublicPolicy('zero_interest_loan', data['start'], data['end'], data['value'], 'zero_interest_loan',
cost_max=data.get('cost_max'), gest=gest, duration=data['duration'],
year_stop=data.get('year_stop'), years_stop=data.get('years_stop'),
public_cost=data.get('public_cost'), target=data.get('target')))
return l
def read_reduced_vat(data):
l = list()
l.append(PublicPolicy('reduced_vat', data['start'], data['end'], data['value'], 'reduced_vat', gest='heater',
social_housing=True, year_stop=data.get('year_stop'), years_stop=data.get('years_stop')))
l.append(
PublicPolicy('reduced_vat', data['start'], data['end'], data['value'], 'reduced_vat', gest='insulation',
social_housing=True, year_stop=data.get('year_stop'), years_stop=data.get('years_stop')))
return l
def read_ad_valorem(data):
l = list()
value = data['value']
if isinstance(value, str):
value = get_series(data['value'])
by = 'index'
if data.get('index') is not None:
mask = get_series(data['index'], header=0)
value *= mask
if data.get('columns') is not None:
mask = get_pandas(data['columns'], lambda x: pd.read_csv(x, index_col=[0]).squeeze()).rename(None)
if isinstance(value, (float, int)):
value *= mask
else:
value = value.to_frame().dot(mask.to_frame().T)
by = 'columns'
if data.get('growth'):
growth = get_series(data['growth'], header=None)
value = {k: i * value for k, i in growth.items()}
name = 'sub_ad_valorem'
if data.get('name') is not None:
name = data['name']
if isinstance(data['gest'], str):
data['gest'] = [data['gest']]
for gest in data['gest']:
l.append(PublicPolicy(name, data['start'], data['end'], value, 'subsidy_ad_valorem',
gest=gest, by=by, target=data.get('target'), cap=data.get('cap'),
year_stop=data.get('year_stop'), years_stop=data.get('years_stop')))
return l
def read_proportional(data):
l = list()
value = data.get('value')
if isinstance(value, str):
value = get_series(data['value'])
value = value.to_dict()
by = 'index'
if data.get('index') is not None:
mask = get_series(data['index'], header=0)
value *= mask
if data.get('columns') is not None:
mask = get_pandas(data['columns'], lambda x: pd.read_csv(x, index_col=[0]).squeeze()).rename(None)
if isinstance(value, (float, int)):
value *= mask
else:
value = value.to_frame().dot(mask.to_frame().T)
by = 'columns'
if data.get('growth'):
growth = get_series(data['growth'], header=None)
value = {k: i * value for k, i in growth.items()}
name = 'sub_proportional'
if data.get('name') is not None:
name = data['name']
proportional = 'MWh_cumac' # tCO2_cumac
if data.get('proportional') is not None:
proportional = data['proportional']
if isinstance(data['gest'], str):
data['gest'] = [data['gest']]
for gest in data['gest']:
l.append(PublicPolicy(name, data['start'], data['end'], value, 'subsidy_proportional',
gest=gest, by=by, target=data.get('target'), proportional=proportional,
year_stop=data.get('year_stop'), years_stop=data.get('years_stop')))
return l
def restriction_energy(data):
return [PublicPolicy(data['name'], data['start'], data['end'], data['value'],
'restriction_energy', gest='heater', target=data.get('target'),
variable=data.get('variable', True))]
def restriction_heater(data):
return [PublicPolicy(data['name'], data['start'], data['end'], data['value'],
'restriction_heater', gest='heater', target=data.get('target'),
variable=data.get('variable', True))]
def premature_heater(data):
return [PublicPolicy(data['name'], data['start'], data['end'], data['value'],
'premature_heater', gest='heater', target=data.get('target'))]
def read_obligation(data):
l = list()
banned_performance = get_pandas(data['value'], lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze()).dropna()
start = min(banned_performance.index)
if data['start'] > start:
start = data['start']
frequency = data['frequency']
if frequency is not None:
frequency = pd.Series(frequency['value'], index=pd.Index(frequency['index'], name=frequency['name']))
target = None
if data.get('target') is not None:
target = pd.Series(True, index=pd.Index(data['target']['value'], name=data['target']['name']))
end = data['end']
if data.get('year_stop') is not None:
end = data['year_stop']
if data.get('years_stop') is not None:
end = data['years_stop'][0]
l.append(PublicPolicy(data['name'], start, end, banned_performance, 'obligation',
gest='insulation', frequency=frequency, intensive=data['intensive'],
min_performance=data['minimum_performance'], target=target))
if data.get('sub_obligation') is not None:
value = get_pandas(data['sub_obligation'], lambda x: pd.read_csv(x, index_col=[0]).squeeze())
l.append(PublicPolicy('sub_obligation', start, data['end'], value, 'subsidy_ad_valorem',
gest='insulation'))
return l
def read_regulation(data):
if isinstance(data['gest'], list):
l = list()
for gest in data['gest']:
l.append(PublicPolicy(data['name'], data['start'], data['end'], data.get('value'), data['policy'], gest=gest))
return l
else:
return [PublicPolicy(data['name'], data['start'], data['end'], data.get('value'), data['policy'], gest=data['gest'])]
def read_standard_policy(data):
l = list()
if isinstance(data['gest'], str):
gest = [data['gest']]
else:
gest = data['gest']
for g in gest:
l.append(PublicPolicy(data['name'], data['start'], data['end'], None, data['policy'],
gest=g))
return l
read = {'mpr': read_mpr,
'mpr_variant': read_mpr,
'mpr_efficacite': read_mpr,
'mpr_serenite': read_mpr_serenite,
'mpr_performance': read_mpr_serenite,
'mpr_serenite_variant': read_mpr_serenite,
'mpr_serenite_high_income': read_mpr_serenite,
'mpr_serenite_low_income': read_mpr_serenite,
'mpr_multifamily': read_mpr_serenite,
"mpr_multifamily_updated": read_mpr_serenite,
'mpr_multifamily_deep': read_mpr_serenite,
'mpr_serenite_multifamily_variant': read_mpr_serenite,
'cee': read_cee,
'cee_variant': read_cee,
'cee_2018': read_cee,
'cee_2021': read_cee,
'cee_2024': read_cee,
'cap': read_cap,
'cap_updated': read_cap,
'cap_variant': read_cee,
'carbon_tax': read_carbon_tax,
'carbon_tax_variant': read_carbon_tax,
'cite': read_cite,
'cite_insulation': read_cite,
'cite_heater': read_cite,
'reduced_vat': read_reduced_vat,
'reduced_vat_variant': read_reduced_vat,
'zero_interest_loan': read_zil}
list_policies = list()
for key, item in config['policies'].items():
item['name'] = key
if key in read.keys():
list_policies += read[key](item)
else:
if item.get('policy') == 'subsidy_ad_valorem':
list_policies += read_ad_valorem(item)
elif item.get('policy') == 'wco':
list_policies += read_cee(item)
elif item.get('policy') == 'subsidy_proportional':
list_policies += read_proportional(item)
elif item.get('policy') == 'zero_interest_loan':
list_policies += read_zil(item)
elif item.get('policy') == 'premature_heater':
list_policies += premature_heater(item)
elif item.get('policy') == 'restriction_energy':
list_policies += restriction_energy(item)
elif item.get('policy') == 'restriction_heater':
list_policies += restriction_heater(item)
elif item.get('policy') == 'obligation':
list_policies += read_obligation(item)
elif item.get('policy') == 'regulation':
list_policies += read_regulation(item)
elif item.get('policy') == 'credit_constraint':
list_policies += read_regulation(item)
elif item.get('policy') == 'carbon_tax':
list_policies += read_carbon_tax(item)
elif item.get('policy') == 'subsidy_cap':
list_policies += read_cap(item)
elif item.get('policy') in ['subsidy_present_bias', 'subsidy_multi_family', 'subsidy_landlord',
'tax_status_quo', 'subsidy_status_quo']:
list_policies += read_standard_policy(item)
else:
print('{} reading function is not implemented'.format(key))
policies_heater = [p for p in list_policies if p.gest == 'heater']
policies_insulation = [p for p in list_policies if p.gest == 'insulation']
taxes = [p for p in list_policies if p.policy == 'tax']
return policies_heater, policies_insulation, taxes
[docs]def read_inputs(config, other_inputs=generic_input):
"""Read all inputs in Python object and concatenate in one dict.
Parameters
----------
config: dict
Configuration dictionary with path to data.
other_inputs: dict
Other inputs that are manually inserted in param.py
Returns
-------
dict
"""
inputs = dict()
idx = range(config['start'], config['end'])
inputs.update(other_inputs)
if isinstance(config['energy']['energy_prices'], str):
energy_prices = get_pandas(config['macro']['energy_prices'], lambda x: pd.read_csv(x, index_col=[0]).rename_axis('Year').rename_axis('Energy', axis=1))
elif isinstance(config['energy']['energy_prices'], dict):
energy_prices = get_pandas(config['energy']['energy_prices']['ini'], lambda x: pd.read_csv(x, index_col=[0]).rename_axis('Year').rename_axis('Energy', axis=1))
energy_prices = energy_prices.loc[config['start'], :]
rate = Series(config['energy']['energy_prices']['rate']).rename_axis('Energy')
if config['energy']['energy_prices'].get('factor') is not None:
rate *= config['energy']['energy_prices']['factor']
temp = range(config['start'] + 1, 2051)
rate = concat([(1 + rate) ** n for n in range(len(temp))], axis=1, keys=temp)
rate = concat((pd.Series(1, index=rate.index, name=config['start']), rate), axis=1)
energy_prices = rate.T * energy_prices
if config['energy']['energy_prices'].get('shock') is not None:
temp = config['energy']['energy_prices']['shock']
energy_prices.loc[temp['start']:, :] *= temp['factor']
else:
raise NotImplemented
inputs.update({'energy_prices': energy_prices})
energy_taxes = get_pandas(config['energy']['energy_taxes'], lambda x: pd.read_csv(x, index_col=[0]).rename_axis('Year').rename_axis('Heating energy', axis=1))
inputs.update({'energy_taxes': energy_taxes})
inputs.update({'energy_vat': get_series(config['energy']['energy_vat'], header=None)})
inputs.update({'vat_heater': get_series(config['macro']['vat_heating_system'], header=[0])})
inputs.update({'cost_heater': get_series(config['technical']['cost_heater'], header=[0])})
inputs.update({'efficiency': get_series(config['technical']['efficiency'], header=[0])})
inputs.update({'temp_sink': get_series(config['technical']['temp_sink'], header=None)})
inputs.update({'lifetime_heater': get_series(config['technical']['lifetime_heater'], header=[0])})
inputs.update({'cost_insulation': get_series(config['technical']['cost_insulation'], header=[0])})
inputs.update({'frequency_insulation': config['renovation']['frequency_insulation']})
inputs.update({'performance_insulation_renovation': get_series(config['technical']['performance_insulation_renovation'], header=None).to_dict()})
inputs.update({'performance_insulation_construction': get_series(config['technical']['performance_insulation_construction'], header=None).to_dict()})
"""bill_saving_preferences = get_pandas(config['macro']['preferences_saving'],
lambda x: pd.read_csv(x, index_col=[0]))
preferences_insulation.update({'bill_saved': bill_saving_preferences.loc[:, 'Insulation']})
preferences_heater.update({'bill_saved': bill_saving_preferences.loc[:, 'Heater']})"""
preferences_insulation = get_series(config['renovation']['preferences_insulation'], header=None).to_dict()
preferences_insulation.update({'present_discount_rate': get_series(config['macro']['present_discount_rate'])})
preferences_heater = get_series(config['switch_heater']['preferences_heater'], header=None).to_dict()
preferences_heater.update({'present_discount_rate': get_series(config['macro']['present_discount_rate'])})
inputs.update({'preferences': {'insulation': preferences_insulation,
'heater': preferences_heater}})
consumption_ini = get_series(config['macro']['consumption_ini'], header=[0])
inputs.update({'consumption_ini': consumption_ini})
ms_heater = get_pandas(config['switch_heater']['ms_heater'], lambda x: pd.read_csv(x, index_col=[0, 1]))
ms_heater.columns.set_names('Heating system final', inplace=True)
calibration_heater = {
'ms_heater': ms_heater,
'scale': config['switch_heater']['scale']
}
inputs.update({'calibration_heater': calibration_heater})
if config['switch_heater'].get('district_heating') is not None:
district_heating = get_series(config['switch_heater']['district_heating'], header=None)
inputs.update({'flow_district_heating': district_heating.diff().fillna(0)})
calibration_renovation = None
if config['renovation']['endogenous']:
renovation_rate_ini = get_series(config['renovation']['renovation_rate_ini']).round(decimals=3)
scale_calibration = config['renovation']['scale']
ms_insulation_ini = get_pandas(config['renovation']['ms_insulation']['ms_insulation_ini'], lambda x: pd.read_csv(x, index_col=[0, 1, 2, 3]).squeeze().rename(None).round(decimals=3))
minimum_performance = config['renovation']['ms_insulation']['minimum_performance']
calibration_renovation = {'renovation_rate_ini': renovation_rate_ini,
'scale': scale_calibration,
'threshold_indicator': config['renovation'].get('threshold'),
'ms_insulation_ini': ms_insulation_ini,
'minimum_performance': minimum_performance}
inputs.update({'calibration_renovation': calibration_renovation})
if config['renovation'].get('exogenous_social'):
exogenous_social = get_pandas(config['renovation']['exogenous_social'],
lambda x: pd.read_csv(x, index_col=[0, 1]))
exogenous_social.columns = exogenous_social.columns.astype(int)
inputs.update({'exogenous_social': exogenous_social})
temp = None
if config['renovation'].get('rational_behavior') is not None:
if config['renovation']['rational_behavior']['activated']:
temp = {'calibration': config['renovation']['rational_behavior']['calibration'],
'social': config['renovation']['rational_behavior']['social'],
}
inputs.update({'rational_behavior_insulation': temp})
temp = None
if config['switch_heater'].get('rational_behavior') is not None:
if config['switch_heater']['rational_behavior']['activated']:
temp = {
'social': config['switch_heater']['rational_behavior']['social'],
}
inputs.update({'rational_behavior_heater': temp})
if config['macro'].get('population') is not None:
population = get_pandas(config['macro']['population'], lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze())
inputs.update({'population': population.loc[:config['end']]})
if config['macro'].get('stock_ini') is not None:
inputs.update({'stock_ini': config['macro']['stock_ini']})
if config['macro'].get('pop_housing') is None:
inputs.update({'pop_housing_min': other_inputs['pop_housing_min']})
inputs.update({'factor_pop_housing': other_inputs['factor_pop_housing']})
else:
pop_housing = get_pandas(config['macro']['pop_housing'],
lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze())
inputs.update({'pop_housing': pop_housing.loc[:config['end']]})
if config['macro'].get('share_single_family_construction') is not None:
temp = get_pandas(config['macro']['share_single_family_construction'],
lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze())
inputs.update({'share_single_family_construction': temp})
else:
if config['macro'].get('share_multi_family') is None:
inputs.update({'factor_multi_family': other_inputs['factor_multi_family']})
else:
_share_multi_family = get_pandas(config['macro']['share_multi_family'],
lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze())
inputs.update({'share_multi_family': _share_multi_family})
if config['macro']['available_income'] is not None:
inputs.update({'available_income': config['macro']['available_income']})
inputs.update({'income_rate': config['macro']['income_rate']})
income = get_series(config['macro']['income'], header=[0])
inputs.update({'income': income})
if isinstance(config['macro']['demolition_rate'], (float, int)):
demolition_rate = config['macro']['demolition_rate']
elif config['macro'].get('demolition_rate') is not None:
demolition_rate = get_series(config['macro']['demolition_rate'], header=None)
else:
demolition_rate = None
inputs.update({'demolition_rate': demolition_rate})
rotation_rate = get_pandas(config['macro']['rotation_rate'], lambda x: pd.read_csv(x, index_col=[0])).squeeze().rename(None)
inputs.update({'rotation_rate': rotation_rate})
surface = get_pandas(config['technical']['surface'], lambda x: pd.read_csv(x, index_col=[0, 1, 2]).squeeze().rename(None))
inputs.update({'surface': surface})
ratio_surface = get_pandas(config['technical']['ratio_surface'], lambda x: pd.read_csv(x, index_col=[0]))
inputs.update({'ratio_surface': ratio_surface})
if config['macro']['surface_built'] is None:
inputs.update({'surface_max': other_inputs['surface_max']})
inputs.update({'surface_elasticity': other_inputs['surface_elasticity']})
else:
surface_built = get_pandas(config['macro']['surface_built'], lambda x: pd.read_csv(x, index_col=[0]).squeeze().rename(None))
inputs.update({'surface_built': surface_built})
if config['macro'].get('flow_construction') is not None:
flow_construction = get_pandas(config['macro']['flow_construction'],
lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze())
inputs.update({'flow_construction': flow_construction})
temp = get_series(config['switch_heater']['ms_heater_built'], header=[0])
# Create a dictionary with the year as key and the share of each heating system as value
if 'Year' in temp.index.names:
# Interpolation over year using last known value
start = temp.index.get_level_values('Year').min()
ms_heater_built = temp.unstack('Year').reindex(range(start, config['end']), axis=1)
temp_idx = ms_heater_built.index.names
ms_heater_built = ms_heater_built.reset_index().interpolate(axis=1, method='pad').set_index(temp_idx)
# ms_heater_built = temp.unstack('Heating system').fillna(0)
inputs.update({'ms_heater_built': ms_heater_built.fillna(0)})
inputs.update({'health_cost_dpe': get_series(config['health_cost_dpe'])})
inputs.update({'health_cost_income': get_series(config['health_cost_income'])})
carbon_value = get_pandas(config['carbon_value'], lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze())
inputs.update({'carbon_value': carbon_value.loc[config['start']:]})
carbon_emission = get_pandas(config['energy']['carbon_emission'], lambda x: pd.read_csv(x, index_col=[0]).rename_axis('Year'))
inputs.update({'carbon_emission': carbon_emission.loc[config['start']:, :] * 1000})
renewable_gas = None
if isinstance(config['energy']['renewable_gas'], str):
renewable_gas = get_series(config['energy']['renewable_gas'], header=None)
renewable_gas = renewable_gas.loc[config['start']:]
# set carbon emission of natural gas constant
inputs['carbon_emission'].loc[:, 'Natural gas'] = inputs['carbon_emission'].loc[config['start'], 'Natural gas']
inputs.update({'renewable_gas': renewable_gas})
footprint_built = get_series(config['technical']['footprint_construction'], header=None)
inputs.update({'footprint_built': footprint_built})
footprint_renovation = get_pandas(config['technical']['footprint_renovation'], lambda x: pd.read_csv(x, index_col=[0]))
inputs.update({'footprint_renovation': footprint_renovation})
if config['macro'].get('use_subsidies'):
temp = get_pandas(config['macro']['use_subsidies'], lambda x: pd.read_csv(x, index_col=[0]).squeeze())
inputs.update({'use_subsidies': temp})
else:
inputs.update({'use_subsidies': pd.Series(dtype=float)})
if 'hourly_profile' in config['technical'].keys():
temp = get_series(config['technical']['hourly_profile'], header=None)
temp.index = pd.TimedeltaIndex(range(0, 24), unit='h')
inputs.update({'hourly_profile': temp})
inputs['input_financing'].update(config['financing_cost'])
if inputs['input_financing']['activated'] is False:
temp_idx = get_series(inputs['input_financing']['upfront_max']).index
inputs['input_financing']['upfront_max'] = pd.Series(100000, index=temp_idx)
inputs['input_financing']['saving_rate'] = pd.Series(0, index=idx)
inputs['input_financing']['interest_rate'] = pd.Series(0.1, index=idx)
else:
inputs['input_financing']['upfront_max'] = get_series(inputs['input_financing']['upfront_max'])
inputs['input_financing']['saving_rate'] = get_series(inputs['input_financing']['saving_rate'], header=None)
inputs['input_financing']['interest_rate'] = get_series(inputs['input_financing']['interest_rate'], header=None)
return inputs
[docs]def parse_inputs(inputs, taxes, config, stock):
"""Macro module : run exogenous dynamic parameters.
Parameters
----------
inputs: dict
Raw inputs read as Python object.
taxes: list
config: dict
Configuration file.
stock: Series
Building stock.
Returns
-------
dict
Parsed input
"""
idx = range(config['start'], config['end'])
parsed_inputs = copy.deepcopy(inputs)
if config['technical'].get('technical_progress') is not None:
parsed_inputs['technical_progress'] = dict()
if 'insulation' in config['technical']['technical_progress'].keys():
if config['technical']['technical_progress']['insulation']['activated']:
value = config['technical']['technical_progress']['insulation']['value_end']
start = config['technical']['technical_progress']['insulation']['start']
end = config['technical']['technical_progress']['insulation']['end']
value = round((1 + value) ** (1 / (end - start + 1)) - 1, 5)
parsed_inputs['technical_progress']['insulation'] = Series(value, index=range(start, end + 1)).reindex(idx).fillna(0)
if 'heater' in config['technical']['technical_progress'].keys():
if config['technical']['technical_progress']['heater']['activated']:
value = config['technical']['technical_progress']['heater']['value_end']
start = config['technical']['technical_progress']['heater']['start']
end = config['technical']['technical_progress']['heater']['end']
value = round((1 + value) ** (1 / (end - start + 1)) - 1, 5)
parsed_inputs['technical_progress']['heater'] = Series(value, index=range(start, end + 1)).reindex(idx).fillna(0)
if 'Year' in inputs['efficiency'].index.names:
s = inputs['efficiency'].index.get_level_values('Year').min()
df = inputs['efficiency'].copy()
df = df.unstack('Heating system').reindex(range(s, config['end'])).fillna(method='ffill')
parsed_inputs['efficiency'] = df.T
s = inputs['temp_sink'].index.min()
parsed_inputs['temp_sink'] = inputs['temp_sink'].reindex(range(s, config['end'])).fillna(method='ffill')
if isinstance(inputs['demolition_rate'], (float, int)):
parsed_inputs['demolition_rate'] = pd.Series(inputs['demolition_rate'], index=idx[1:])
elif isinstance(inputs['demolition_rate'], Series):
s = inputs['demolition_rate'].index.min()
parsed_inputs['demolition_rate'] = inputs['demolition_rate'].reindex(range(s, config['end'])).fillna(method='ffill')
parsed_inputs['demolition_rate'] = parsed_inputs['demolition_rate'].loc[idx[1:]]
if parsed_inputs['demolition_rate'] is None:
parsed_inputs['flow_demolition'] = 0
else:
parsed_inputs['flow_demolition'] = parsed_inputs['demolition_rate'] * stock.sum()
if 'flow_construction' not in parsed_inputs.keys():
if 'population' in inputs.keys():
parsed_inputs['population_total'] = inputs['population']
parsed_inputs['sizing_factor'] = stock.sum() / inputs['stock_ini']
parsed_inputs['population'] = inputs['population'] * parsed_inputs['sizing_factor']
if 'pop_housing' in parsed_inputs.keys():
parsed_inputs['stock_need'] = parsed_inputs['population'] / parsed_inputs['pop_housing']
else:
parsed_inputs['stock_need'], parsed_inputs['pop_housing'] = stock_need(parsed_inputs['population'],
parsed_inputs['population'][
config['start']] / stock.sum(),
inputs['pop_housing_min'],
config['start'],
inputs['factor_pop_housing'])
parsed_inputs['flow_need'] = parsed_inputs['stock_need'] - parsed_inputs['stock_need'].shift(1)
# if 'flow_construction' not in parsed_inputs.keys():
parsed_inputs['flow_construction'] = parsed_inputs['flow_need'] + parsed_inputs['flow_demolition']
parsed_inputs['available_income'] = pd.Series(
[inputs['available_income'] * (1 + config['macro']['income_rate']) ** (i - idx[0]) for i in idx], index=idx)
else:
s = inputs['flow_construction'].index.min()
parsed_inputs['flow_construction'] = inputs['flow_construction'].reindex(range(s, config['end'])).fillna(method='ffill')
parsed_inputs['flow_construction'] = parsed_inputs['flow_construction'].loc[idx]
parsed_inputs['surface'] = pd.concat([parsed_inputs['surface']] * len(idx), axis=1, keys=idx)
if 'share_single_family_construction' in inputs.keys():
s = inputs['share_single_family_construction'].index.min()
parsed_inputs['share_single_family_construction'] = inputs['share_single_family_construction'].reindex(range(s, config['end'])).fillna(method='ffill')
parsed_inputs['share_single_family_construction'] = parsed_inputs['share_single_family_construction'].loc[idx]
temp = pd.concat((parsed_inputs['share_single_family_construction'], (1 - parsed_inputs['share_single_family_construction'])),
axis=1, keys=['Single-family', 'Multi-family'], names=['Housing type'])
type_built = parsed_inputs['flow_construction'] * temp.T
else:
if 'share_multi_family' not in inputs.keys():
parsed_inputs['share_multi_family'] = share_multi_family(parsed_inputs['stock_need'],
inputs['factor_multi_family'])
type_built = share_type_built(parsed_inputs['stock_need'], parsed_inputs['share_multi_family'],
parsed_inputs['flow_construction']) * parsed_inputs['flow_construction']
# multiply by the rate of income from start to end
income = parsed_inputs['income'].copy()
income = pd.concat([income] * len(idx), axis=1, keys=idx, names=['Year'])
rate = pd.Series([(1 + parsed_inputs['income_rate']) ** (i - idx[0]) for i in idx], index=idx)
parsed_inputs['income'] = income * rate
share_decision_maker = stock.groupby(
['Occupancy status', 'Housing type', 'Income owner', 'Income tenant']).sum().unstack(
['Occupancy status', 'Income owner', 'Income tenant'])
share_decision_maker = (share_decision_maker.T / share_decision_maker.sum(axis=1)).T
share_decision_maker = pd.concat([share_decision_maker] * type_built.shape[1], keys=type_built.columns, axis=1)
construction = (reindex_mi(type_built, share_decision_maker.columns, axis=1) * share_decision_maker).stack(
['Occupancy status', 'Income owner', 'Income tenant']).fillna(0)
construction.rename_axis('Year', axis=1, inplace=True)
construction = construction.loc[:, idx]
construction = construction.stack()
ms_heater_built = inputs['ms_heater_built'].stack('Year').unstack('Heating system').fillna(0)
restriction = [k for k, p in config['policies'].items() if p.get('policy') in ['restriction_energy', 'restriction_heater']]
for policy in restriction:
if config['policies'][policy]['policy'] == 'restriction_energy':
heating_system = [i for i, energy in resources_data['heating2heater'].items() if energy == config['policies'][policy]['value']]
else:
heating_system = config['policies'][policy]['value']
heating_system = [i for i in heating_system if i in ms_heater_built.columns]
start_policy = config['policies'][policy]['start']
ms_heater_built.loc[ms_heater_built.index.get_level_values('Year') >= start_policy, heating_system] = 0
ms_heater_built = (ms_heater_built.T / ms_heater_built.sum(axis=1)).T
ms_heater_built = reindex_mi(ms_heater_built, construction.index)
temp = construction.copy()
construction = (reindex_mi(construction, ms_heater_built.index) * ms_heater_built.T).T
construction = construction.loc[(construction != 0).any(axis=1)]
construction = construction.stack('Heating system').unstack('Year')
assert round(temp.sum() - construction.sum().sum(), 0) == 0, 'Construction is not equal to the sum of the heating system'
construction_dh = select(construction, {'Heating system': 'Heating-District heating'}).sum()
parsed_inputs['flow_district_heating'] = parsed_inputs['flow_district_heating'] - construction_dh
parsed_inputs['flow_district_heating'][parsed_inputs['flow_district_heating'] < 0] = 0
performance_insulation = pd.concat([pd.Series(inputs['performance_insulation_construction'])] * construction.shape[0], axis=1,
keys=construction.index).T
parsed_inputs['flow_built'] = pd.concat((construction, performance_insulation), axis=1).set_index(
list(performance_insulation.keys()), append=True)
parsed_inputs['flow_built'] = pd.concat([parsed_inputs['flow_built']], keys=[False],
names=['Existing']).reorder_levels(stock.index.names)
if not config['macro']['construction']:
parsed_inputs['flow_built'][parsed_inputs['flow_built'] > 0] = 0
"""
parsed_inputs['health_expenditure'] = df['Health expenditure']
parsed_inputs['mortality_cost'] = df['Social cost of mortality']
parsed_inputs['loss_well_being'] = df['Loss of well-being']"""
parsed_inputs['carbon_value_kwh'] = (parsed_inputs['carbon_value'] * parsed_inputs['carbon_emission'].T).T.dropna() / 10**6
parsed_inputs['embodied_energy_built'] = inputs['footprint_built'].loc['Grey energy (kWh/m2)']
parsed_inputs['carbon_footprint_built'] = inputs['footprint_built'].loc['Carbon content (kgCO2/m2)']
# carbon footprint of renovation
parsed_inputs['embodied_energy_renovation'] = inputs['footprint_renovation'].loc['Grey energy (kWh/m2)', :]
parsed_inputs['carbon_footprint_renovation'] = inputs['footprint_renovation'].loc['Carbon content (kgCO2/m2)', :]
temp = parsed_inputs['surface'].xs(False, level='Existing', drop_level=True)
temp = (parsed_inputs['flow_built'].groupby(temp.index.names).sum() * temp).sum() / 10**6
parsed_inputs['Surface construction (Million m2)'] = temp
parsed_inputs['Carbon footprint construction (MtCO2)'] = (parsed_inputs['Surface construction (Million m2)'] * parsed_inputs['carbon_footprint_built']) / 10**3
parsed_inputs['Embodied energy construction (TWh PE)'] = (parsed_inputs['Surface construction (Million m2)'] * parsed_inputs['embodied_energy_built']) / 10**3
energy_prices = parsed_inputs['energy_prices'].copy()
parsed_inputs['energy_prices_wt'] = energy_prices.copy()
energy_taxes = parsed_inputs['energy_taxes'].copy()
if config['simple']['prices_constant']:
energy_prices = pd.concat([energy_prices.loc[config['start'], :]] * energy_prices.shape[0], keys=energy_prices.index,
axis=1).T
total_taxes = pd.DataFrame(0, index=energy_prices.index, columns=energy_prices.columns)
export_prices = dict()
export_prices.update({'energy_prices': energy_prices})
for t in taxes:
total_taxes = total_taxes.add(t.value, fill_value=0)
export_prices.update({t.name: t.value})
if energy_taxes is not None:
total_taxes = total_taxes.add(energy_taxes, fill_value=0)
taxes += [PublicPolicy('energy_taxes', energy_taxes.index[0], energy_taxes.index[-1], energy_taxes, 'tax')]
export_prices.update({'energy_taxes': energy_taxes})
if config['simple']['prices_constant']:
total_taxes = pd.concat([total_taxes.loc[config['start'], :]] * total_taxes.shape[0], keys=total_taxes.index,
axis=1).T
# energy_vat = energy_prices * (inputs['energy_vat'] / (1 - inputs['energy_vat']))
energy_vat = energy_prices * inputs['energy_vat']
export_prices.update({'energy_vat': energy_vat})
taxes += [PublicPolicy('energy_vat', energy_vat.index[0], energy_vat.index[-1], energy_vat, 'tax')]
total_taxes += energy_vat
parsed_inputs['taxes'] = taxes
parsed_inputs['total_taxes'] = total_taxes
energy_prices = energy_prices.add(total_taxes, fill_value=0)
parsed_inputs['energy_prices'] = energy_prices
export_prices = reverse_dict({k: item.to_dict() for k, item in export_prices.items()})
export_prices = concat([pd.DataFrame(item) for k, item in export_prices.items()], axis=1, keys=export_prices.keys())
parsed_inputs.update({'export_prices': export_prices})
supply = {'insulation': None, 'heater': None}
if config.get('supply') is not None:
if config['supply']['activated_insulation']:
supply.update({'insulation': {'markup_insulation': config['supply']['markup_insulation']}})
if config['supply']['activated_heater']:
supply.update({'heater': {'markup_heater': config['supply']['markup_heater']}})
parsed_inputs.update({'supply': supply})
premature_replacement = None
if config['switch_heater'].get('premature_replacement') is not None:
premature_replacement = {'time': config['switch_heater']['premature_replacement'],
'information_rate': config['switch_heater']['information_rate']}
parsed_inputs.update({'premature_replacement': premature_replacement})
return parsed_inputs
[docs]def dump_inputs(parsed_inputs, path, figures=None):
"""Create summary input DataFrame.
Parameters
----------
parsed_inputs: dict
Returns
-------
DataFrame
"""
summary_input = dict()
if 'sizing_factor' in parsed_inputs.keys():
summary_input['Sizing factor (%)'] = pd.Series(parsed_inputs['sizing_factor'], index=parsed_inputs['population'].index)
summary_input['Total population (Millions)'] = parsed_inputs['population'] / 10**6
summary_input['Income (Billions euro)'] = parsed_inputs['available_income'] * parsed_inputs['sizing_factor'] / 10**9
summary_input['Buildings stock (Millions)'] = parsed_inputs['stock_need'] / 10**6
summary_input['Person by housing'] = parsed_inputs['pop_housing']
summary_input['Buildings additional (Thousands)'] = parsed_inputs['flow_need'] / 10**3
summary_input['Buildings built (Thousands)'] = parsed_inputs['flow_construction'] / 10**3
summary_input['Buildings demolished (Thousands)'] = parsed_inputs['flow_demolition'] / 10**3
temp = parsed_inputs['surface'].xs(True, level='Existing', drop_level=True)
temp.index = temp.index.map(lambda x: 'Surface existing {} - {} (m2/dwelling)'.format(x[0], x[1]))
summary_input.update(temp.T)
temp = parsed_inputs['surface'].xs(False, level='Existing', drop_level=True)
temp.index = temp.index.map(lambda x: 'Surface construction {} - {} (m2/dwelling)'.format(x[0], x[1]))
summary_input.update(temp.T)
summary_input['Surface construction (Million m2)'] = parsed_inputs['Surface construction (Million m2)']
summary_input['Carbon footprint construction (MtCO2)'] = parsed_inputs['Carbon footprint construction (MtCO2)']
summary_input['Embodied energy construction (TWh PE)'] = parsed_inputs['Embodied energy construction (TWh PE)']
summary_input = pd.DataFrame(summary_input)
t = parsed_inputs['total_taxes'].copy()
t.columns = t.columns.map(lambda x: 'Taxes {} (euro/kWh)'.format(x))
temp = parsed_inputs['energy_prices'].copy()
if figures is not False:
make_plot(temp.dropna(), 'Prices (euro/kWh)', format_y=lambda y, _: '{:.2f}'.format(y),
colors=resources_data['colors'], save=os.path.join(path, 'energy_prices.png'))
temp.columns = temp.columns.map(lambda x: 'Prices {} (euro/kWh)'.format(x))
summary_input = pd.concat((summary_input, t, temp), axis=1)
temp = parsed_inputs['income'].copy()
temp.index = temp.index.map(lambda x: 'Income {} (euro/year)'.format(x))
summary_input = pd.concat((summary_input, temp.T), axis=1)
summary_input.T.round(3).to_csv(os.path.join(path, 'input.csv'))
parsed_inputs['export_prices'].round(4).to_csv(os.path.join(path, 'energy_prices.csv'))
return summary_input
[docs]def dict2data_inputs(inputs):
"""Grouped all inputs in the same DataFrame.
Process is useful to implement a global sensitivity analysis.
Returns
-------
DataFrame
"""
data = DataFrame(columns=['variables', 'index', 'value'])
metadata = DataFrame(columns=['variables', 'type', 'name', 'index', 'columns'])
for key, item in inputs.items():
i = True
if isinstance(item, dict):
metadata = concat((metadata.T, Series({'variables': key, 'type': type(item).__name__})), axis=1).T
i = False
item = Series(item)
if isinstance(item, (float, int)):
data = concat((data.T, Series({'variables': key, 'value': item})), axis=1).T
metadata = concat((metadata.T, Series({'variables': key, 'type': type(item).__name__})), axis=1).T
if isinstance(item, DataFrame):
metadata = concat((metadata.T, Series({'variables': key, 'type': type(item).__name__,
'index': item.index.names.copy(),
'columns': item.columns.names.copy()})), axis=1).T
i = False
item = item.stack(item.columns.names)
if isinstance(item, Series):
if i:
metadata = concat((metadata.T, Series({'variables': key, 'type': type(item).__name__, 'name': item.name,
'index': item.index.names.copy()})), axis=1).T
if isinstance(item.index, MultiIndex):
item.index = item.index.to_flat_index()
item.index = item.index.rename('index')
df = concat([item.rename('value').reset_index()], keys=[key], names=['variables']).reset_index('variables')
data = concat((data, df), axis=0)
data = data.astype({'variables': 'string', 'value': 'float64'})
data.reset_index(drop=True, inplace=True)
return data
[docs]def data2dict_inputs(data, metadata):
"""Parse aggregate data pandas and return dict fill with several inputs.
Parameters
----------
data: DataFrame
Model data input.
metadata: DataFrame
Additional information to find out how to parse data.
Returns
-------
dict
"""
def parse_index(n, index_values):
if len(n) == 1:
idx = Index(index_values, name=n[0])
else:
idx = MultiIndex.from_tuples(index_values)
idx.names = n
return idx
parsed_input = dict()
for variables, df in data.groupby('variables'):
meta = metadata[metadata['variables'] == variables]
if meta['type'].iloc[0] == 'int':
parsed_input.update({variables: int(df['value'].iloc[0])})
elif meta['type'].iloc[0] == 'float':
parsed_input.update({variables: float(df['value'].iloc[0])})
elif meta['type'].iloc[0] == 'Series':
idx = parse_index(meta['index'].iloc[0], df['index'].values)
parsed_input.update({variables: Series(df['value'].values, name=str(meta['name'].iloc[0]), index=idx)})
elif meta['type'].iloc[0] == 'DataFrame':
idx = parse_index(meta['index'].iloc[0] + meta['columns'].iloc[0], df['index'].values)
parsed_input.update({variables: Series(df['value'].values, name=str(meta['name'].iloc[0]), index=idx).unstack(
meta['columns'].iloc[0])})
elif meta['type'].iloc[0] == 'dict':
parsed_input.update({variables: Series(df['value'].values, index=df['index'].values).to_dict()})
return parsed_input
[docs]def create_simple_policy(start, end, value=0.3, gest='insulation'):
return PublicPolicy('sub_ad_valorem', start, end, value, 'subsidy_ad_valorem',
gest=gest)