# Source code for project.read_input

# Copyright 2020-2021 Ecole Nationale des Ponts et Chaussées
#
# This file is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
# Original author Lucas Vivier <vivier@centre-cired.fr>

import copy
import os
import pandas as pd
from pandas import Series, DataFrame, concat, MultiIndex, Index
from numpy.random import normal
from numpy.testing import assert_almost_equal

from project.utils import reindex_mi, get_pandas, make_plot, get_series, reverse_dict, select
from project.dynamic import stock_need, share_multi_family, share_type_built
from project.input.param import generic_input
from project.input.resources import resources_data


class PublicPolicy:
    """Public policy parent class.

    Attributes
    ----------
    name : str
        Name of the policy.
    start : int
        Year policy starts.
    end : int
        Year policy ends. For a policy that is not an incentive, it is brought forward to the stop year
        when ``year_stop`` or ``years_stop`` is given.
    value : float, pd.Series, pd.DataFrame or dict
        Value of the policy. For an incentive with a stop year it is stored as a dict keyed by year.
    policy : str
        Type of policy. Types listed in ``INCENTIVES`` are flagged as incentives.
    incentive : bool
        True when ``policy`` is one of ``INCENTIVES``.

    All other constructor arguments (gest, cap, target, cost_min, cost_max, ...) are stored unchanged as
    attributes of the same name.
    """

    # Policy types for which a stop year neutralises the value instead of shortening the policy.
    INCENTIVES = ('subsidy_ad_valorem', 'subsidy_target', 'subsidy_proportional', 'bonus', 'reduced_vat',
                  'zero_interest_loan')

    def __init__(self, name, start, end, value, policy, gest=None, cap=None, target=None, cost_min=None,
                 cost_max=None, new=None, by='index', non_cumulative=None, frequency=None, intensive=None,
                 min_performance=None, bonus=False, social_housing=True, duration=None, recycling=None,
                 proportional=None, recycling_ini=None, year_stop=None, years_stop=None, public_cost=None,
                 variable=True):
        self.name = name
        self.start = start
        self.end = end
        self.value = value
        self.policy = policy
        self.gest = gest
        self.cap = cap
        self.target = target
        self.cost_max = cost_max
        self.cost_min = cost_min
        self.new = new
        self.by = by
        self.non_cumulative = non_cumulative
        self.frequency = frequency
        self.intensive = intensive
        self.min_performance = min_performance
        self.bonus = bonus
        self.social_housing = social_housing
        self.duration = duration
        self.recycling = recycling
        self.recycling_ini = recycling_ini
        self.proportional = proportional
        self.public_cost = public_cost
        self.year_stop = year_stop
        self.years_stop = years_stop
        self.variable = variable

        self.incentive = policy in self.INCENTIVES

        if (year_stop or years_stop) and self.incentive:
            if isinstance(value, dict):
                # apply_year_stop overwrites entries in place: work on a shallow copy so that a mapping
                # shared with other policies (or owned by the caller) is left untouched.
                self.value = dict(value)
            else:
                self.value = {k: value for k in range(self.start, self.end)}
            if policy == 'zero_interest_loan' and isinstance(cost_max, dict):
                # Same reason: the loan cap of the stop year is overwritten in place.
                self.cost_max = dict(cost_max)

        if year_stop:
            if self.incentive:
                if year_stop < end:
                    self.apply_year_stop(year_stop, self.value)
            else:
                # Policies that are not incentives simply end earlier.
                self.end = min(year_stop, end)

        if years_stop:
            if self.incentive:
                for y in years_stop:
                    if y < end:
                        self.apply_year_stop(y, self.value)
            else:
                self.end = min(years_stop[0], end)

    def __repr__(self):
        return self.name

    def __str__(self):
        return self.name

    def cost_targeted(self, cost_insulation, target_subsidies=None):
        """Return the part of the insulation cost that the policy applies to.

        The cost is first restricted to the targeted cells (only when the policy has a target and
        ``target_subsidies`` is given), then capped at ``self.cost_max`` and finally set to zero
        wherever it is below ``self.cost_min``.

        Parameters
        ----------
        cost_insulation: pd.DataFrame
            Cost of an insulation gesture.
        target_subsidies: pd.Series or pd.DataFrame, optional
            Boolean values selecting the cells the policy applies to. A Series is repeated over all
            columns of ``cost_insulation``.

        Returns
        -------
        pd.DataFrame
            Copy of ``cost_insulation`` after targeting, capping and flooring.
        """
        cost = cost_insulation.copy()

        if self.target is not None and target_subsidies is not None:
            if isinstance(target_subsidies, Series):
                target_subsidies = pd.concat([target_subsidies] * cost.shape[1], axis=1, keys=cost.columns)
            # Cells that are not targeted become NaN and are then set to 0.
            cost = cost[target_subsidies.astype(bool)].fillna(0)

        if self.cost_max is not None:
            cost_max = reindex_mi(self.cost_max, cost.index)
            cost_max = pd.concat([cost_max] * cost.shape[1], axis=1).set_axis(cost.columns, axis=1)
            cost[cost > cost_max] = cost_max

        if self.cost_min is not None:
            cost_min = reindex_mi(self.cost_min, cost.index)
            cost_min = pd.concat([cost_min] * cost.shape[1], axis=1).set_axis(cost.columns, axis=1)
            cost[cost < cost_min] = 0

        return cost

    def apply_year_stop(self, year_stop, value):
        """Put values of an incentive to 1e-4 to assess the impact of removing the incentive.

        Rationale is to keep calculating the number of eligible households that renovate without the
        incentive.

        For a zero interest loan the loan cap ``self.cost_max`` of the stop year is set to 1e-4 and
        ``value`` is left unchanged. For the other incentives the entry ``value[year_stop]`` is replaced
        (0.1 - 1e-4 for 'reduced_vat', 1e-4 otherwise); in a Series or DataFrame only the strictly
        positive cells are replaced.

        Parameters
        ----------
        year_stop: int
            Year in which the incentive is neutralised.
        value: dict
            Policy value keyed by year. It is modified in place and stored as ``self.value``.
        """
        if self.policy == 'zero_interest_loan':
            if not isinstance(self.cost_max, dict):
                self.cost_max = {k: self.cost_max for k in range(self.start, self.end)}
            self.cost_max[year_stop] = 1e-4
            return

        val = 0.1 - 1e-4 if self.policy == 'reduced_vat' else 1e-4

        current = value[year_stop]
        if isinstance(current, (pd.Series, pd.DataFrame)):
            value[year_stop] = current.mask(current > 0, val)
        else:
            value[year_stop] = val
        self.value = value
def read_stock(config):
    """Read initial building stock.

    The stock is read from ``config['building_stock']`` and then adjusted:

    - wood fuel boilers in multi-family housing are relabelled as natural gas boilers,
    - 'Electricity-Heat pump' is split between 'water' (80%) and 'air' (20%) heat pumps,
    - if ``config['simple']['collective_boiler']`` is set, oil fuel boilers in multi-family housing
      become collective boilers and half of the multi-family gas boilers become collective boilers.

    The total number of buildings is checked to be unchanged by these operations.

    Parameters
    ----------
    config: dict

    Returns
    -------
    pd.Series
        MultiIndex Series with building stock attributes as levels.
    """
    # squeeze('columns') always yields a Series, even when the file holds a single row.
    stock = get_pandas(config['building_stock'],
                       lambda x: pd.read_csv(x, index_col=[0, 1, 2, 3, 4, 5, 6, 7, 8]).squeeze('columns'))
    stock = stock.rename('Stock buildings')
    stock_sum = stock.sum()

    # Heating system is handled as a column so that it can be relabelled.
    stock = stock.reset_index('Heating system')

    # replace wood fuel multi-family by natural gas in building stock
    multi_family = stock.index.get_level_values('Housing type') == 'Multi-family'
    for wood, gas in (('Wood fuel-Standard boiler', 'Natural gas-Standard boiler'),
                      ('Wood fuel-Performance boiler', 'Natural gas-Performance boiler')):
        idx = (stock['Heating system'] == wood) & multi_family
        stock.loc[idx, 'Heating system'] = gas
    assert_almost_equal(stock['Stock buildings'].sum(), stock_sum)

    # specify heat-pump
    repartition = 0.8
    idx = stock['Heating system'] == 'Electricity-Heat pump'
    hp_water = stock.loc[idx, :].copy()
    hp_water['Stock buildings'] *= repartition
    hp_water['Heating system'] = 'Electricity-Heat pump water'
    hp_air = stock.loc[idx, :].copy()
    hp_air['Stock buildings'] *= (1 - repartition)
    hp_air['Heating system'] = 'Electricity-Heat pump air'
    stock = pd.concat((stock.loc[~idx, :], hp_water, hp_air), axis=0)

    if config['simple'].get('collective_boiler'):
        # Rows were reordered by the concatenation above: the mask must be rebuilt.
        multi_family = stock.index.get_level_values('Housing type') == 'Multi-family'
        oil_fuel = stock['Heating system'].isin(['Oil fuel-Performance boiler', 'Oil fuel-Standard boiler'])
        stock.loc[multi_family & oil_fuel, 'Heating system'] = 'Oil fuel-Collective boiler'

        repartition = 0.5
        idx_gas = stock['Heating system'].isin(['Natural gas-Performance boiler',
                                                'Natural gas-Standard boiler']) & multi_family
        collective_gas = stock.loc[idx_gas, :].copy()
        collective_gas['Stock buildings'] *= repartition
        collective_gas['Heating system'] = 'Natural gas-Collective boiler'
        individual_gas = stock.loc[idx_gas, :].copy()
        individual_gas['Stock buildings'] *= (1 - repartition)
        stock = pd.concat((stock.loc[~idx_gas, :], collective_gas, individual_gas), axis=0)

    stock = stock.set_index('Heating system', append=True)['Stock buildings']
    # Relabelling can create duplicated index entries: merge them.
    stock = stock.groupby(stock.index.names).sum()

    stock = pd.concat([stock], keys=[True], names=['Existing'])
    idx_names = ['Existing', 'Occupancy status', 'Income owner', 'Income tenant', 'Housing type',
                 'Heating system', 'Wall', 'Floor', 'Roof', 'Windows']
    stock = stock.reorder_levels(idx_names)

    assert_almost_equal(stock.sum(), stock_sum)
    return stock
def read_policies(config):
    """Create the PublicPolicy instances described in ``config['policies']``.

    Each entry of ``config['policies']`` is read by a dedicated function, selected first by the name of
    the entry and otherwise by its ``'policy'`` field. Entries for which no reading function exists are
    reported with a message and ignored. The name of the entry is written in the entry itself under
    the key ``'name'``.

    Parameters
    ----------
    config: dict
        Configuration dictionary; only ``config['policies']`` is used.

    Returns
    -------
    policies_heater: list
        Policies whose ``gest`` is 'heater'.
    policies_insulation: list
        Policies whose ``gest`` is 'insulation'.
    taxes: list
        Policies whose ``policy`` is 'tax'.

    Raises
    ------
    NotImplementedError
        If the value of a CEE policy is neither a path nor a number.
    """
    # TODO: replace names to clarify the definition of policies (index = households, columns = technologies).
    # TODO: target should be a list with combination of simple condition.

    def stop_kwargs(data):
        """Return the optional stop years of an entry as PublicPolicy keyword arguments."""
        return {'year_stop': data.get('year_stop'), 'years_stop': data.get('years_stop')}

    def split_by_year(frame):
        """Split a frame with a 'Year' index level in a dict {year: frame without the 'Year' level}."""
        years = frame.index.get_level_values('Year')
        return {y: frame.loc[years == y, :].droplevel('Year') for y in years.unique()}

    def read_mpr(data):
        """Create the MPR policies: deep renovation, bonus, heater and insulation subsidies."""
        l = list()
        heater = get_series(data['heater']).unstack('Heating system final')
        if 'Year' in heater.index.names:
            heater = split_by_year(heater)
        elif data.get('growth_heater'):
            growth_heater = get_pandas(data['growth_heater'],
                                       lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze())
            heater = {k: i * heater for k, i in growth_heater.items()}

        insulation = get_pandas(data['insulation'], lambda x: pd.read_csv(x, index_col=[0, 1]))
        if data.get('growth_insulation'):
            growth_insulation = get_pandas(data['growth_insulation'],
                                           lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze())
            insulation = {k: i * insulation for k, i in growth_insulation.items()}

        if data.get('deep_renovation'):
            deep_renovation = get_pandas(data['deep_renovation'],
                                         lambda x: pd.read_csv(x, index_col=[0]).squeeze())
            l.append(PublicPolicy(data['name'], data['start'], data['end'], deep_renovation, 'subsidy_target',
                                  gest='insulation', target='deep_renovation', **stop_kwargs(data)))

        if data.get('bonus'):
            # The same file is read twice so that the two policies do not share one object.
            bonus_best = get_pandas(data['bonus'], lambda x: pd.read_csv(x, index_col=[0, 1]).squeeze())
            bonus_worst = get_pandas(data['bonus'], lambda x: pd.read_csv(x, index_col=[0, 1]).squeeze())
            l.append(PublicPolicy(data['name'], data['start'], data['end'], bonus_best, 'bonus',
                                  gest='insulation', target='reach_best', **stop_kwargs(data)))
            l.append(PublicPolicy(data['name'], data['start'], data['end'], bonus_worst, 'bonus',
                                  gest='insulation', target='out_worst', **stop_kwargs(data)))

        if data['heater'] is not None:
            l.append(PublicPolicy(data['name'], data['start'], data['end'], heater, 'subsidy_target',
                                  gest='heater', **stop_kwargs(data)))

        if data['insulation'] is not None:
            l.append(PublicPolicy(data['name'], data['start'], data['end'], insulation, 'subsidy_target',
                                  gest='insulation', target=data.get('target'), **stop_kwargs(data)))
        return l

    def read_mpr_serenite(data):
        """Create MPR Serenite PublicPolicy instance.

        MaPrimeRénov' Sérénité (formerly Habiter Mieux Sérénité) for major energy renovation work in your
        home. To do so, your work must result in an energy gain of at least 35%. The amount of the bonus
        varies according to the amount of your resources.

        Parameters
        ----------
        data

        Returns
        -------
        list
        """
        l = list()
        value = get_series(data['insulation'])
        cap = None
        if data.get('cap') is not None:
            cap = get_series(data['cap'])

        if data.get('growth_insulation'):
            growth_insulation = get_series(data['growth_insulation'])
            value = {k: i * value for k, i in growth_insulation.items()}
            if cap is not None:
                # Without a cap there is nothing to scale (None cannot be multiplied).
                cap = {k: i * cap for k, i in growth_insulation.items()}

        l.append(PublicPolicy(data['name'], data['start'], data['end'], value, data['policy'],
                              target=data.get('target'), gest='insulation',
                              non_cumulative=data.get('non_cumulative'), cap=cap, **stop_kwargs(data)))

        if data.get('bonus') is not None:
            bonus = get_series(data['bonus'])
            l.append(PublicPolicy(data['name'], data['start'], data['end'], bonus, 'bonus',
                                  gest='insulation', **stop_kwargs(data)))
        return l

    def read_cee(data):
        """Create the CEE policies: heater and insulation subsidies, their bonus, and the CEE tax."""
        l = list()
        if isinstance(data['value'], str):
            cee_value = get_series(data['value'])
        elif isinstance(data['value'], (int, float)):
            cee_value = pd.Series(data['value'], index=range(data['start'], data['end'] + 1))
        else:
            # NotImplemented is a comparison sentinel, not an exception: raising it is a TypeError.
            raise NotImplementedError('CEE value must be a path or a number')

        if data['cumac_heater'] is not None:
            cumac_heater = get_series(data['cumac_heater'])
            cee_heater = cumac_heater * cee_value / 1000
            cee_heater = split_by_year(cee_heater.unstack('Heating system final'))
            l.append(PublicPolicy('cee', data['start'], data['end'], cee_heater, 'subsidy_target',
                                  gest='heater', social_housing=True, **stop_kwargs(data)))

            bonus_heater = get_series(data['bonus_heater']['value']).unstack('Heating system final')
            # Same heating systems as the subsidy; systems without bonus get 0.
            bonus_heater = bonus_heater.reindex(cee_heater[data['start']].columns, axis=1).fillna(0)
            if 'Year' in bonus_heater.index.names:
                bonus_heater = split_by_year(bonus_heater)
            end = min(data['bonus_heater']['end'], data['end'])
            l.append(PublicPolicy('cee', data['bonus_heater']['start'], end, bonus_heater, 'bonus',
                                  gest='heater', social_housing=True, **stop_kwargs(data)))

        if data['cumac_insulation'] is not None:
            cumac_insulation = get_series(data['cumac_insulation'])
            cee_insulation = cumac_insulation * cee_value / 1000
            cee_insulation = cee_insulation.unstack('Insulation').rename_axis(None, axis=1)
            years = cee_insulation.index.get_level_values('Year')
            cee_insulation = {y: cee_insulation.loc[years == y, :].squeeze() for y in years.unique()}

            bonus_insulation = get_pandas(data['bonus_insulation']['value'],
                                          lambda x: pd.read_csv(x, index_col=[0]))
            # The insulation bonus ends at its own end year; the heater bonus end year is only used
            # when the insulation bonus does not define one.
            bonus_end = data['bonus_insulation'].get('end')
            if bonus_end is None:
                bonus_end = data['bonus_heater']['end']
            end = min(bonus_end, data['end'])
            l.append(PublicPolicy('cee', data['bonus_insulation']['start'], end, bonus_insulation, 'bonus',
                                  gest='insulation', social_housing=True, **stop_kwargs(data)))

            l.append(PublicPolicy('cee', data['start'], data['end'], cee_insulation, 'subsidy_target',
                                  gest='insulation', social_housing=True, **stop_kwargs(data)))

        coefficient_obligation = get_pandas(data['coefficient_obligation'],
                                            lambda x: pd.read_csv(x, index_col=[0])).rename_axis('Energy', axis=1)
        cee_tax = (coefficient_obligation.T * cee_value).T / 1000

        end = data['end']
        if data.get('year_stop') is not None:
            end = data['year_stop']
        # Copy so that the assignment below writes to an independent frame.
        cee_tax = cee_tax.loc[data['start']:end - 1, :].copy()

        if data.get('years_stop'):
            cee_tax.loc[data['years_stop'], :] = 0

        l.append(PublicPolicy('cee', data['start'], end, cee_tax, 'tax'))
        return l

    def read_cap(data):
        """Create the subsidies cap policies for insulation and/or heater."""
        l = list()
        if 'insulation' in data.keys():
            l.append(PublicPolicy('subsidies_cap', data['start'], data['end'], get_series(data['insulation']),
                                  'subsidies_cap', gest='insulation', target=data.get('target_insulation')))
        if 'heater' in data.keys():
            l.append(PublicPolicy('subsidies_cap', data['start'], data['end'], get_series(data['heater']),
                                  'subsidies_cap', gest='heater', target=data.get('target_heater')))
        return l

    def read_carbon_tax(data):
        """Create the carbon tax policy from the tax level and the emission content of energies."""
        tax = get_series(data['tax'], header=None)
        emission = get_pandas(data['emission'], lambda x: pd.read_csv(x, index_col=[0]).squeeze())
        tax = (tax * emission.T).T.fillna(0) / 10 ** 6
        # Rows where the tax is zero for every column are dropped.
        tax = tax.loc[(tax != 0).any(axis=1)]

        recycling = None
        if data.get('recycling') is not None:
            if isinstance(data['recycling'], str):
                recycling = get_series(data['recycling'], header=0)
            else:
                recycling = data['recycling']

        end = data['end']
        if data.get('year_stop') is not None:
            end = data['year_stop']
        tax = tax.loc[data['start']:end - 1, :]

        if data.get('years_stop'):
            tax.loc[data['years_stop'], :] = 0

        return [PublicPolicy('carbon_tax', data['start'], end, tax, 'tax', recycling=recycling,
                             recycling_ini=data.get('recycling_ini'))]

    def read_cite(data):
        """Creates the income tax credit PublicPolicy instance.

        Oil fuel-Performant Boiler exempted.

        Parameters
        ----------
        data

        Returns
        -------
        list
        """
        l = list()
        if data['heater'] is not None:
            heater = get_series(data['heater']).unstack('Heating system final')
            l.append(PublicPolicy('cite', data['start'], data['end'], heater, 'subsidy_ad_valorem',
                                  gest='heater', cap=data['cap'], **stop_kwargs(data)))
        if data['insulation'] is not None:
            insulation = get_pandas(data['insulation'], lambda x: pd.read_csv(x, index_col=[0]))
            l.append(PublicPolicy('cite', data['start'], data['end'], insulation, 'subsidy_ad_valorem',
                                  gest='insulation', cap=data['cap'], target=data.get('target'),
                                  **stop_kwargs(data)))
        return l

    def read_zil(data):
        """Create one zero interest loan policy per gesture listed in ``data['gest']``."""
        l = list()
        if isinstance(data['gest'], str):
            data['gest'] = [data['gest']]
        for gest in data['gest']:
            l.append(PublicPolicy('zero_interest_loan', data['start'], data['end'], data['value'],
                                  'zero_interest_loan', cost_max=data.get('cost_max'), gest=gest,
                                  duration=data['duration'], public_cost=data.get('public_cost'),
                                  target=data.get('target'), **stop_kwargs(data)))
        return l

    def read_reduced_vat(data):
        """Create the reduced VAT policies for heater and insulation."""
        return [PublicPolicy('reduced_vat', data['start'], data['end'], data['value'], 'reduced_vat',
                             gest=gest, social_housing=True, **stop_kwargs(data))
                for gest in ('heater', 'insulation')]

    def read_ad_valorem(data):
        """Create one ad valorem subsidy per gesture listed in ``data['gest']``."""
        l = list()
        value = data['value']
        if isinstance(value, str):
            value = get_series(data['value'])

        by = 'index'
        if data.get('index') is not None:
            mask = get_series(data['index'], header=0)
            value *= mask

        if data.get('columns') is not None:
            mask = get_pandas(data['columns'], lambda x: pd.read_csv(x, index_col=[0]).squeeze()).rename(None)
            if isinstance(value, (float, int)):
                value *= mask
            else:
                # Outer product: rows of value times columns of mask.
                value = value.to_frame().dot(mask.to_frame().T)
            by = 'columns'

        if data.get('growth'):
            growth = get_series(data['growth'], header=None)
            value = {k: i * value for k, i in growth.items()}

        name = 'sub_ad_valorem'
        if data.get('name') is not None:
            name = data['name']

        if isinstance(data['gest'], str):
            data['gest'] = [data['gest']]
        for gest in data['gest']:
            l.append(PublicPolicy(name, data['start'], data['end'], value, 'subsidy_ad_valorem', gest=gest,
                                  by=by, target=data.get('target'), cap=data.get('cap'), **stop_kwargs(data)))
        return l

    def read_proportional(data):
        """Create one proportional subsidy per gesture listed in ``data['gest']``."""
        l = list()
        value = data.get('value')
        if isinstance(value, str):
            value = get_series(data['value'])
            value = value.to_dict()

        by = 'index'
        if data.get('index') is not None:
            mask = get_series(data['index'], header=0)
            value *= mask

        if data.get('columns') is not None:
            mask = get_pandas(data['columns'], lambda x: pd.read_csv(x, index_col=[0]).squeeze()).rename(None)
            if isinstance(value, (float, int)):
                value *= mask
            else:
                value = value.to_frame().dot(mask.to_frame().T)
            by = 'columns'

        if data.get('growth'):
            growth = get_series(data['growth'], header=None)
            value = {k: i * value for k, i in growth.items()}

        name = 'sub_proportional'
        if data.get('name') is not None:
            name = data['name']

        proportional = 'MWh_cumac'  # tCO2_cumac
        if data.get('proportional') is not None:
            proportional = data['proportional']

        if isinstance(data['gest'], str):
            data['gest'] = [data['gest']]
        for gest in data['gest']:
            l.append(PublicPolicy(name, data['start'], data['end'], value, 'subsidy_proportional', gest=gest,
                                  by=by, target=data.get('target'), proportional=proportional,
                                  **stop_kwargs(data)))
        return l

    def restriction_energy(data):
        return [PublicPolicy(data['name'], data['start'], data['end'], data['value'], 'restriction_energy',
                             gest='heater', target=data.get('target'), variable=data.get('variable', True))]

    def restriction_heater(data):
        return [PublicPolicy(data['name'], data['start'], data['end'], data['value'], 'restriction_heater',
                             gest='heater', target=data.get('target'), variable=data.get('variable', True))]

    def premature_heater(data):
        return [PublicPolicy(data['name'], data['start'], data['end'], data['value'], 'premature_heater',
                             gest='heater', target=data.get('target'))]

    def read_obligation(data):
        """Create the renovation obligation and, if defined, the subsidy that comes with it."""
        l = list()
        banned_performance = get_pandas(data['value'],
                                        lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze()).dropna()
        # The obligation starts at the latest of the configured start and the first banned year.
        start = min(banned_performance.index)
        if data['start'] > start:
            start = data['start']

        frequency = data['frequency']
        if frequency is not None:
            frequency = pd.Series(frequency['value'], index=pd.Index(frequency['index'], name=frequency['name']))

        target = None
        if data.get('target') is not None:
            target = pd.Series(True, index=pd.Index(data['target']['value'], name=data['target']['name']))

        end = data['end']
        if data.get('year_stop') is not None:
            end = data['year_stop']
        if data.get('years_stop'):
            # An empty list of stop years leaves the end year unchanged.
            end = data['years_stop'][0]

        l.append(PublicPolicy(data['name'], start, end, banned_performance, 'obligation', gest='insulation',
                              frequency=frequency, intensive=data['intensive'],
                              min_performance=data['minimum_performance'], target=target))

        if data.get('sub_obligation') is not None:
            value = get_pandas(data['sub_obligation'], lambda x: pd.read_csv(x, index_col=[0]).squeeze())
            l.append(PublicPolicy('sub_obligation', start, data['end'], value, 'subsidy_ad_valorem',
                                  gest='insulation'))
        return l

    def read_regulation(data):
        """Create one regulation policy per gesture (``data['gest']`` is a list or a single gesture)."""
        if isinstance(data['gest'], list):
            return [PublicPolicy(data['name'], data['start'], data['end'], data.get('value'), data['policy'],
                                 gest=gest) for gest in data['gest']]
        return [PublicPolicy(data['name'], data['start'], data['end'], data.get('value'), data['policy'],
                             gest=data['gest'])]

    def read_standard_policy(data):
        """Create one policy without value per gesture listed in ``data['gest']``."""
        gest = [data['gest']] if isinstance(data['gest'], str) else data['gest']
        return [PublicPolicy(data['name'], data['start'], data['end'], None, data['policy'], gest=g)
                for g in gest]

    # Reading function selected by the name of the entry.
    read = {'mpr': read_mpr,
            'mpr_variant': read_mpr,
            'mpr_efficacite': read_mpr,
            'mpr_serenite': read_mpr_serenite,
            'mpr_performance': read_mpr_serenite,
            'mpr_serenite_variant': read_mpr_serenite,
            'mpr_serenite_high_income': read_mpr_serenite,
            'mpr_serenite_low_income': read_mpr_serenite,
            'mpr_multifamily': read_mpr_serenite,
            "mpr_multifamily_updated": read_mpr_serenite,
            'mpr_multifamily_deep': read_mpr_serenite,
            'mpr_serenite_multifamily_variant': read_mpr_serenite,
            'cee': read_cee,
            'cee_variant': read_cee,
            'cee_2018': read_cee,
            'cee_2021': read_cee,
            'cee_2024': read_cee,
            'cap': read_cap,
            'cap_updated': read_cap,
            'cap_variant': read_cap,
            'carbon_tax': read_carbon_tax,
            'carbon_tax_variant': read_carbon_tax,
            'cite': read_cite,
            'cite_insulation': read_cite,
            'cite_heater': read_cite,
            'reduced_vat': read_reduced_vat,
            'reduced_vat_variant': read_reduced_vat,
            'zero_interest_loan': read_zil}

    # Reading function selected by the 'policy' field when the name of the entry is not known.
    read_by_policy = {'subsidy_ad_valorem': read_ad_valorem,
                      'wco': read_cee,
                      'subsidy_proportional': read_proportional,
                      'zero_interest_loan': read_zil,
                      'premature_heater': premature_heater,
                      'restriction_energy': restriction_energy,
                      'restriction_heater': restriction_heater,
                      'obligation': read_obligation,
                      'regulation': read_regulation,
                      'credit_constraint': read_regulation,
                      'carbon_tax': read_carbon_tax,
                      'subsidy_cap': read_cap,
                      'subsidy_present_bias': read_standard_policy,
                      'subsidy_multi_family': read_standard_policy,
                      'subsidy_landlord': read_standard_policy,
                      'tax_status_quo': read_standard_policy,
                      'subsidy_status_quo': read_standard_policy}

    list_policies = list()
    for key, item in config['policies'].items():
        item['name'] = key
        reader = read.get(key)
        if reader is None:
            reader = read_by_policy.get(item.get('policy'))
        if reader is None:
            print('{} reading function is not implemented'.format(key))
            continue
        list_policies += reader(item)

    policies_heater = [p for p in list_policies if p.gest == 'heater']
    policies_insulation = [p for p in list_policies if p.gest == 'insulation']
    taxes = [p for p in list_policies if p.policy == 'tax']

    return policies_heater, policies_insulation, taxes
[docs]def read_inputs(config, other_inputs=generic_input): """Read all inputs in Python object and concatenate in one dict. Parameters ---------- config: dict Configuration dictionary with path to data. other_inputs: dict Other inputs that are manually inserted in param.py Returns ------- dict """ inputs = dict() idx = range(config['start'], config['end']) inputs.update(other_inputs) if isinstance(config['energy']['energy_prices'], str): energy_prices = get_pandas(config['macro']['energy_prices'], lambda x: pd.read_csv(x, index_col=[0]).rename_axis('Year').rename_axis('Energy', axis=1)) elif isinstance(config['energy']['energy_prices'], dict): energy_prices = get_pandas(config['energy']['energy_prices']['ini'], lambda x: pd.read_csv(x, index_col=[0]).rename_axis('Year').rename_axis('Energy', axis=1)) energy_prices = energy_prices.loc[config['start'], :] rate = Series(config['energy']['energy_prices']['rate']).rename_axis('Energy') if config['energy']['energy_prices'].get('factor') is not None: rate *= config['energy']['energy_prices']['factor'] temp = range(config['start'] + 1, 2051) rate = concat([(1 + rate) ** n for n in range(len(temp))], axis=1, keys=temp) rate = concat((pd.Series(1, index=rate.index, name=config['start']), rate), axis=1) energy_prices = rate.T * energy_prices if config['energy']['energy_prices'].get('shock') is not None: temp = config['energy']['energy_prices']['shock'] energy_prices.loc[temp['start']:, :] *= temp['factor'] else: raise NotImplemented inputs.update({'energy_prices': energy_prices}) energy_taxes = get_pandas(config['energy']['energy_taxes'], lambda x: pd.read_csv(x, index_col=[0]).rename_axis('Year').rename_axis('Heating energy', axis=1)) inputs.update({'energy_taxes': energy_taxes}) inputs.update({'energy_vat': get_series(config['energy']['energy_vat'], header=None)}) inputs.update({'vat_heater': get_series(config['macro']['vat_heating_system'], header=[0])}) inputs.update({'cost_heater': 
get_series(config['technical']['cost_heater'], header=[0])}) inputs.update({'efficiency': get_series(config['technical']['efficiency'], header=[0])}) inputs.update({'temp_sink': get_series(config['technical']['temp_sink'], header=None)}) inputs.update({'lifetime_heater': get_series(config['technical']['lifetime_heater'], header=[0])}) inputs.update({'cost_insulation': get_series(config['technical']['cost_insulation'], header=[0])}) inputs.update({'frequency_insulation': config['renovation']['frequency_insulation']}) inputs.update({'performance_insulation_renovation': get_series(config['technical']['performance_insulation_renovation'], header=None).to_dict()}) inputs.update({'performance_insulation_construction': get_series(config['technical']['performance_insulation_construction'], header=None).to_dict()}) """bill_saving_preferences = get_pandas(config['macro']['preferences_saving'], lambda x: pd.read_csv(x, index_col=[0])) preferences_insulation.update({'bill_saved': bill_saving_preferences.loc[:, 'Insulation']}) preferences_heater.update({'bill_saved': bill_saving_preferences.loc[:, 'Heater']})""" preferences_insulation = get_series(config['renovation']['preferences_insulation'], header=None).to_dict() preferences_insulation.update({'present_discount_rate': get_series(config['macro']['present_discount_rate'])}) preferences_heater = get_series(config['switch_heater']['preferences_heater'], header=None).to_dict() preferences_heater.update({'present_discount_rate': get_series(config['macro']['present_discount_rate'])}) inputs.update({'preferences': {'insulation': preferences_insulation, 'heater': preferences_heater}}) consumption_ini = get_series(config['macro']['consumption_ini'], header=[0]) inputs.update({'consumption_ini': consumption_ini}) ms_heater = get_pandas(config['switch_heater']['ms_heater'], lambda x: pd.read_csv(x, index_col=[0, 1])) ms_heater.columns.set_names('Heating system final', inplace=True) calibration_heater = { 'ms_heater': ms_heater, 
'scale': config['switch_heater']['scale'] } inputs.update({'calibration_heater': calibration_heater}) if config['switch_heater'].get('district_heating') is not None: district_heating = get_series(config['switch_heater']['district_heating'], header=None) inputs.update({'flow_district_heating': district_heating.diff().fillna(0)}) calibration_renovation = None if config['renovation']['endogenous']: renovation_rate_ini = get_series(config['renovation']['renovation_rate_ini']).round(decimals=3) scale_calibration = config['renovation']['scale'] ms_insulation_ini = get_pandas(config['renovation']['ms_insulation']['ms_insulation_ini'], lambda x: pd.read_csv(x, index_col=[0, 1, 2, 3]).squeeze().rename(None).round(decimals=3)) minimum_performance = config['renovation']['ms_insulation']['minimum_performance'] calibration_renovation = {'renovation_rate_ini': renovation_rate_ini, 'scale': scale_calibration, 'threshold_indicator': config['renovation'].get('threshold'), 'ms_insulation_ini': ms_insulation_ini, 'minimum_performance': minimum_performance} inputs.update({'calibration_renovation': calibration_renovation}) if config['renovation'].get('exogenous_social'): exogenous_social = get_pandas(config['renovation']['exogenous_social'], lambda x: pd.read_csv(x, index_col=[0, 1])) exogenous_social.columns = exogenous_social.columns.astype(int) inputs.update({'exogenous_social': exogenous_social}) temp = None if config['renovation'].get('rational_behavior') is not None: if config['renovation']['rational_behavior']['activated']: temp = {'calibration': config['renovation']['rational_behavior']['calibration'], 'social': config['renovation']['rational_behavior']['social'], } inputs.update({'rational_behavior_insulation': temp}) temp = None if config['switch_heater'].get('rational_behavior') is not None: if config['switch_heater']['rational_behavior']['activated']: temp = { 'social': config['switch_heater']['rational_behavior']['social'], } inputs.update({'rational_behavior_heater': 
temp}) if config['macro'].get('population') is not None: population = get_pandas(config['macro']['population'], lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze()) inputs.update({'population': population.loc[:config['end']]}) if config['macro'].get('stock_ini') is not None: inputs.update({'stock_ini': config['macro']['stock_ini']}) if config['macro'].get('pop_housing') is None: inputs.update({'pop_housing_min': other_inputs['pop_housing_min']}) inputs.update({'factor_pop_housing': other_inputs['factor_pop_housing']}) else: pop_housing = get_pandas(config['macro']['pop_housing'], lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze()) inputs.update({'pop_housing': pop_housing.loc[:config['end']]}) if config['macro'].get('share_single_family_construction') is not None: temp = get_pandas(config['macro']['share_single_family_construction'], lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze()) inputs.update({'share_single_family_construction': temp}) else: if config['macro'].get('share_multi_family') is None: inputs.update({'factor_multi_family': other_inputs['factor_multi_family']}) else: _share_multi_family = get_pandas(config['macro']['share_multi_family'], lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze()) inputs.update({'share_multi_family': _share_multi_family}) if config['macro']['available_income'] is not None: inputs.update({'available_income': config['macro']['available_income']}) inputs.update({'income_rate': config['macro']['income_rate']}) income = get_series(config['macro']['income'], header=[0]) inputs.update({'income': income}) if isinstance(config['macro']['demolition_rate'], (float, int)): demolition_rate = config['macro']['demolition_rate'] elif config['macro'].get('demolition_rate') is not None: demolition_rate = get_series(config['macro']['demolition_rate'], header=None) else: demolition_rate = None inputs.update({'demolition_rate': demolition_rate}) rotation_rate = 
get_pandas(config['macro']['rotation_rate'], lambda x: pd.read_csv(x, index_col=[0])).squeeze().rename(None) inputs.update({'rotation_rate': rotation_rate}) surface = get_pandas(config['technical']['surface'], lambda x: pd.read_csv(x, index_col=[0, 1, 2]).squeeze().rename(None)) inputs.update({'surface': surface}) ratio_surface = get_pandas(config['technical']['ratio_surface'], lambda x: pd.read_csv(x, index_col=[0])) inputs.update({'ratio_surface': ratio_surface}) if config['macro']['surface_built'] is None: inputs.update({'surface_max': other_inputs['surface_max']}) inputs.update({'surface_elasticity': other_inputs['surface_elasticity']}) else: surface_built = get_pandas(config['macro']['surface_built'], lambda x: pd.read_csv(x, index_col=[0]).squeeze().rename(None)) inputs.update({'surface_built': surface_built}) if config['macro'].get('flow_construction') is not None: flow_construction = get_pandas(config['macro']['flow_construction'], lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze()) inputs.update({'flow_construction': flow_construction}) temp = get_series(config['switch_heater']['ms_heater_built'], header=[0]) # Create a dictionary with the year as key and the share of each heating system as value if 'Year' in temp.index.names: # Interpolation over year using last known value start = temp.index.get_level_values('Year').min() ms_heater_built = temp.unstack('Year').reindex(range(start, config['end']), axis=1) temp_idx = ms_heater_built.index.names ms_heater_built = ms_heater_built.reset_index().interpolate(axis=1, method='pad').set_index(temp_idx) # ms_heater_built = temp.unstack('Heating system').fillna(0) inputs.update({'ms_heater_built': ms_heater_built.fillna(0)}) inputs.update({'health_cost_dpe': get_series(config['health_cost_dpe'])}) inputs.update({'health_cost_income': get_series(config['health_cost_income'])}) carbon_value = get_pandas(config['carbon_value'], lambda x: pd.read_csv(x, index_col=[0], header=None).squeeze()) 
inputs.update({'carbon_value': carbon_value.loc[config['start']:]}) carbon_emission = get_pandas(config['energy']['carbon_emission'], lambda x: pd.read_csv(x, index_col=[0]).rename_axis('Year')) inputs.update({'carbon_emission': carbon_emission.loc[config['start']:, :] * 1000}) renewable_gas = None if isinstance(config['energy']['renewable_gas'], str): renewable_gas = get_series(config['energy']['renewable_gas'], header=None) renewable_gas = renewable_gas.loc[config['start']:] # set carbon emission of natural gas constant inputs['carbon_emission'].loc[:, 'Natural gas'] = inputs['carbon_emission'].loc[config['start'], 'Natural gas'] inputs.update({'renewable_gas': renewable_gas}) footprint_built = get_series(config['technical']['footprint_construction'], header=None) inputs.update({'footprint_built': footprint_built}) footprint_renovation = get_pandas(config['technical']['footprint_renovation'], lambda x: pd.read_csv(x, index_col=[0])) inputs.update({'footprint_renovation': footprint_renovation}) if config['macro'].get('use_subsidies'): temp = get_pandas(config['macro']['use_subsidies'], lambda x: pd.read_csv(x, index_col=[0]).squeeze()) inputs.update({'use_subsidies': temp}) else: inputs.update({'use_subsidies': pd.Series(dtype=float)}) if 'hourly_profile' in config['technical'].keys(): temp = get_series(config['technical']['hourly_profile'], header=None) temp.index = pd.TimedeltaIndex(range(0, 24), unit='h') inputs.update({'hourly_profile': temp}) inputs['input_financing'].update(config['financing_cost']) if inputs['input_financing']['activated'] is False: temp_idx = get_series(inputs['input_financing']['upfront_max']).index inputs['input_financing']['upfront_max'] = pd.Series(100000, index=temp_idx) inputs['input_financing']['saving_rate'] = pd.Series(0, index=idx) inputs['input_financing']['interest_rate'] = pd.Series(0.1, index=idx) else: inputs['input_financing']['upfront_max'] = get_series(inputs['input_financing']['upfront_max']) 
inputs['input_financing']['saving_rate'] = get_series(inputs['input_financing']['saving_rate'], header=None) inputs['input_financing']['interest_rate'] = get_series(inputs['input_financing']['interest_rate'], header=None) return inputs
def parse_inputs(inputs, taxes, config, stock):
    """Macro module : run exogenous dynamic parameters.

    Starting from a deep copy of ``inputs``, extends the exogenous time series
    up to ``config['end']`` (excluded), derives the construction flow broken
    down by decision-maker and heating system, and assembles energy prices
    and taxes.

    Parameters
    ----------
    inputs: dict
        Raw inputs read as Python object. Not modified (a deep copy is used).
    taxes: list
        Tax policies. The list is extended IN PLACE with the 'energy_taxes'
        policy (when energy taxes are not None) and the 'energy_vat' policy.
    config: dict
        Configuration file.
    stock: Series
        Building stock.

    Returns
    -------
    dict
        Parsed input
    """

    def ffill_to_end(data):
        """Reindex a year-indexed object from its first year to config['end'] (excluded), forward-filling gaps."""
        first_year = data.index.min()
        return data.reindex(range(first_year, config['end'])).ffill()

    idx = range(config['start'], config['end'])

    parsed_inputs = copy.deepcopy(inputs)

    progress = config['technical'].get('technical_progress')
    if progress is not None:
        parsed_inputs['technical_progress'] = dict()
        for component in ('insulation', 'heater'):
            if component in progress.keys() and progress[component]['activated']:
                value = progress[component]['value_end']
                start = progress[component]['start']
                end = progress[component]['end']
                # constant yearly rate that compounds to value_end over the (end - start + 1) years
                value = round((1 + value) ** (1 / (end - start + 1)) - 1, 5)
                parsed_inputs['technical_progress'][component] = Series(
                    value, index=range(start, end + 1)).reindex(idx).fillna(0)

    if 'Year' in inputs['efficiency'].index.names:
        s = inputs['efficiency'].index.get_level_values('Year').min()
        df = inputs['efficiency'].copy()
        df = df.unstack('Heating system').reindex(range(s, config['end'])).ffill()
        parsed_inputs['efficiency'] = df.T

    # NOTE(review): block nesting was reconstructed from a flattened source; temp_sink is
    # assumed to be processed independently of the efficiency branch above - confirm.
    parsed_inputs['temp_sink'] = ffill_to_end(inputs['temp_sink'])

    if isinstance(inputs['demolition_rate'], (float, int)):
        parsed_inputs['demolition_rate'] = pd.Series(inputs['demolition_rate'], index=idx[1:])
    elif isinstance(inputs['demolition_rate'], Series):
        parsed_inputs['demolition_rate'] = ffill_to_end(inputs['demolition_rate']).loc[idx[1:]]

    if parsed_inputs['demolition_rate'] is None:
        parsed_inputs['flow_demolition'] = 0
    else:
        parsed_inputs['flow_demolition'] = parsed_inputs['demolition_rate'] * stock.sum()

    if 'flow_construction' not in parsed_inputs.keys():
        if 'population' in inputs.keys():
            parsed_inputs['population_total'] = inputs['population']
            parsed_inputs['sizing_factor'] = stock.sum() / inputs['stock_ini']
            parsed_inputs['population'] = inputs['population'] * parsed_inputs['sizing_factor']
            if 'pop_housing' in parsed_inputs.keys():
                parsed_inputs['stock_need'] = parsed_inputs['population'] / parsed_inputs['pop_housing']
            else:
                parsed_inputs['stock_need'], parsed_inputs['pop_housing'] = stock_need(
                    parsed_inputs['population'],
                    parsed_inputs['population'][config['start']] / stock.sum(),
                    inputs['pop_housing_min'],
                    config['start'],
                    inputs['factor_pop_housing'])
            parsed_inputs['flow_need'] = parsed_inputs['stock_need'] - parsed_inputs['stock_need'].shift(1)

        # NOTE(review): nesting of the two statements below was reconstructed - confirm.
        parsed_inputs['flow_construction'] = parsed_inputs['flow_need'] + parsed_inputs['flow_demolition']
        parsed_inputs['available_income'] = pd.Series(
            [inputs['available_income'] * (1 + config['macro']['income_rate']) ** (i - idx[0]) for i in idx],
            index=idx)
    else:
        # exogenous construction flow: extend to the horizon and keep simulation years only
        parsed_inputs['flow_construction'] = ffill_to_end(inputs['flow_construction']).loc[idx]

    # same surface for every simulation year
    parsed_inputs['surface'] = pd.concat([parsed_inputs['surface']] * len(idx), axis=1, keys=idx)

    if 'share_single_family_construction' in inputs.keys():
        share_single = ffill_to_end(inputs['share_single_family_construction']).loc[idx]
        parsed_inputs['share_single_family_construction'] = share_single
        temp = pd.concat((share_single, (1 - share_single)), axis=1,
                         keys=['Single-family', 'Multi-family'], names=['Housing type'])
        type_built = parsed_inputs['flow_construction'] * temp.T
    else:
        if 'share_multi_family' not in inputs.keys():
            parsed_inputs['share_multi_family'] = share_multi_family(parsed_inputs['stock_need'],
                                                                     inputs['factor_multi_family'])

        type_built = share_type_built(parsed_inputs['stock_need'], parsed_inputs['share_multi_family'],
                                      parsed_inputs['flow_construction']) * parsed_inputs['flow_construction']

    # multiply by the rate of income from start to end
    income = parsed_inputs['income'].copy()
    income = pd.concat([income] * len(idx), axis=1, keys=idx, names=['Year'])
    rate = pd.Series([(1 + parsed_inputs['income_rate']) ** (i - idx[0]) for i in idx], index=idx)
    parsed_inputs['income'] = income * rate

    # split construction by decision-maker using the shares observed in the stock, by housing type
    share_decision_maker = stock.groupby(
        ['Occupancy status', 'Housing type', 'Income owner', 'Income tenant']).sum().unstack(
        ['Occupancy status', 'Income owner', 'Income tenant'])
    share_decision_maker = (share_decision_maker.T / share_decision_maker.sum(axis=1)).T
    share_decision_maker = pd.concat([share_decision_maker] * type_built.shape[1], keys=type_built.columns, axis=1)
    construction = (reindex_mi(type_built, share_decision_maker.columns, axis=1) * share_decision_maker).stack(
        ['Occupancy status', 'Income owner', 'Income tenant']).fillna(0)
    construction.rename_axis('Year', axis=1, inplace=True)
    construction = construction.loc[:, idx]
    construction = construction.stack()

    ms_heater_built = inputs['ms_heater_built'].stack('Year').unstack('Heating system').fillna(0)

    # heating systems banned by a restriction policy get a zero market share from the policy start year
    restriction = [k for k, p in config['policies'].items()
                   if p.get('policy') in ['restriction_energy', 'restriction_heater']]
    for policy in restriction:
        if config['policies'][policy]['policy'] == 'restriction_energy':
            heating_system = [i for i, energy in resources_data['heating2heater'].items()
                              if energy == config['policies'][policy]['value']]
        else:
            heating_system = config['policies'][policy]['value']
        heating_system = [i for i in heating_system if i in ms_heater_built.columns]
        start_policy = config['policies'][policy]['start']
        ms_heater_built.loc[ms_heater_built.index.get_level_values('Year') >= start_policy, heating_system] = 0

    # renormalise so that market shares sum to one for every row
    ms_heater_built = (ms_heater_built.T / ms_heater_built.sum(axis=1)).T

    ms_heater_built = reindex_mi(ms_heater_built, construction.index)
    temp = construction.copy()
    construction = (reindex_mi(construction, ms_heater_built.index) * ms_heater_built.T).T
    construction = construction.loc[(construction != 0).any(axis=1)]
    construction = construction.stack('Heating system').unstack('Year')
    assert round(temp.sum() - construction.sum().sum(), 0) == 0, 'Construction is not equal to the sum of the heating system'

    # 'flow_district_heating' is only provided when district heating is configured
    if 'flow_district_heating' in parsed_inputs.keys():
        construction_dh = select(construction, {'Heating system': 'Heating-District heating'}).sum()
        # remove connections already accounted for by construction, floored at zero
        parsed_inputs['flow_district_heating'] = (
            parsed_inputs['flow_district_heating'] - construction_dh).clip(lower=0)

    performance_insulation = pd.concat(
        [pd.Series(inputs['performance_insulation_construction'])] * construction.shape[0],
        axis=1, keys=construction.index).T
    parsed_inputs['flow_built'] = pd.concat((construction, performance_insulation), axis=1).set_index(
        list(performance_insulation.keys()), append=True)
    parsed_inputs['flow_built'] = pd.concat([parsed_inputs['flow_built']], keys=[False],
                                            names=['Existing']).reorder_levels(stock.index.names)

    if not config['macro']['construction']:
        parsed_inputs['flow_built'][parsed_inputs['flow_built'] > 0] = 0

    parsed_inputs['carbon_value_kwh'] = (parsed_inputs['carbon_value'] * parsed_inputs['carbon_emission'].T).T.dropna() / 10**6

    parsed_inputs['embodied_energy_built'] = inputs['footprint_built'].loc['Grey energy (kWh/m2)']
    parsed_inputs['carbon_footprint_built'] = inputs['footprint_built'].loc['Carbon content (kgCO2/m2)']

    # carbon footprint of renovation
    parsed_inputs['embodied_energy_renovation'] = inputs['footprint_renovation'].loc['Grey energy (kWh/m2)', :]
    parsed_inputs['carbon_footprint_renovation'] = inputs['footprint_renovation'].loc['Carbon content (kgCO2/m2)', :]

    temp = parsed_inputs['surface'].xs(False, level='Existing', drop_level=True)
    temp = (parsed_inputs['flow_built'].groupby(temp.index.names).sum() * temp).sum() / 10**6
    parsed_inputs['Surface construction (Million m2)'] = temp
    parsed_inputs['Carbon footprint construction (MtCO2)'] = (parsed_inputs['Surface construction (Million m2)'] * parsed_inputs['carbon_footprint_built']) / 10**3
    parsed_inputs['Embodied energy construction (TWh PE)'] = (parsed_inputs['Surface construction (Million m2)'] * parsed_inputs['embodied_energy_built']) / 10**3

    energy_prices = parsed_inputs['energy_prices'].copy()
    parsed_inputs['energy_prices_wt'] = energy_prices.copy()
    energy_taxes = parsed_inputs['energy_taxes'].copy()

    if config['simple']['prices_constant']:
        # freeze prices at their start-year value
        energy_prices = pd.concat([energy_prices.loc[config['start'], :]] * energy_prices.shape[0],
                                  keys=energy_prices.index, axis=1).T

    total_taxes = pd.DataFrame(0, index=energy_prices.index, columns=energy_prices.columns)
    export_prices = dict()
    export_prices.update({'energy_prices': energy_prices})
    for t in taxes:
        total_taxes = total_taxes.add(t.value, fill_value=0)
        export_prices.update({t.name: t.value})

    if energy_taxes is not None:
        total_taxes = total_taxes.add(energy_taxes, fill_value=0)
        taxes += [PublicPolicy('energy_taxes', energy_taxes.index[0], energy_taxes.index[-1], energy_taxes, 'tax')]
        export_prices.update({'energy_taxes': energy_taxes})

    # NOTE(review): assumed to apply whether or not energy_taxes is provided - confirm.
    if config['simple']['prices_constant']:
        total_taxes = pd.concat([total_taxes.loc[config['start'], :]] * total_taxes.shape[0],
                                keys=total_taxes.index, axis=1).T

    # VAT is computed on the price before the other taxes are added
    energy_vat = energy_prices * inputs['energy_vat']
    export_prices.update({'energy_vat': energy_vat})
    taxes += [PublicPolicy('energy_vat', energy_vat.index[0], energy_vat.index[-1], energy_vat, 'tax')]
    total_taxes += energy_vat
    parsed_inputs['taxes'] = taxes
    parsed_inputs['total_taxes'] = total_taxes

    energy_prices = energy_prices.add(total_taxes, fill_value=0)
    parsed_inputs['energy_prices'] = energy_prices

    export_prices = reverse_dict({k: item.to_dict() for k, item in export_prices.items()})
    export_prices = concat([pd.DataFrame(item) for k, item in export_prices.items()], axis=1,
                           keys=export_prices.keys())
    parsed_inputs.update({'export_prices': export_prices})

    supply = {'insulation': None, 'heater': None}
    if config.get('supply') is not None:
        if config['supply']['activated_insulation']:
            supply.update({'insulation': {'markup_insulation': config['supply']['markup_insulation']}})
        if config['supply']['activated_heater']:
            supply.update({'heater': {'markup_heater': config['supply']['markup_heater']}})
    parsed_inputs.update({'supply': supply})

    premature_replacement = None
    if config['switch_heater'].get('premature_replacement') is not None:
        premature_replacement = {'time': config['switch_heater']['premature_replacement'],
                                 'information_rate': config['switch_heater']['information_rate']}
    parsed_inputs.update({'premature_replacement': premature_replacement})

    return parsed_inputs
def dump_inputs(parsed_inputs, path, figures=None):
    """Create summary input DataFrame and export it.

    Writes 'input.csv' (transposed summary, rounded to 3 decimals) and
    'energy_prices.csv' (``parsed_inputs['export_prices']`` rounded to
    4 decimals) in ``path``. Unless ``figures`` is False, also saves the plot
    'energy_prices.png' in ``path``.

    Parameters
    ----------
    parsed_inputs: dict
        Parsed inputs holding the series and tables to summarise.
    path: str
        Output folder.
    figures: bool, optional
        The price plot is skipped only when this is exactly False.

    Returns
    -------
    DataFrame
    """

    summary = dict()

    # NOTE(review): extent of this conditional was reconstructed from a flattened source;
    # it is assumed to cover only the entries derived from the population path - confirm.
    if 'sizing_factor' in parsed_inputs.keys():
        sizing = parsed_inputs['sizing_factor']
        summary['Sizing factor (%)'] = pd.Series(sizing, index=parsed_inputs['population'].index)
        summary['Total population (Millions)'] = parsed_inputs['population'] / 10**6
        summary['Income (Billions euro)'] = parsed_inputs['available_income'] * sizing / 10**9
        summary['Buildings stock (Millions)'] = parsed_inputs['stock_need'] / 10**6
        summary['Person by housing'] = parsed_inputs['pop_housing']
        summary['Buildings additional (Thousands)'] = parsed_inputs['flow_need'] / 10**3

    summary['Buildings built (Thousands)'] = parsed_inputs['flow_construction'] / 10**3
    summary['Buildings demolished (Thousands)'] = parsed_inputs['flow_demolition'] / 10**3

    # one summary entry per surface row, existing dwellings first, then construction
    surface_labels = ((True, 'Surface existing {} - {} (m2/dwelling)'),
                      (False, 'Surface construction {} - {} (m2/dwelling)'))
    for existing, label in surface_labels:
        surface = parsed_inputs['surface'].xs(existing, level='Existing', drop_level=True)
        surface.index = surface.index.map(lambda x: label.format(x[0], x[1]))
        summary.update(surface.T)

    for key in ('Surface construction (Million m2)',
                'Carbon footprint construction (MtCO2)',
                'Embodied energy construction (TWh PE)'):
        summary[key] = parsed_inputs[key]

    summary = pd.DataFrame(summary)

    taxes = parsed_inputs['total_taxes'].copy()
    taxes.columns = taxes.columns.map('Taxes {} (euro/kWh)'.format)

    prices = parsed_inputs['energy_prices'].copy()
    if figures is not False:
        make_plot(prices.dropna(), 'Prices (euro/kWh)', format_y=lambda y, _: '{:.2f}'.format(y),
                  colors=resources_data['colors'], save=os.path.join(path, 'energy_prices.png'))
    prices.columns = prices.columns.map('Prices {} (euro/kWh)'.format)

    summary = pd.concat((summary, taxes, prices), axis=1)

    income = parsed_inputs['income'].copy()
    income.index = income.index.map('Income {} (euro/year)'.format)
    summary = pd.concat((summary, income.T), axis=1)

    summary.T.round(3).to_csv(os.path.join(path, 'input.csv'))
    parsed_inputs['export_prices'].round(4).to_csv(os.path.join(path, 'energy_prices.csv'))

    return summary
def dict2data_inputs(inputs, return_metadata=False):
    """Grouped all inputs in the same DataFrame.

    Process is useful to implement a global sensitivity analysis.

    Scalars (float, int), dict, Series and DataFrame values are flattened to
    rows (variables, index, value); values of any other type are ignored.
    MultiIndex labels are stored as tuples in the 'index' column. The objects
    held in ``inputs`` are left untouched.

    Parameters
    ----------
    inputs: dict
        Inputs to flatten, keyed by variable name.
    return_metadata: bool, default False
        If True, also return the metadata table (type, name, index names and
        columns names of each variable) that data2dict_inputs needs to rebuild
        the inputs.

    Returns
    -------
    DataFrame or tuple of DataFrame
        Flattened data, or (data, metadata) when ``return_metadata`` is True.
    """
    data_columns = ['variables', 'index', 'value']
    metadata_columns = ['variables', 'type', 'name', 'index', 'columns']

    frames = []
    records = []
    for key, item in inputs.items():
        described = False  # True once the metadata of this variable has been recorded

        if isinstance(item, dict):
            records.append({'variables': key, 'type': type(item).__name__})
            described = True
            item = Series(item)

        if isinstance(item, (float, int)):
            frames.append(DataFrame({'variables': [key], 'index': [float('nan')], 'value': [item]}))
            records.append({'variables': key, 'type': type(item).__name__})

        if isinstance(item, DataFrame):
            records.append({'variables': key, 'type': type(item).__name__,
                            'index': list(item.index.names),
                            'columns': list(item.columns.names)})
            described = True
            item = item.stack(item.columns.names)

        if isinstance(item, Series):
            if not described:
                records.append({'variables': key, 'type': type(item).__name__, 'name': item.name,
                                'index': list(item.index.names)})

            # rename() returns a new Series: the index changes below never reach the caller's object
            values = item.rename('value')
            if isinstance(values.index, MultiIndex):
                values.index = values.index.to_flat_index()
            values.index = values.index.rename('index')

            frame = values.reset_index()
            frame.insert(0, 'variables', key)
            frames.append(frame)

    if frames:
        # single concatenation instead of growing the table inside the loop
        data = concat(frames, axis=0, ignore_index=True)[data_columns]
    else:
        data = DataFrame(columns=data_columns)

    data = data.astype({'variables': 'string', 'value': 'float64'})
    data.reset_index(drop=True, inplace=True)

    if return_metadata:
        metadata = DataFrame(records, columns=metadata_columns)
        return data, metadata
    return data
def data2dict_inputs(data, metadata):
    """Parse aggregate data pandas and return dict fill with several inputs.

    Each group of rows sharing the same 'variables' value is rebuilt as the
    object described by its metadata 'type': 'int', 'float', 'bool', 'Series',
    'DataFrame' or 'dict'. Variables of any other type are skipped.

    Parameters
    ----------
    data: DataFrame
        Model data input, with columns 'variables', 'index' and 'value'.
    metadata: DataFrame
        Additional information to find out how to parse data, with columns
        'variables', 'type', 'name', 'index' and 'columns'.

    Returns
    -------
    dict

    Raises
    ------
    IndexError
        If a variable of ``data`` has no row in ``metadata``.
    """

    def parse_index(n, index_values):
        """Rebuild an Index (one name) or a MultiIndex (several names) from stored labels."""
        if len(n) == 1:
            return Index(index_values, name=n[0])
        return MultiIndex.from_tuples(index_values, names=n)

    def parse_name(name):
        """Return None for a missing name so that unnamed Series are not renamed 'None' or 'nan'."""
        if name is None or (pd.api.types.is_scalar(name) and pd.isna(name)):
            return None
        return str(name)

    parsed_input = dict()
    for variables, df in data.groupby('variables'):
        meta = metadata[metadata['variables'] == variables]
        if meta.empty:
            raise IndexError('No metadata found for variable {}'.format(variables))

        kind = meta['type'].iloc[0]
        if kind == 'int':
            parsed_input.update({variables: int(df['value'].iloc[0])})
        elif kind == 'float':
            parsed_input.update({variables: float(df['value'].iloc[0])})
        elif kind == 'bool':
            parsed_input.update({variables: bool(df['value'].iloc[0])})
        elif kind == 'Series':
            idx = parse_index(list(meta['index'].iloc[0]), df['index'].values)
            parsed_input.update({variables: Series(df['value'].values, name=parse_name(meta['name'].iloc[0]),
                                                   index=idx)})
        elif kind == 'DataFrame':
            columns_names = list(meta['columns'].iloc[0])
            idx = parse_index(list(meta['index'].iloc[0]) + columns_names, df['index'].values)
            # columns levels were stacked into the index when flattening: unstack them back
            parsed_input.update({variables: Series(df['value'].values, index=idx).unstack(columns_names)})
        elif kind == 'dict':
            parsed_input.update({variables: Series(df['value'].values, index=df['index'].values).to_dict()})

    return parsed_input
def create_simple_policy(start, end, value=0.3, gest='insulation'):
    """Create an ad valorem subsidy policy named 'sub_ad_valorem'.

    Parameters
    ----------
    start: int
        Year policy starts.
    end: int
        Year policy ends.
    value: float, default 0.3
        Value of the policy.
    gest: str, default 'insulation'
        Passed to PublicPolicy as ``gest``.

    Returns
    -------
    PublicPolicy
    """
    return PublicPolicy(name='sub_ad_valorem', start=start, end=end, value=value,
                        policy='subsidy_ad_valorem', gest=gest)