# -*- coding: utf-8 -*-
"""
"""
import numpy as np
import scipy.io
import pandas as pd
from scipy.interpolate import interp1d
import calibration.sub.compute_income as calcmp
import equilibrium.functions_dynamic as eqdyn
def import_grid(path_data):
"""
Import standard geographic grid from the City of Cape Town.
Parameters
----------
path_data : str
Path towards data used in the model
Returns
-------
grid : DataFrame
Table yielding, for each grid cell (24,014), its x and y
(centroid) coordinates, and its distance (in km) to the city centre
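
Examples
--------
A minimal sketch of the distance computation, with made-up centroid
coordinates (in km) and the CBD placed at the origin for illustration:

>>> import numpy as np
>>> x, y = np.array([3.0]), np.array([4.0])
>>> x_center, y_center = 0.0, 0.0
>>> dist = ((x - x_center) ** 2 + (y - y_center) ** 2) ** 0.5
>>> float(dist[0])
5.0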
"""
data = pd.read_csv(path_data + 'grid_NEDUM_Cape_Town_500.csv', sep=';')
grid = pd.DataFrame()
grid["id"] = data.ID
# Rescale coordinates from metres to kilometres (divide by 1,000)
grid["x"] = data.X/1000
grid["y"] = data.Y/1000
# Compute distance to the city centre (defined by x and y coordinates of CBD)
x_center = -53267.944572790904/1000
y_center = -3754855.1309322729/1000
# Cf. Pythagoras' theorem
grid["dist"] = (((grid.x - x_center) ** 2)
+ ((grid.y - y_center) ** 2)) ** 0.5
return grid, np.array([x_center, y_center])
def import_amenities(path_precalc_inp, options):
"""
Import calibrated amenity index.
Parameters
----------
path_precalc_inp : str
Path for precalculated input data (calibrated parameters)
options : dict
Dictionary of default options
Returns
-------
amenities : ndarray(float64)
Normalized amenity index (relative to the mean) for each grid cell
(24,014)
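
Examples
--------
The normalization step only rescales the raw calibrated index so that it
averages to one; with toy values standing in for the real data:

>>> import numpy as np
>>> raw = np.array([[1.0, 2.0, 3.0]])
>>> (raw / np.nanmean(raw)).squeeze().tolist()
[0.5, 1.0, 1.5]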
"""
# Follow calibration from Pfeiffer et al. (appendix C4)
if options["load_precal_param"] == 1:
precalculated_amenities = scipy.io.loadmat(
path_precalc_inp + 'calibratedAmenities.mat')["amenities"]
elif options["load_precal_param"] == 0:
precalculated_amenities = np.load(
path_precalc_inp + 'calibratedAmenities.npy')
# Normalize index by mean of values
amenities = (precalculated_amenities
/ np.nanmean(precalculated_amenities)).squeeze()
return amenities
def import_hypothesis_housing_type():
"""
Define housing market accessibility hypotheses.
Returns
-------
income_class_by_housing_type : DataFrame
Set of dummies coding for housing market access (across 4 housing
submarkets) for each income group (4, from poorest to richest)
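
Examples
--------
>>> income_class_by_housing_type = import_hypothesis_housing_type()
>>> # The richest group (row 3) only has access to the formal submarket
>>> int(income_class_by_housing_type.loc[3, "backyard"])
0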
"""
income_class_by_housing_type = pd.DataFrame()
# Select which income class can live in formal settlements
income_class_by_housing_type["formal"] = np.array([1, 1, 1, 1])
# Select which income class can live in backyard settlements
income_class_by_housing_type["backyard"] = np.array([1, 1, 0, 0])
# Select which income class can live in informal settlements
income_class_by_housing_type["settlement"] = np.array([1, 1, 0, 0])
# Select which income class can live in formal subsidized housing
income_class_by_housing_type["subsidized"] = np.array([1, 0, 0, 0])
return income_class_by_housing_type
def import_income_classes_data(param, path_data):
"""
Import population and average income per income class in the model.
Parameters
----------
param : dict
Dictionary of default parameters
path_data : str
Path towards data used in the model
Returns
-------
mean_income : float64
Average median income across total population
households_per_income_class : ndarray(float64)
Exogenous total number of households per income group (excluding people
out of employment, for 4 groups)
average_income : ndarray(float64)
Average median income for each income group in the model (4)
income_mult : ndarray(float64)
Ratio of income-group-specific average income over global average
income
income_baseline : DataFrame
Table summarizing, for each income group in the data (12, including
people out of employment), the number of households living in each
endogenous housing type (3), their total number at baseline year (2011)
and in retrospect (2001), as well as the distribution of their average
income (at baseline year)
households_per_income_and_housing : ndarray(float64, ndim=2)
Exogenous number of households per income group (4, from poorest to
richest) in each endogenous housing type (3: formal private,
informal backyards, informal settlements), from SP-level data
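
Examples
--------
A toy version of the aggregation: two data brackets mapped to a single
model class, with median incomes averaged using household weights:

>>> import numpy as np
>>> nb_of_hh = np.array([100.0, 300.0])
>>> inc_med = np.array([1000.0, 2000.0])
>>> float(np.sum(nb_of_hh * inc_med) / np.sum(nb_of_hh))
1750.0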
"""
# Import population distribution according to housing type and income class
# Note that RDP is included as formal housing in the data: we will correct
# that in equilibrium.compute_equilibrium
income_baseline = pd.read_csv(path_data + 'Income_distribution_2011.csv')
# We only have data on median income per income group in the data (12),
# as opposed to income groups in the model (4). Therefore, we are only
# able to compute an average of median incomes in place of a proper median
# (or average) income per income group (in the model or the data):
# in any case, this is only used as validation data (does not enter the
# model as an input)
# Overall "mean" income
mean_income = np.sum(income_baseline.Households_nb
* income_baseline.INC_med
) / sum(income_baseline.Households_nb)
# Get income classes from data (12)
nb_of_hh_bracket = income_baseline.Households_nb
avg_income_bracket = income_baseline.INC_med
# Initialize income classes in model (4)
average_income = np.zeros(param["nb_of_income_classes"])
households_per_income_class = np.zeros(param["nb_of_income_classes"])
households_per_income_in_formal = np.zeros(param["nb_of_income_classes"])
households_per_income_in_backyard = np.zeros(
param["nb_of_income_classes"])
households_per_income_in_informal = np.zeros(param["nb_of_income_classes"])
# Compute population and "average" income for each class in the model
for j in range(0, param["nb_of_income_classes"]):
households_per_income_class[j] = np.sum(
nb_of_hh_bracket[(param["income_distribution"] == j + 1)])
# Note that this is in fact an average over median incomes
average_income[j] = np.sum(
avg_income_bracket[(param["income_distribution"] == j + 1)]
* nb_of_hh_bracket[param["income_distribution"] == j + 1]
) / households_per_income_class[j]
# We do the same across housing types for validation
households_per_income_in_formal[j] = np.sum(
income_baseline.formal[(param["income_distribution"] == j + 1)])
households_per_income_in_backyard[j] = np.sum(
income_baseline.informal_backyard[
(param["income_distribution"] == j + 1)]
)
households_per_income_in_informal[j] = np.sum(
income_baseline.informal_settlement[
(param["income_distribution"] == j + 1)]
)
# Compute ratio of "average" income per class over global income average:
# we normalize "average" income to 1
income_mult = average_income / mean_income
# Store breakdown in unique array
households_per_income_and_housing = np.vstack(
[households_per_income_in_formal, households_per_income_in_backyard,
households_per_income_in_informal])
return (mean_income, households_per_income_class, average_income,
income_mult, income_baseline, households_per_income_and_housing)
def import_households_data(path_precalc_inp):
"""
Import geographic data with class distributions for households.
Parameters
----------
path_precalc_inp : str
Path for precalculated input data (calibrated parameters)
Returns
-------
data_rdp : DataFrame
Table yielding, for each grid cell (24,014), the associated cumulative
count of cells with some formal subsidized housing, and the associated
area (in m²) dedicated to such housing
housing_types_sp : DataFrame
Table yielding, for each Small Place (1,046), the number of informal
backyards, of informal settlements, and total dwelling units, as well
as their (centroid) x and y coordinates
data_sp : DataFrame
Table yielding, for each Small Place (1,046), the average dwelling size
(in m²), the average land price and annual income level (in rands),
the size of unconstrained area for construction (in m²), the total area
(in km²), the distance to the city centre (in km), whether or not the
location belongs to Mitchells Plain, and the SP code
mitchells_plain_grid_baseline : ndarray(uint8)
Dummy coding for belonging to Mitchells Plain neighbourhood
at the grid-cell (24,014) level
grid_formal_density_HFA : ndarray(float64)
Population density (per m²) in formal private housing at baseline year
(2011) at the grid-cell (24,014) level
threshold_income_distribution : ndarray(int32)
Annual income level (in rands) above which a household is taken as
being part of one of the 4 income groups in the model
income_distribution : ndarray(uint16, ndim=2)
Exogenous number of households in each Small Place (1,046) for each
income group in the model (4)
cape_town_limits : ndarray(uint8)
Dummy indicating whether or not a Small Place (1,046) belongs to the
metropolitan area of the City of Cape Town
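
Examples
--------
The .mat file stores a MATLAB struct, hence the repeated [0][0] indexing
after scipy.io.loadmat; a self-contained sketch with a made-up field:

>>> import io
>>> import numpy as np
>>> import scipy.io
>>> buf = io.BytesIO()
>>> scipy.io.savemat(buf, {'data': {'spCode': np.arange(3)}})
>>> _ = buf.seek(0)
>>> data = scipy.io.loadmat(buf)['data']
>>> data['spCode'][0][0].squeeze().tolist()
[0, 1, 2]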
"""
# Import a structure of characteristics (for pixels and SPs mostly)
data = scipy.io.loadmat(path_precalc_inp + 'data.mat')['data']
# Get maximum income thresholds for model income classes (4)
threshold_income_distribution = data[
'thresholdIncomeDistribution'][0][0].squeeze()
# Get data from RDP for pixels (24,014)
# NB: this is obtained from cadastre data
data_rdp = pd.DataFrame()
# Number of RDP units
data_rdp["count"] = data['gridCountRDPfromGV'][0][0].squeeze()
# Total surface of RDP units (in m²)
data_rdp["area"] = data['gridAreaRDPfromGV'][0][0].squeeze()
# Get other data at grid level
# Dummy indicating whether pixel belongs to Mitchells Plain district
mitchells_plain_grid_baseline = data['MitchellsPlain'][0][0].squeeze()
# Population density in formal housing (per m²)
grid_formal_density_HFA = data['gridFormalDensityHFA'][0][0].squeeze()
# Get housing type data for SPs (1,046)
housing_types_sp = pd.DataFrame()
# Number of backyard settlements
housing_types_sp["backyard_SP_2011"] = data[
'spInformalBackyard'][0][0].squeeze()
# Number of informal settlements
housing_types_sp["informal_SP_2011"] = data[
'spInformalSettlement'][0][0].squeeze()
# Number of dwellings
housing_types_sp["total_dwellings_SP_2011"] = data[
'spTotalDwellings'][0][0].squeeze()
# Coordinates
housing_types_sp["x_sp"] = data['spX'][0][0].squeeze()
housing_types_sp["y_sp"] = data['spY'][0][0].squeeze()
# Get other data for SPs
data_sp = pd.DataFrame()
# Average dwelling size
data_sp["dwelling_size"] = data['spDwellingSize'][0][0].squeeze()
# Average price of real estate (per m²) for baseline year (2011)
data_sp["price"] = data['spPrice'][0][0].squeeze()[2, :]
# Annual average household income (in rands)
data_sp["income"] = data['sp2011AverageIncome'][0][0].squeeze()
# Unconstrained area for construction (in m²)
data_sp["unconstrained_area"] = data["spUnconstrainedArea"][0][0].squeeze()
# Total area (in m²)
data_sp["area"] = data["sp2011Area"][0][0].squeeze()
# Centroid distance to city centre (in km)
data_sp["distance"] = data["sp2011Distance"][0][0].squeeze()
# Dummy indicating whether SP belongs to Mitchells Plain district
data_sp["mitchells_plain"] = data["sp2011MitchellsPlain"][0][0].squeeze()
# SP codes
data_sp["sp_code"] = data["spCode"][0][0].squeeze()
# Number of households of each model income class in each SP
income_distribution = data[
"sp2011IncomeDistributionNClass"][0][0].squeeze()
# Dummy indicating whether SP belongs to Cape Town
cape_town_limits = data["sp2011CapeTown"][0][0].squeeze()
return (data_rdp, housing_types_sp, data_sp, mitchells_plain_grid_baseline,
grid_formal_density_HFA, threshold_income_distribution,
income_distribution, cape_town_limits)
def import_macro_data(param, path_scenarios, path_folder):
"""
Import interest rate and population per housing type.
Parameters
----------
param : dict
Dictionary of default parameters
path_scenarios : str
Path towards raw scenarios used for time-moving exogenous variables
path_folder : str
Path towards the root data folder
Returns
-------
interest_rate : float64
Interest rate for the overall economy, corresponding to an average
over past years
population : int64
Total number of households in the city (from Small-Area-Level data)
housing_type_data : ndarray(int64)
Exogenous number of households per housing type (4: formal private,
informal backyards, informal settlements, formal subsidized), from
Small-Area-Level data
total_RDP : int
Number of households living in formal subsidized housing (from SAL
data)
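
Examples
--------
Sketch of the interest rate spline logic with made-up values (years are
expressed relative to the baseline year):

>>> from scipy.interpolate import interp1d
>>> spline_interest_rate = interp1d([-10, 0, 29], [7.0, 5.0, 5.5], 'linear')
>>> float(spline_interest_rate(-5))
6.0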
"""
# Interest rate
# Import interest rate history + scenario until 2040
scenario_interest_rate = pd.read_csv(
path_scenarios + 'Scenario_interest_rate_1.csv', sep=';')
# Fit linear interpolation spline centered on the baseline year
spline_interest_rate = interp1d(
scenario_interest_rate.Year_interest_rate[
~np.isnan(scenario_interest_rate.real_interest_rate)]
- param["baseline_year"],
scenario_interest_rate.real_interest_rate[
~np.isnan(scenario_interest_rate.real_interest_rate)],
'linear'
)
# Get interest rate as the mean (in %) over (3) last years
interest_rate = eqdyn.interpolate_interest_rate(spline_interest_rate, 0)
# Aggregate population: this comes from SAL (Small Area Level) data and
# serves as a benchmark to reweight aggregates obtained from diverse
# granular data sets, so that the same overall population is indeed
# considered throughout the model
sal_data = pd.read_excel(
path_folder
+ "housing_types_sal_analysis.xlsx",
header=6, nrows=5339)
sal_data["informal"] = sal_data[
"Informal dwelling (shack; not in backyard; e.g. in an"
+ " informal/squatter settlement or on a farm)"]
sal_data["backyard_formal"] = sal_data["House/flat/room in backyard"]
sal_data["backyard_informal"] = sal_data[
"Informal dwelling (shack; in backyard)"]
# NB: we do not include traditional houses, granny flats, caravans, and
# others in formal (note that RDP is included for now)
sal_data["formal"] = np.nansum(sal_data.iloc[:, [3, 5, 6, 7, 8]], 1)
rdp_data = pd.read_excel(
path_folder
+ "housing_types_sal_analysis.xlsx",
sheet_name='Analysis', header=None, names=None, usecols="A:B",
skiprows=18, nrows=6)
total_RDP = int(rdp_data.iloc[5, 1])
# Note that the underlying data inference process used to count formal
# subsidized housing units probably takes (older) council housing units
# into account. As this is also a form of social housing, they will be
# assimilated to RDP units in the analysis (and the terms formal subsidized
# and RDP will be used interchangeably).
# An external validation exercise indeed shows that we obtain a number of
# RDP/BNG units close enough to the one estimated using Cape Town's 2012
# General Valuation Roll (SR2 incremental housing) after subtracting
# 40,000 council housing units
# Here, we subtract RDP from formal (no need to correct again in
# validation exercises)
total_formal = sal_data["formal"].sum() - total_RDP
total_informal = sal_data["informal"].sum()
# Note that we only include informal backyards (non-concrete structures,
# similar to informal "shacks") by assumption in the model
total_backyard = sal_data["backyard_informal"].sum()
housing_type_data = np.array([total_formal, total_backyard, total_informal,
total_RDP])
population = sum(housing_type_data)
return interest_rate, population, housing_type_data, total_RDP
def import_land_use(grid, options, param, data_rdp, housing_types,
housing_type_data, path_data, path_folder):
"""
Return linear interpolation estimates for housing construction paths.
This function imports scenarios about land use availability for several
housing types, then processes them to obtain a percentage of land
available for construction in each housing type, for each grid cell over
the years. It first sets the evolution of the total number of formal
subsidized dwellings, then does the same at the grid-cell level, before
defining land availability over time for some housing types: first formal
subsidized housing, then informal backyards and informal settlements. It
also defines the evolution of unconstrained land, with and without an
urban edge.
Parameters
----------
grid : DataFrame
Table yielding, for each grid cell (24,014), its x and y
(centroid) coordinates, and its distance (in km) to the city centre
options : dict
Dictionary of default options
param : dict
Dictionary of default parameters
data_rdp : DataFrame
Table yielding, for each grid cell (24,014), the associated cumulative
count of cells with some formal subsidized housing, and the associated
area (in m²) dedicated to such housing
housing_types : DataFrame
Table yielding, for 4 different housing types (informal settlements,
formal backyards, informal backyards, and formal private housing),
the number of households in each grid cell (24,014), from SAL data.
Note that the notion of formal backyards corresponds to backyards with
a concrete structure (as opposed to informal "shacks"), and not to
backyards located within the premises of formal private homes. In any
case, we abstract from both of those definitions for the sake of
simplicity and as they account for a marginal share of overall
backyarding.
housing_type_data : ndarray(int64)
Exogenous number of households per housing type (4: formal private,
informal backyards, informal settlements, formal subsidized), from
Small-Area-Level data
path_data : str
Path towards data used in the model
path_folder : str
Path towards the root data folder
Returns
-------
spline_RDP : interp1d
Linear interpolation for the total number of formal subsidized
dwellings over the years (baseline year set at 0)
spline_estimate_RDP : interp1d
Linear interpolation for the grid-level number of formal subsidized
dwellings over the years (baseline year set at 0)
spline_land_RDP : interp1d
Linear interpolation for the grid-level land availability (in %)
for formal subsidized housing over the years (baseline year set at 0)
spline_land_backyard : interp1d
Linear interpolation for the grid-level land availability (in %)
for informal backyards over the years (baseline year set at 0)
spline_land_informal : interp1d
Linear interpolation for the grid-level land availability (in %)
for informal settlements over the years (baseline year set at 0)
spline_land_constraints : interp1d
Linear interpolation for the grid-level overall land availability,
(in %) over the years (baseline year set at 0)
number_properties_RDP : ndarray(float64)
Number of formal subsidized dwellings per grid cell (24,014) at
baseline year (2011)
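
Examples
--------
The land availability splines interpolate a whole grid-level vector at
once, by passing a transposed 2D array to interp1d (one row per cell);
a two-cell sketch with made-up shares:

>>> import numpy as np
>>> from scipy.interpolate import interp1d
>>> shares_by_date = [[0.0, 1.0], [2.0, 3.0]]  # one row per date
>>> spline = interp1d([0, 10], np.transpose(shares_by_date), 'linear')
>>> spline(5).tolist()
[1.0, 2.0]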
"""
# 0. Import data
# Social housing
# New RDP construction projects
construction_rdp = pd.read_csv(path_data + 'grid_new_RDP_projects.csv')
# RDP population data
# We take total population in RDP at baseline year
RDP_baseline = housing_type_data[3]
# Comes from 2001 General Valuation Roll
RDP_restrospect = 1.1718e+05
# Land cover for informal settlements
# Set the area of a pixel in m²
area_pixel = (0.5 ** 2) * 1000000
# General surface data on land use (urban, agricultural...)
land_use_data = pd.read_csv(
path_data + 'grid_NEDUM_Cape_Town_500.csv', sep=';')
# Surface data on scenarios for informal settlement building
informal_risks_short = pd.read_csv(
path_folder + 'occupation_maps/informal_settlements_risk_SHORT.csv',
sep=',')
informal_risks_long = pd.read_csv(
path_folder + 'occupation_maps/informal_settlements_risk_LONG.csv',
sep=',')
informal_risks_medium = pd.read_csv(
path_folder + 'occupation_maps/informal_settlements_risk_MEDIUM.csv',
sep=',')
informal_risks_HIGH = pd.read_csv(
path_folder + 'occupation_maps/informal_settlements_risk_pHIGH.csv',
sep=',')
informal_risks_VERYHIGH = pd.read_csv(
path_folder
+ 'occupation_maps/informal_settlements_risk_pVERYHIGH.csv',
sep=',')
# Number of informal dwellings per pixel in 2020
informal_settlements_nearfuture = pd.read_excel(
path_data + 'inf_dwellings_2020.xlsx')
# Here we correct by (1 / max_land_use) to make the area comparable with
# other informal risks (this will be removed afterwards as it does not
# correspond to reality)
informal_risks_nearfuture = (
informal_settlements_nearfuture.inf_dwellings_2020
* param["shack_size"]
* (1 / param["max_land_use_settlement"]))
# We neglect building risks smaller than 1% of a pixel area
informal_risks_nearfuture[informal_risks_nearfuture < area_pixel/100] = 0
informal_risks_nearfuture[np.isnan(informal_risks_nearfuture)] = 0
# Pixel selection for scenario correction
# We want to include pushback from farmers around Philippi, hence we delay
# settlement of those areas compared to other short-term risks
polygon_medium_timing = pd.read_excel(
path_folder + 'occupation_maps/polygon_medium_timing.xlsx',
header=None)
# We take the selected pixels from medium scenario and make their risk
# area equal to the short scenario, then take the same selection in the
# short scenario and make their risk area equal to zero.
# Basically, we are just turning those areas from short-term into
# medium-term risks
for item in list(polygon_medium_timing.squeeze()):
informal_risks_medium.loc[
informal_risks_medium["grid.data.ID"] == item,
"area"
] = informal_risks_short.loc[
informal_risks_short["grid.data.ID"] == item,
"area"
]
informal_risks_short.loc[
informal_risks_short["grid.data.ID"] == item,
"area"] = 0
# 1. Total RDP population
# We compute a linear interpolation over 4 reference years centered around
# the baseline
# Construction rate comes from Pfeiffer et al.'s median scenario, then we
# apply a lower rate after the end of the programme in 2020
spline_RDP = interp1d(
[2001 - param["baseline_year"], 2011 - param["baseline_year"],
2020 - param["baseline_year"], 2041 - param["baseline_year"]],
[RDP_restrospect, RDP_baseline,
RDP_baseline + 9 * param["current_rate_public_housing"],
RDP_baseline + 9 * param["current_rate_public_housing"]
+ 21 * param["future_rate_public_housing"]],
'linear'
)
# We capture the output of the function to be used later on
# Start of the actual construction programme
year_begin_RDP = 2015
# We "center" scenarios around baseline year until end of programme
year_RDP = np.arange(year_begin_RDP, 2040) - param["baseline_year"]
# We take the spline output for this timeline
# NB: this will only be used as an intermediate input for RDP
# land use, not as a direct input in the model
number_RDP = spline_RDP(year_RDP)
# 2. RDP nb of houses
# Setting the timeline
# We take the absolute difference between projected and actual RDP
# constructions over the years (RDP left to build) and return the year
# index for the minimum: this defines the short-term horizon of the
# programme
year_short_term = np.argmin(
np.abs(sum(construction_rdp.total_yield_DU_ST)
- (number_RDP - number_RDP[0]))
)
# We just take the size of the projection array for the long-term horizon
year_long_term = 30
# We save the timeline into a list
year_data_informal = [
2000 - param["baseline_year"],
year_begin_RDP - param["baseline_year"],
year_short_term,
year_long_term
]
# Getting the outcome
# We weight by closeness to the center to get retrospective number of RDP
# in 2000 (by assuming that central areas were built earlier)
number_properties_retrospect = (
data_rdp["count"]
* (1 - grid.dist / max(grid.dist[data_rdp["count"] > 0]))
)
# Actual number of RDP houses at baseline year (2011)
RDP_houses_estimates = data_rdp["count"]
# Regression spline
spline_estimate_RDP = interp1d(
year_data_informal,
np.transpose(
[number_properties_retrospect,
RDP_houses_estimates,
RDP_houses_estimates + construction_rdp.total_yield_DU_ST,
RDP_houses_estimates + construction_rdp.total_yield_DU_ST
+ construction_rdp.total_yield_DU_LT]
),
'linear')
number_properties_RDP = spline_estimate_RDP(0)
# 3. RDP pixel share
# Getting areas: note that we divide by max_land_use as original data is
# already corrected and we want to make the spline coherent with other
# non-corrected housing types
# % of the pixel area dedicated to RDP (after accounting for backyards)
# NB: we get the share of RDP dwelling size over the size of the whole
# RDP premises (dwelling + backyard), which we multiply by total RDP area
# per pixel over pixel area (correcting for max_land_use)
area_RDP = (data_rdp["area"]
* param["RDP_size"]
/ (param["backyard_size"] + param["RDP_size"])
/ area_pixel) / param["max_land_use"]
# For the RDP constructed area, we take the min between declared value and
# extrapolation from our initial size parameters
# We do it for the short term
area_RDP_short_term = np.minimum(
construction_rdp.area_ST,
(param["backyard_size"] + param["RDP_size"])
* construction_rdp.total_yield_DU_ST
) / param["max_land_use"]
# Then for the long term, while capping the constructed area at the pixel
# size (just in case)
area_RDP_long_term = np.minimum(
np.minimum(
construction_rdp.area_ST + construction_rdp.area_LT,
(param["backyard_size"] + param["RDP_size"])
* (construction_rdp.total_yield_DU_ST
+ construction_rdp.total_yield_DU_LT)
),
area_pixel
) / param["max_land_use"]
# Regression spline
spline_land_RDP = interp1d(
year_data_informal,
np.transpose(
[area_RDP, area_RDP, area_RDP_short_term, area_RDP_long_term]
),
'linear')
# 4. Backyarding pixel share
# Getting areas
# We get share of pixel backyard area by reweighting total area by the
# share of backyards in formal subsidized housing units
area_backyard = (data_rdp["area"] * param["backyard_size"]
/ (param["backyard_size"] + param["RDP_size"])
/ area_pixel)
# Share of pixel urban land
urban = np.transpose(land_use_data.urban) / area_pixel
# We consider that the potential for backyard building cannot exceed that
# of urban area
coeff_land_backyard = np.fmin(urban, area_backyard)
# NB: the following is only useful if we consider both formal and
# informal backyards (deprecated).
# We reweight max pixel share available for backyarding (both formal and
# informal) by a ratio of how densely populated the pixel is: this yields
# an alternative definition of coeff_land_backyard that includes formal
# backyarding and may be used for flood damage estimations
# Note that this does not tell whether backyarding does occur within
# formal private houses or formal subsidized ones (we keep only the
# second option as an assumption in the model)
actual_backyards = (
(housing_types.backyard_formal_grid
+ housing_types.backyard_informal_grid)
/ np.nanmax(housing_types.backyard_formal_grid
+ housing_types.backyard_informal_grid)
) * np.max(coeff_land_backyard)
if options["actual_backyards"] == 0:
actual_backyards = 0
# To project backyard share of pixel area on the ST, we add the potential
# backyard construction from RDP projects
area_backyard_short_term = (
area_backyard
+ np.maximum(
area_RDP_short_term
- construction_rdp.total_yield_DU_ST * param["RDP_size"],
0
) / area_pixel
)
# We do the same for RDP share of pixel area, by subtracting backyarding
area_RDP_short_term = (
area_RDP
+ np.minimum(
construction_rdp.total_yield_DU_ST * param["RDP_size"],
construction_rdp.area_ST
) / area_pixel
)
# We make sure that pixel share of backyard area does not exceed max
# available land after RDP construction
area_backyard_short_term = np.minimum(
area_backyard_short_term, param["max_land_use"] - area_RDP_short_term
)
# We repeat the process for backyarding over the LT
area_backyard_long_term = (
area_backyard
+ np.maximum(
area_RDP_long_term
- (construction_rdp.total_yield_DU_LT
+ construction_rdp.total_yield_DU_ST) * param["RDP_size"],
0
) / area_pixel
)
area_RDP_long_term = (
area_RDP
+ np.minimum(
(construction_rdp.total_yield_DU_LT
+ construction_rdp.total_yield_DU_ST) * param["RDP_size"],
area_RDP_long_term
) / area_pixel
)
area_backyard_long_term = np.minimum(
area_backyard_long_term, param["max_land_use"] - area_RDP_long_term
)
# Regression spline
spline_land_backyard = interp1d(
year_data_informal,
np.transpose(
[np.fmax(area_backyard, actual_backyards),
np.fmax(area_backyard, actual_backyards),
np.fmax(area_backyard_short_term, actual_backyards),
np.fmax(area_backyard_long_term, actual_backyards)]
),
'linear')
# 5. Unconstrained land pixel share
# We get pixel share with and without urban edge
coeff_land_no_urban_edge = (
np.transpose(land_use_data.unconstrained_out)
+ np.transpose(land_use_data.unconstrained_UE)) / area_pixel
coeff_land_urban_edge = np.transpose(
land_use_data.unconstrained_UE) / area_pixel
# Regression spline (with or without urban edge)
if options["urban_edge"] == 0:
year_constraints = np.array(
[1990, param["year_urban_edge"] - 1, param["year_urban_edge"],
2040]
) - param["baseline_year"]
spline_land_constraints = interp1d(
year_constraints,
np.transpose(
np.array(
[coeff_land_urban_edge, coeff_land_urban_edge,
coeff_land_no_urban_edge, coeff_land_no_urban_edge]
)
), 'linear'
)
else:
year_constraints = np.array([1990, 2040]) - param["baseline_year"]
spline_land_constraints = interp1d(
year_constraints,
np.transpose(
np.array([coeff_land_urban_edge, coeff_land_urban_edge])
)
)
# 6. Informal settlement pixel share
# Getting areas
# We get pixel share for informal settlement area at baseline (2011)
informal_baseline = np.transpose(land_use_data.informal) / area_pixel
# We consider construction risk as the upper bound between initial
# conditions and the prospective scenario (in 2020)
informal_nearfuture = np.fmax(
informal_risks_nearfuture / area_pixel,
informal_baseline)
# We also get area for high risk scenario retained in projections
high_proba = informal_risks_VERYHIGH.area + informal_risks_HIGH.area
# We consider some scenario for 2023 and correct for RDP construction
# (as informal predictions are not precise enough to account for such
# unavailable area)
informal_midshort_fut = np.fmin(
coeff_land_no_urban_edge,
np.fmax(
informal_nearfuture,
np.transpose(np.fmin(informal_risks_short.area, high_proba))
/ area_pixel
)
) - spline_land_RDP(12)
informal_midshort_fut[informal_midshort_fut < 0] = 0
# We do the same for 2025
informal_midlong_fut = np.fmin(
coeff_land_no_urban_edge,
np.fmax(
informal_midshort_fut,
np.transpose(np.fmin(informal_risks_medium.area, high_proba))
/ area_pixel
)
) - spline_land_RDP(14)
informal_midlong_fut[informal_midlong_fut < 0] = 0
# And again for 2030
informal_long_fut = np.fmin(
coeff_land_no_urban_edge,
np.fmax(
informal_midlong_fut,
np.transpose(np.fmin(informal_risks_long.area, high_proba))
/ area_pixel
)
) - spline_land_RDP(19)
informal_long_fut[informal_long_fut < 0] = 0
# Regression spline (with land constraints)
if options["informal_land_constrained"] == 0:
spline_land_informal = interp1d(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 12, 14, 19, 29],
np.transpose(
[informal_baseline, informal_baseline, informal_baseline,
informal_baseline, informal_baseline, informal_baseline,
informal_baseline, informal_baseline, informal_baseline,
informal_nearfuture, informal_midshort_fut,
informal_midlong_fut, informal_long_fut, informal_long_fut]
),
'linear'
)
elif options["informal_land_constrained"] == 1:
spline_land_informal = interp1d(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 12, 14, 19, 29],
np.transpose(
[informal_baseline, informal_baseline, informal_baseline,
informal_baseline, informal_baseline, informal_baseline,
informal_baseline, informal_baseline, informal_baseline,
informal_nearfuture, informal_nearfuture, informal_nearfuture,
informal_nearfuture, informal_nearfuture]
),
'linear'
)
# 7. Function output
return (spline_RDP, spline_estimate_RDP, spline_land_RDP,
spline_land_backyard, spline_land_informal,
spline_land_constraints, number_properties_RDP)
def import_coeff_land(spline_land_constraints, spline_land_backyard,
spline_land_informal, spline_land_RDP, param, t):
"""
Update land availability ratios for a given year.
This function updates the land availability regression splines to account
for non-constructible land (such as roads, open spaces, etc.). This is done
by reweighting the estimates with an ad hoc parameter ratio.
Parameters
----------
spline_land_constraints : interp1d
Linear interpolation for the grid-level overall land availability
(in %)
spline_land_backyard : interp1d
Linear interpolation for the grid-level land availability (in %)
for informal backyards over the years
spline_land_informal : interp1d
Linear interpolation for the grid-level land availability (in %)
for informal settlements over the years
spline_land_RDP : interp1d
Linear interpolation for the grid-level land availability (in %)
for formal subsidized housing over the years
param : dict
Dictionary of default parameters
t : int
Year (relative to baseline year set at 0) for which we want to
run the function
Returns
-------
coeff_land : ndarray(float64, ndim=2)
Updated land availability for each grid cell (24,014) and each
housing type (4: formal private, informal backyards, informal
settlements, formal subsidized)
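
Examples
--------
Sketch of the residual definition of private land, with scalar stand-ins
for the grid-level splines and a hypothetical max_land_use value:

>>> total, backyard, informal, rdp = 0.9, 0.1, 0.1, 0.2
>>> max_land_use = 0.8  # hypothetical parameter value
>>> round((total - backyard - informal - rdp) * max_land_use, 2)
0.4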
"""
# NB: we consider area actually used for commercial purposes
# as "available" for residential construction
# Available private land is just defined as total available land minus
# land dedicated to other housing types
coeff_land_private = (spline_land_constraints(t)
- spline_land_backyard(t)
- spline_land_informal(t)
- spline_land_RDP(t)) * param["max_land_use"]
coeff_land_private[coeff_land_private < 0] = 0
coeff_land_backyard = (spline_land_backyard(t)
* param["max_land_use_backyard"])
coeff_land_RDP = spline_land_RDP(t) * param["max_land_use"]
coeff_land_settlement = (spline_land_informal(t)
* param["max_land_use_settlement"])
coeff_land = np.array([coeff_land_private, coeff_land_backyard,
coeff_land_settlement, coeff_land_RDP])
return coeff_land
def import_housing_limit(grid, param):
"""
Return maximum allowed housing supply in and out of historic city radius.
Parameters
----------
grid : DataFrame
Table yielding, for each grid cell (24,014), its x and y
(centroid) coordinates, and its distance (in km) to the city centre
param : dict
Dictionary of default parameters
Returns
-------
housing_limit : Series
Maximum housing supply (in m² per km²) in each grid cell (24,014)
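
Examples
--------
With a two-cell toy grid and hypothetical parameter values:

>>> import pandas as pd
>>> grid = pd.DataFrame({"dist": [1.0, 20.0]})
>>> param = {"historic_radius": 6,
...          "limit_height_center": 10, "limit_height_out": 20}
>>> import_housing_limit(grid, param).tolist()
[10000000, 20000000]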
"""
center_regulation = (grid["dist"] <= param["historic_radius"])
outside_regulation = (grid["dist"] > param["historic_radius"])
# We get the maximum amount of housing we can get per km² (hence the
# multiplier as pixel area is given in m²)
housing_limit = 1000000 * (
param["limit_height_center"] * center_regulation
+ param["limit_height_out"] * outside_regulation
)
return housing_limit
def import_init_floods_data(options, param, path_folder):
"""
Import raw flood data and damage functions.
More specifically, damage functions (taken from the literature) map a
given maximum flood depth level to a fraction of capital destroyed,
depending on the type of capital considered. We focus here on housing
structures (whose value is determined endogenously) and housing contents
that are prone to flood destruction (calibrated ad hoc). We will later
associate building material types to housing types considered in the model.
Also note that fluvial flood maps are available both in a defended
(supposedly accounting for existing protection infrastructure) and
undefended version.
Parameters
----------
options : dict
Dictionary of default options
param : dict
Dictionary of default parameters
path_folder : str
Path towards the root data folder
Returns
-------
structural_damages_small_houses : interp1d
Linear interpolation for fraction of capital destroyed (small house
structures) over maximum flood depth (in m) in a given area,
from de Villiers et al., 2007
structural_damages_medium_houses : interp1d
Linear interpolation for fraction of capital destroyed (medium house
structures) over maximum flood depth (in m) in a given area,
from de Villiers et al., 2007
structural_damages_large_houses : interp1d
Linear interpolation for fraction of capital destroyed (large house
structures) over maximum flood depth (in m) in a given area,
from de Villiers et al., 2007
content_damages : interp1d
Linear interpolation for fraction of capital destroyed (house
contents) over maximum flood depth (in m) in a given area,
from de Villiers et al., 2007
structural_damages_type1 : interp1d
Linear interpolation for fraction of capital destroyed (type-1 house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (non-engineered buildings)
structural_damages_type2 : interp1d
Linear interpolation for fraction of capital destroyed (type-2 house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (wooden buildings)
structural_damages_type3a : interp1d
Linear interpolation for fraction of capital destroyed (type-3a house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (one-floor unreinforced masonry/concrete
buildings)
structural_damages_type3b : interp1d
Linear interpolation for fraction of capital destroyed (type-3b house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (two-floor unreinforced masonry/concrete
buildings)
structural_damages_type4a : interp1d
Linear interpolation for fraction of capital destroyed (type-4a house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (one-floor reinforced masonry/concrete
and steel buildings)
structural_damages_type4b : interp1d
Linear interpolation for fraction of capital destroyed (type-4b house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (two-floor reinforced masonry/concrete
and steel buildings)
d_fluvial : dict
Dictionary of data frames yielding fluvial flood maps (maximum flood
depth + fraction of flood-prone area for each grid cell) for each
available return period
d_pluvial : dict
Dictionary of data frames yielding pluvial flood maps (maximum flood
depth + fraction of flood-prone area for each grid cell) for each
available return period
d_coastal : dict
Dictionary of data frames yielding coastal flood maps (maximum flood
depth + fraction of flood-prone area for each grid cell) for each
available return period
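
Examples
--------
Each damage function maps a maximum flood depth (in m) to a fraction of
capital destroyed; e.g. reproducing the content damage function below:

>>> from scipy.interpolate import interp1d
>>> content_damages = interp1d(
...     [0, 0.1, 0.3, 0.6, 1.2, 1.5, 2.4, 10],
...     [0, 0.06, 0.15, 0.35, 0.77, 0.95, 1, 1])
>>> float(content_damages(0.3))
0.15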
"""
# Import floods data
if options["defended"] == 1:
fluvial_floods = ['FD_5yr', 'FD_10yr', 'FD_20yr', 'FD_50yr', 'FD_75yr',
'FD_100yr', 'FD_200yr', 'FD_250yr', 'FD_500yr',
'FD_1000yr']
elif options["defended"] == 0:
fluvial_floods = ['FU_5yr', 'FU_10yr', 'FU_20yr', 'FU_50yr', 'FU_75yr',
'FU_100yr', 'FU_200yr', 'FU_250yr', 'FU_500yr',
'FU_1000yr']
pluvial_floods = ['P_5yr', 'P_10yr', 'P_20yr', 'P_50yr', 'P_75yr',
'P_100yr', 'P_200yr', 'P_250yr', 'P_500yr', 'P_1000yr']
name = 'C_' + options["dem"] + '_' + str(options["climate_change"])
# Coastal flood maps are extracted from DELTARES global flood maps that
# use GTSMip6 water levels as inputs (Muis et al., 2020) for three distinct
# DEMs. We only consider the two of them that have a fine resolution of
# 90m / 3'', on a par with the FATHOM data.
coastal_floods = [name + '_0000', name + '_0002', name + '_0005',
name + '_0010', name + '_0025', name + '_0050',
name + '_0100', name + '_0250']
path_data = path_folder + "flood_maps/"
# NB: FATHOM data has already been pre-processed to fit the CoCT's grid of
# analysis in previous work. The code used to pre-process DELTARES data
# is available under the _flood_processing folder
d_pluvial = {}
d_fluvial = {}
d_coastal = {}
for flood in fluvial_floods:
print(flood)
d_fluvial[flood] = np.squeeze(
pd.read_excel(path_data + flood + ".xlsx")
)
for flood in pluvial_floods:
print(flood)
d_pluvial[flood] = np.squeeze(
pd.read_excel(path_data + flood + ".xlsx")
)
for flood in coastal_floods:
print(flood)
d_coastal[flood] = np.squeeze(
pd.read_excel(path_data + flood + ".xlsx")
)
# Damage functions give damage as a share of the capital good considered,
# as a function of maximum flood depth in meters
# Depth-damage functions (from de Villiers, 2007: ratios from table 3, and
# table 4)
structural_damages_small_houses = interp1d(
[0, 0.1, 0.6, 1.2, 2.4, 6, 10],
[0, 0.0479, 0.1317, 0.1795, 0.3591, 1, 1]
)
structural_damages_medium_houses = interp1d(
[0, 0.1, 0.6, 1.2, 2.4, 6, 10],
[0, 0.0830, 0.2273, 0.3083, 0.6166, 1, 1]
)
structural_damages_large_houses = interp1d(
[0, 0.1, 0.6, 1.2, 2.4, 6, 10],
[0, 0.0799, 0.2198, 0.2997, 0.5994, 1, 1]
)
content_damages = interp1d(
[0, 0.1, 0.3, 0.6, 1.2, 1.5, 2.4, 10],
[0, 0.06, 0.15, 0.35, 0.77, 0.95, 1, 1]
)
# Depth-damage functions (from Englhardt, 2019: figure 2)
structural_damages_type1 = interp1d(
[0, 0.5, 1, 1.25, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 10],
[0, 0.5, 0.9, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
)
structural_damages_type2 = interp1d(
[0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 10],
[0, 0.45, 0.65, 0.82, 0.95, 1, 1, 1, 1, 1, 1, 1]
)
structural_damages_type3a = interp1d(
[0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 10],
[0, 0.4, 0.55, 0.7, 0.78, 0.81, 0.81, 0.81, 0.81, 0.81, 0.81, 0.81]
)
structural_damages_type3b = interp1d(
[0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 10],
[0, 0.25, 0.4, 0.48, 0.58, 0.62, 0.65, 0.75, 0.78, 0.8, 0.81, 0.81]
)
structural_damages_type4a = interp1d(
[0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 10],
[0, 0.31, 0.45, 0.55, 0.62, 0.65, 0.65, 0.65, 0.65, 0.65, 0.65, 0.65]
)
structural_damages_type4b = interp1d(
[0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 10],
[0, 0.2, 0.3, 0.4, 0.45, 0.5, 0.55, 0.6, 0.62, 0.64, 0.65, 0.65]
)
# Previous WB report uses the following for type4a/b equivalent (+ content)
# It seems that Englhardt already incorporates the cap for maximum
# destruction in his estimates
# ([0, 0.5, 1, 1.5, 2, 3, 4, 5, 6],
# [0, 0.22, 0.38, 0.53, 0.64, 0.82, 0.90, 0.96, 1])
return (structural_damages_small_houses, structural_damages_medium_houses,
structural_damages_large_houses, content_damages,
structural_damages_type1, structural_damages_type2,
structural_damages_type3a, structural_damages_type3b,
structural_damages_type4a, structural_damages_type4b,
d_fluvial, d_pluvial, d_coastal)
def compute_fraction_capital_destroyed(d, type_flood, damage_function,
housing_type, options):
"""
Compute expected fraction of capital destroyed by floods.
To go from discrete to continuous estimates of flood damages, we linearly
integrate between available return periods (understood as the inverse of
an annual event probability). This function allows us to do so for a
given flood
type, housing type, and damage function.
Parameters
----------
d : dict
Dictionary of data frames yielding flood maps (maximum flood
depth + fraction of flood-prone area for each grid cell) for each
available return period, for a given flood type
type_flood : str
Code for flood type considered (FU for fluvial undefended, FD for
fluvial defended, P for pluvial, C for coastal)
damage_function : interp1d
Linear interpolation for fraction of capital destroyed over maximum
flood depth (in m) in a given area, for a given capital type
housing_type : str
Housing type considered (to apply some ad hoc corrections).
Should be set to "formal", "subsidized", "backyard", or "informal".
options : dict
Dictionary of default options
Returns
-------
float64
Expected fraction of capital destroyed
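
Examples
--------
A single-interval toy version of the trapezoid integration between the
5-year and 10-year return periods:

>>> interval = (1/5) - (1/10)
>>> damages = 0.0 + 0.4  # sum of damages at both ends of the interval
>>> round(0.5 * interval * damages, 3)
0.02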
"""
if type_flood == 'P' or type_flood == 'FD' or type_flood == 'FU':
# This defines a probability rule (summing to 1) for each time interval
# defined in FATHOM (the more distant, the less likely)
interval0 = 1 - (1/5)
interval1 = (1/5) - (1/10)
interval2 = (1/10) - (1/20)
interval3 = (1/20) - (1/50)
interval4 = (1/50) - (1/75)
interval5 = (1/75) - (1/100)
interval6 = (1/100) - (1/200)
interval7 = (1/200) - (1/250)
interval8 = (1/250) - (1/500)
interval9 = (1/500) - (1/1000)
interval10 = (1/1000)
if options["climate_change"] == 1:
# We increase the likelihood of flood risks based on a given parameter
interval0 = 1 - (1/5 * options["risk_increase"])
interval1 = ((1/5 * options["risk_increase"])
- (1/10 * options["risk_increase"]))
interval2 = ((1/10 * options["risk_increase"])
- (1/20 * options["risk_increase"]))
interval3 = ((1/20 * options["risk_increase"])
- (1/50 * options["risk_increase"]))
interval4 = ((1/50 * options["risk_increase"])
- (1/75 * options["risk_increase"]))
interval5 = ((1/75 * options["risk_increase"])
- (1/100 * options["risk_increase"]))
interval6 = ((1/100 * options["risk_increase"])
- (1/200 * options["risk_increase"]))
interval7 = ((1/200 * options["risk_increase"])
- (1/250 * options["risk_increase"]))
interval8 = ((1/250 * options["risk_increase"])
- (1/500 * options["risk_increase"]))
interval9 = ((1/500 * options["risk_increase"])
- (1/1000 * options["risk_increase"]))
interval10 = (1/1000 * options["risk_increase"])
# We consider that formal housing is not vulnerable to pluvial floods
# over the medium run, and that RDP and backyard housing are not over
# the short run.
# This is based on CCT Minimum Standards for Stormwater Design 2014
# (p.37) and Govender, 2011 (fig.8 and p.30)
if options["correct_pluvial"] == 1:
if ((type_flood == 'P') & (housing_type == 'formal')):
d[type_flood + '_5yr'].prop_flood_prone = np.zeros(24014)
d[type_flood + '_10yr'].prop_flood_prone = np.zeros(24014)
d[type_flood + '_20yr'].prop_flood_prone = np.zeros(24014)
d[type_flood + '_5yr'].flood_depth = np.zeros(24014)
d[type_flood + '_10yr'].flood_depth = np.zeros(24014)
d[type_flood + '_20yr'].flood_depth = np.zeros(24014)
elif ((type_flood == 'P')
& ((housing_type == 'subsidized')
| (housing_type == 'backyard'))):
d[type_flood + '_5yr'].prop_flood_prone = np.zeros(24014)
d[type_flood + '_10yr'].prop_flood_prone = np.zeros(24014)
d[type_flood + '_5yr'].flood_depth = np.zeros(24014)
d[type_flood + '_10yr'].flood_depth = np.zeros(24014)
# Damage scenarios are incremented using damage functions multiplied by
# flood-prone area (this yields the pixel share of destroyed area), so as
# to define damage intervals to be used in the final computation
# NB: we assume that no area is at risk for the most frequent (annual)
# events, contrary to coastal floods below
damages0 = (d[type_flood + '_5yr'].prop_flood_prone
* damage_function(d[type_flood + '_5yr'].flood_depth))
damages1 = ((d[type_flood + '_5yr'].prop_flood_prone
* damage_function(d[type_flood + '_5yr'].flood_depth))
+ (d[type_flood + '_10yr'].prop_flood_prone
* damage_function(d[type_flood + '_10yr'].flood_depth)))
damages2 = ((d[type_flood + '_10yr'].prop_flood_prone
* damage_function(d[type_flood + '_10yr'].flood_depth))
+ (d[type_flood + '_20yr'].prop_flood_prone
* damage_function(d[type_flood + '_20yr'].flood_depth)))
damages3 = ((d[type_flood + '_20yr'].prop_flood_prone
* damage_function(d[type_flood + '_20yr'].flood_depth))
+ (d[type_flood + '_50yr'].prop_flood_prone
* damage_function(d[type_flood + '_50yr'].flood_depth)))
damages4 = ((d[type_flood + '_50yr'].prop_flood_prone
* damage_function(d[type_flood + '_50yr'].flood_depth))
+ (d[type_flood + '_75yr'].prop_flood_prone
* damage_function(d[type_flood + '_75yr'].flood_depth)))
damages5 = ((d[type_flood + '_75yr'].prop_flood_prone
* damage_function(d[type_flood + '_75yr'].flood_depth))
+ (d[type_flood + '_100yr'].prop_flood_prone
* damage_function(d[type_flood + '_100yr'].flood_depth))
)
damages6 = ((d[type_flood + '_100yr'].prop_flood_prone
* damage_function(d[type_flood + '_100yr'].flood_depth))
+ (d[type_flood + '_200yr'].prop_flood_prone
* damage_function(d[type_flood + '_200yr'].flood_depth))
)
damages7 = ((d[type_flood + '_200yr'].prop_flood_prone
* damage_function(d[type_flood + '_200yr'].flood_depth))
+ (d[type_flood + '_250yr'].prop_flood_prone
* damage_function(d[type_flood + '_250yr'].flood_depth))
)
damages8 = ((d[type_flood + '_250yr'].prop_flood_prone
* damage_function(d[type_flood + '_250yr'].flood_depth))
+ (d[type_flood + '_500yr'].prop_flood_prone
* damage_function(d[type_flood + '_500yr'].flood_depth))
)
damages9 = ((d[type_flood + '_500yr'].prop_flood_prone
* damage_function(d[type_flood + '_500yr'].flood_depth))
+ (d[type_flood + '_1000yr'].prop_flood_prone
* damage_function(d[type_flood + '_1000yr'].flood_depth)
))
# We assume that damages stay constant as the return period goes to
# infinity
damages10 = ((d[type_flood + '_1000yr'].prop_flood_prone
* damage_function(d[type_flood + '_1000yr'].flood_depth))
+ (d[type_flood + '_1000yr'].prop_flood_prone
* damage_function(d[type_flood + '_1000yr'].flood_depth
)))
# The formula for the expected fraction of capital destroyed is given by
# the integral of damages over annual exceedance probability (the inverse
# of the return period).
# Assuming that damages increase linearly between return periods, we can
# approximate this area as a sum of trapezoids defined over each of our
# probability intervals: this yields the following formula
# NB: for more graphical intuition, see
# https://storymaps.arcgis.com/stories/7878c89c592e4a78b45f03b4b696ccac
return (0.5
* ((interval0 * damages0) + (interval1 * damages1)
+ (interval2 * damages2) + (interval3 * damages3)
+ (interval4 * damages4) + (interval5 * damages5)
+ (interval6 * damages6) + (interval7 * damages7)
+ (interval8 * damages8) + (interval9 * damages9)
+ (interval10 * damages10)))
elif type_flood == 'C':
interval0 = 1 - (1/2)
interval1 = (1/2) - (1/5)
interval2 = (1/5) - (1/10)
interval3 = (1/10) - (1/25)
interval4 = (1/25) - (1/50)
interval5 = (1/50) - (1/100)
interval6 = (1/100) - (1/250)
interval7 = (1/250)
# NB: we do not use the same intervals for coastal as for pluvial/fluvial
# floods, since the same return periods are not available in the DELTARES
# and FATHOM data
name = (type_flood + '_' + options["dem"] + '_'
+ str(options["climate_change"]))
# Here, we do have some inundation estimates at baseline year
damages0 = ((d[name + '_0000'].prop_flood_prone
* damage_function(d[name + '_0000'].flood_depth))
+ (d[name + '_0002'].prop_flood_prone
* damage_function(d[name + '_0002'].flood_depth)))
damages1 = ((d[name + '_0002'].prop_flood_prone
* damage_function(d[name + '_0002'].flood_depth))
+ (d[name + '_0005'].prop_flood_prone
* damage_function(d[name + '_0005'].flood_depth)))
damages2 = ((d[name + '_0005'].prop_flood_prone
* damage_function(d[name + '_0005'].flood_depth))
+ (d[name + '_0010'].prop_flood_prone
* damage_function(d[name + '_0010'].flood_depth)))
damages3 = ((d[name + '_0010'].prop_flood_prone
* damage_function(d[name + '_0010'].flood_depth))
+ (d[name + '_0025'].prop_flood_prone
* damage_function(d[name + '_0025'].flood_depth)))
damages4 = ((d[name + '_0025'].prop_flood_prone
* damage_function(d[name + '_0025'].flood_depth))
+ (d[name + '_0050'].prop_flood_prone
* damage_function(d[name + '_0050'].flood_depth)))
damages5 = ((d[name + '_0050'].prop_flood_prone
* damage_function(d[name + '_0050'].flood_depth))
+ (d[name + '_0100'].prop_flood_prone
* damage_function(d[name + '_0100'].flood_depth)))
damages6 = ((d[name + '_0100'].prop_flood_prone
* damage_function(d[name + '_0100'].flood_depth))
+ (d[name + '_0250'].prop_flood_prone
* damage_function(d[name + '_0250'].flood_depth)))
damages7 = ((d[name + '_0250'].prop_flood_prone
* damage_function(d[name + '_0250'].flood_depth))
+ (d[name + '_0250'].prop_flood_prone
* damage_function(d[name + '_0250'].flood_depth)))
return (0.5
* ((interval0 * damages0) + (interval1 * damages1)
+ (interval2 * damages2) + (interval3 * damages3)
+ (interval4 * damages4) + (interval5 * damages5)
+ (interval6 * damages6) + (interval7 * damages7)))
def import_full_floods_data(options, param, path_folder):
"""
Compute expected fraction of capital destroyed by floods across space.
This function applies theoretical formulas to flood maps to get the
theoretical expected fraction of capital destroyed across space
(should households choose to live there). To do so, it leverages the
import_init_floods_data and compute_fraction_capital_destroyed functions.
Note that we consider the maximum flood depth across flood maps when they
overlap each other, as there might be some double counting between pluvial
and fluvial flood risks, and floods just spill over across space instead of
piling up (bath-tub model).
Parameters
----------
options : dict
Dictionary of default options
param : dict
Dictionary of default parameters
path_folder : str
Path towards the root data folder
Returns
-------
fraction_capital_destroyed : DataFrame
Data frame of expected fractions of capital destroyed, for housing
structures and contents in different housing types, in each
grid cell (24,014)
structural_damages_small_houses : interp1d
Linear interpolation for fraction of capital destroyed (small house
structures) over maximum flood depth (in m) in a given area,
from de Villiers et al., 2007
structural_damages_medium_houses : interp1d
Linear interpolation for fraction of capital destroyed (medium house
structures) over maximum flood depth (in m) in a given area,
from de Villiers et al., 2007
structural_damages_large_houses : interp1d
Linear interpolation for fraction of capital destroyed (large house
structures) over maximum flood depth (in m) in a given area,
from de Villiers et al., 2007
content_damages : interp1d
Linear interpolation for fraction of capital destroyed (house
contents) over maximum flood depth (in m) in a given area,
from de Villiers et al., 2007
structural_damages_type1 : interp1d
Linear interpolation for fraction of capital destroyed (type-1 house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (non-engineered buildings)
structural_damages_type2 : interp1d
Linear interpolation for fraction of capital destroyed (type-2 house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (wooden buildings)
structural_damages_type3a : interp1d
Linear interpolation for fraction of capital destroyed (type-3a house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (one-floor unreinforced masonry/concrete
buildings)
structural_damages_type3b : interp1d
Linear interpolation for fraction of capital destroyed (type-3b house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (two-floor unreinforced masonry/concrete
buildings)
structural_damages_type4a : interp1d
Linear interpolation for fraction of capital destroyed (type-4a house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (one-floor reinforced masonry/concrete
and steel buildings)
structural_damages_type4b : interp1d
Linear interpolation for fraction of capital destroyed (type-4b house
structures) over maximum flood depth (in m) in a given area,
from Englhardt et al., 2019 (two-floor reinforced masonry/concrete
and steel buildings)
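
Examples
--------
When fluvial and pluvial risks overlap, the expected fractions of capital
destroyed are combined with an element-wise maximum; a two-cell sketch:

>>> import numpy as np
>>> fluvial, pluvial = np.array([0.1, 0.0]), np.array([0.05, 0.2])
>>> np.maximum(fluvial, pluvial).tolist()
[0.1, 0.2]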
"""
fraction_capital_destroyed = pd.DataFrame()
(structural_damages_small_houses, structural_damages_medium_houses,
structural_damages_large_houses, content_damages,
structural_damages_type1, structural_damages_type2,
structural_damages_type3a, structural_damages_type3b,
structural_damages_type4a, structural_damages_type4b,
d_fluvial, d_pluvial, d_coastal) = import_init_floods_data(
options, param, path_folder)
if options["defended"] == 1:
fluvialtype = 'FD'
elif options["defended"] == 0:
fluvialtype = 'FU'
# When two flood types overlap (can happen with pluvial and fluvial),
# we keep the maximum among the flood depths to avoid double counting
if options["pluvial"] == 0 and options["coastal"] == 0:
print("Contents in private formal")
(fraction_capital_destroyed["contents_formal"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'formal', options)
print("Contents in informal settlements")
(fraction_capital_destroyed["contents_informal"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'informal', options)
print("Contents in (any) backyard")
(fraction_capital_destroyed["contents_backyard"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'backyard', options)
print("Contents in formal subsidized")
(fraction_capital_destroyed["contents_subsidized"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'subsidized', options)
print("Private formal structures (one floor)")
(fraction_capital_destroyed["structure_formal_1"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4a, 'formal',
options)
print("Private formal structures (two floors)")
(fraction_capital_destroyed["structure_formal_2"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4b, 'formal',
options)
print("Formal subsidized structures (one floor)")
(fraction_capital_destroyed["structure_subsidized_1"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4a, 'subsidized',
options)
print("Formal subsidized structures (two floors)")
(fraction_capital_destroyed["structure_subsidized_2"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4b, 'subsidized',
options)
print("Informal settlement structures")
(fraction_capital_destroyed["structure_informal_settlements"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type2, 'informal',
options)
print("Informal backyard structures")
(fraction_capital_destroyed["structure_informal_backyards"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type2, 'backyard',
options)
print("Formal backyard structures (one floor)")
(fraction_capital_destroyed["structure_formal_backyards"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type3a, 'backyard',
options)
print("Formal backyard structures (two floors)")
(fraction_capital_destroyed["structure_formal_backyards"]
) = compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type3b, 'backyard',
options)
elif options["pluvial"] == 1 and options["coastal"] == 0:
print("Contents in private formal")
(fraction_capital_destroyed["contents_formal"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'formal', options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', content_damages, 'formal', options))
print("Contents in informal settlements")
(fraction_capital_destroyed["contents_informal"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'informal', options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', content_damages, 'informal', options))
print("Contents in (any) backyard")
(fraction_capital_destroyed["contents_backyard"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'backyard', options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', content_damages, 'backyard', options))
print("Contents in formal subsidized")
(fraction_capital_destroyed["contents_subsidized"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'subsidized', options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', content_damages, 'subsidized', options))
print("Private formal structures (one floor)")
(fraction_capital_destroyed["structure_formal_1"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4a, 'formal',
options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type4a, 'formal', options))
print("Private formal structures (two floors)")
(fraction_capital_destroyed["structure_formal_2"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4b, 'formal',
options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type4b, 'formal', options))
print("Formal subsidized structures (one floor)")
(fraction_capital_destroyed["structure_subsidized_1"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4a, 'subsidized',
options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type4a, 'subsidized',
options))
print("Formal subsidized structures (two floors)")
(fraction_capital_destroyed["structure_subsidized_2"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4b, 'subsidized',
options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type4b, 'subsidized',
options))
print("Informal settlement structures")
(fraction_capital_destroyed["structure_informal_settlements"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type2, 'informal',
options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type2, 'informal',
options))
print("Informal backyard structures")
(fraction_capital_destroyed["structure_informal_backyards"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type2, 'backyard',
options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type2, 'backyard',
options))
print("Formal backyard structures (one floor)")
(fraction_capital_destroyed["structure_formal_backyards"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type3a, 'backyard',
options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type3a, 'backyard',
options))
print("Formal backyard structures (two floors)")
(fraction_capital_destroyed["structure_formal_backyards"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type3b, 'backyard',
options),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type3b, 'backyard',
options))
elif options["pluvial"] == 0 and options["coastal"] == 1:
print("Contents in private formal")
(fraction_capital_destroyed["contents_formal"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'formal', options),
compute_fraction_capital_destroyed(
d_coastal, 'C', content_damages, 'formal', options))
print("Contents in informal settlements")
(fraction_capital_destroyed["contents_informal"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'informal', options),
compute_fraction_capital_destroyed(
d_coastal, 'C', content_damages, 'informal', options))
print("Contents in (any) backyard")
(fraction_capital_destroyed["contents_backyard"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'backyard', options),
compute_fraction_capital_destroyed(
d_coastal, 'C', content_damages, 'backyard', options))
print("Contents in formal subsidized")
(fraction_capital_destroyed["contents_subsidized"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'subsidized', options),
compute_fraction_capital_destroyed(
d_coastal, 'C', content_damages, 'subsidized', options))
print("Private formal structures (one floor)")
(fraction_capital_destroyed["structure_formal_1"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4a, 'formal',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type4a, 'formal', options))
print("Private formal structures (two floors)")
(fraction_capital_destroyed["structure_formal_2"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4b, 'formal',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type4b, 'formal', options))
print("Formal subsidized structures (one floor)")
(fraction_capital_destroyed["structure_subsidized_1"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4a, 'subsidized',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type4a, 'subsidized',
options))
print("Formal subsidized structures (two floors)")
(fraction_capital_destroyed["structure_subsidized_2"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4b, 'subsidized',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type4b, 'subsidized',
options))
print("Informal settlement structures")
(fraction_capital_destroyed["structure_informal_settlements"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type2, 'informal',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type2, 'informal',
options))
print("Informal backyard structures")
(fraction_capital_destroyed["structure_informal_backyards"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type2, 'backyard',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type2, 'backyard',
options))
print("Formal backyard structures (one floor)")
(fraction_capital_destroyed["structure_formal_backyards"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type3a, 'backyard',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type3a, 'backyard',
options))
print("Formal backyard structures (two floors)")
(fraction_capital_destroyed["structure_formal_backyards"]
) = np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type3b, 'backyard',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type3b, 'backyard',
options))
elif options["pluvial"] == 1 and options["coastal"] == 1:
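# NB: np.maximum compares only two arrays at a time (a third positional
# argument would be interpreted as the out buffer), hence the nested calls
# below to take the maximum across the three flood types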
print("Contents in private formal")
(fraction_capital_destroyed["contents_formal"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'formal', options),
compute_fraction_capital_destroyed(
d_coastal, 'C', content_damages, 'formal', options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', content_damages, 'formal', options))
print("Contents in informal settlements")
(fraction_capital_destroyed["contents_informal"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'informal', options),
compute_fraction_capital_destroyed(
d_coastal, 'C', content_damages, 'informal', options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', content_damages, 'informal', options))
print("Contents in (any) backyard")
(fraction_capital_destroyed["contents_backyard"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'backyard', options),
compute_fraction_capital_destroyed(
d_coastal, 'C', content_damages, 'backyard', options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', content_damages, 'backyard', options))
print("Contents in formal subsidized")
(fraction_capital_destroyed["contents_subsidized"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, content_damages, 'subsidized', options),
compute_fraction_capital_destroyed(
d_coastal, 'C', content_damages, 'subsidized', options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', content_damages, 'subsidized', options))
print("Private formal structures (one floor)")
(fraction_capital_destroyed["structure_formal_1"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4a, 'formal',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type4a, 'formal', options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type4a, 'formal', options))
print("Private formal structures (two floors)")
(fraction_capital_destroyed["structure_formal_2"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4b, 'formal',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type4b, 'formal', options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type4b, 'formal', options))
print("Formal subsidized structures (one floor)")
(fraction_capital_destroyed["structure_subsidized_1"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4a, 'subsidized',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type4a, 'subsidized',
options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type4a, 'subsidized',
options))
print("Formal subsidized structures (two floors)")
(fraction_capital_destroyed["structure_subsidized_2"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type4b, 'subsidized',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type4b, 'subsidized',
options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type4b, 'subsidized',
options))
print("Informal settlement structures")
(fraction_capital_destroyed["structure_informal_settlements"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type2, 'informal',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type2, 'informal',
options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type2, 'informal',
options))
print("Informal backyard structures")
(fraction_capital_destroyed["structure_informal_backyards"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type2, 'backyard',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type2, 'backyard',
options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type2, 'backyard',
options))
print("Formal backyard structures (one floor)")
(fraction_capital_destroyed["structure_formal_backyards"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type3a, 'backyard',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type3a, 'backyard',
options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type3a, 'backyard',
options))
print("Formal backyard structures (two floors)")
(fraction_capital_destroyed["structure_formal_backyards"]
) = np.maximum(np.maximum(compute_fraction_capital_destroyed(
d_fluvial, fluvialtype, structural_damages_type3b, 'backyard',
options),
compute_fraction_capital_destroyed(
d_coastal, 'C', structural_damages_type3b, 'backyard',
options)),
compute_fraction_capital_destroyed(
d_pluvial, 'P', structural_damages_type3b, 'backyard',
options))
# When both formal (brick) and informal (shack) backyard structures are
# included in the model, we take an average of their damage estimates
# weighted by their relative prevalence, based on SAL housing-type data
sal_data = pd.read_excel(
path_folder
+ "housing_types_sal_analysis.xlsx",
header=6, nrows=5339)
sal_data["backyard_formal"] = sal_data["House/flat/room in backyard"]
sal_data["backyard_informal"] = sal_data[
"Informal dwelling (shack; in backyard)"]
total_backyard_formal = sal_data["backyard_formal"].sum()
total_backyard_informal = sal_data["backyard_informal"].sum()
(fraction_capital_destroyed["structure_backyards"]
) = ((total_backyard_formal
* fraction_capital_destroyed["structure_formal_backyards"])
+ (total_backyard_informal
* fraction_capital_destroyed["structure_informal_backyards"])
) / (total_backyard_formal + total_backyard_informal)
return (fraction_capital_destroyed, structural_damages_small_houses,
structural_damages_medium_houses, structural_damages_large_houses,
content_damages, structural_damages_type1,
structural_damages_type2, structural_damages_type3a,
structural_damages_type3b, structural_damages_type4a,
structural_damages_type4b)
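# The four flood-option branches above share the same structure. As a
# purely illustrative (and unused) sketch, a single housing category could
# be processed for all active flood types at once with a helper along
# these lines, with names mirroring the variables above:
def _max_damage_across_flood_types(d_fluvial, d_pluvial, d_coastal,
                                   fluvialtype, damage_curve, housing_type,
                                   options):
    """Return the max fraction of capital destroyed across flood types."""
    # Fluvial floods are always included
    damages = compute_fraction_capital_destroyed(
        d_fluvial, fluvialtype, damage_curve, housing_type, options)
    # Pluvial and coastal floods are added only when activated, keeping the
    # maximum to avoid double counting overlapping flood maps
    if options["pluvial"] == 1:
        damages = np.maximum(damages, compute_fraction_capital_destroyed(
            d_pluvial, 'P', damage_curve, housing_type, options))
    if options["coastal"] == 1:
        damages = np.maximum(damages, compute_fraction_capital_destroyed(
            d_coastal, 'C', damage_curve, housing_type, options))
    return damages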
def import_transport_data(grid, param, yearTraffic,
households_per_income_class, average_income,
spline_inflation,
spline_fuel,
spline_population_income_distribution,
spline_income_distribution,
path_precalc_inp, path_precalc_transp, dim, options):
"""
Run commuting choice model.
This function runs the theoretical commuting choice model to
recover key transport-related intermediate outputs. More specifically,
it imports transport costs (from data) and (calibrated) incomes (see
calibration.sub.compute_income), then computes the modal shares for each
commuting pair, the probability distribution of such commuting pairs, the
expected income net of commuting costs per residential location, and the
associated average incomes.
Parameters
----------
grid : DataFrame
Table yielding, for each grid cell (24,014), its x and y
(centroid) coordinates, and its distance (in km) to the city centre
param : dict
Dictionary of default parameters
yearTraffic : int
Year (relative to baseline year set at 0) for which we want to
run the function
households_per_income_class : ndarray(float64)
Exogenous total number of households per income group (excluding people
out of employment, for 4 groups)
average_income : ndarray(float64)
Average median income for each income group in the model (4)
spline_inflation : interp1d
Linear interpolation for inflation rate (in base 100 relative to
baseline year) over the years (baseline year set at 0)
spline_fuel : interp1d
Linear interpolation for fuel price (in rands per km)
over the years (baseline year set at 0)
spline_population_income_distribution : interp1d
Linear interpolation for total population per income group in the data
(12) over the years (baseline year set at 0)
spline_income_distribution : interp1d
Linear interpolation for median annual income (in rands) per income
group in the data (12) over the years (baseline year set at 0)
path_precalc_inp : str
Path for precalculated input data (calibrated parameters)
path_precalc_transp : str
Path for precalculated transport inputs (intermediate outputs from
commuting choice model)
dim : str
Geographic level of analysis at which we want to run the commuting
choice model: should be set to "GRID" or "SP"
options : dict
Dictionary of default options
Returns
-------
incomeNetOfCommuting : ndarray(float64, ndim=2)
Expected annual income net of commuting costs (in rands, for
one household), for each geographic unit, by income group (4)
modalShares : ndarray(float64, ndim=4)
Share (from 0 to 1) of each transport mode for each income group
(4) and each commuting pair (185 selected job centers + chosen
geographic units for residential locations)
ODflows : ndarray(float64, ndim=3)
Probability to work in a given selected job center (185, out of
a wider pool of transport zones), for a given income group (4)
and a given residential location (depending on geographic unit
chosen)
averageIncome : ndarray(float64, ndim=2)
Average annual income for each geographic unit and each income group
(4), for one household
"""
# Import (monetary and time) transport costs
(timeOutput, distanceOutput, monetaryCost, costTime
) = calcmp.import_transport_costs(
grid, param, yearTraffic, households_per_income_class,
spline_inflation, spline_fuel, spline_population_income_distribution,
spline_income_distribution, path_precalc_inp, path_precalc_transp,
dim, options)
param_lambda = param["lambda"].squeeze()
incomeNetOfCommuting = np.zeros(
(param["nb_of_income_classes"],
timeOutput[:, :, 0].shape[1])
)
averageIncome = np.zeros(
(param["nb_of_income_classes"],
timeOutput[:, :, 0].shape[1])
)
modalShares = np.zeros(
(185, timeOutput[:, :, 0].shape[1], 5,
param["nb_of_income_classes"])
)
ODflows = np.zeros(
(185, timeOutput[:, :, 0].shape[1],
param["nb_of_income_classes"])
)
# Update average income and number of households per income group for
# the year considered
incomeGroup, households_per_income_class = eqdyn.compute_average_income(
spline_population_income_distribution, spline_income_distribution,
param, yearTraffic)
# We import expected income associated with each income center and income
# group (from calibration)
if options["load_precal_param"] == 1:
income_centers_init = scipy.io.loadmat(
path_precalc_inp + 'incomeCentersKeep.mat')['incomeCentersKeep']
elif options["load_precal_param"] == 0:
income_centers_init = np.load(
path_precalc_inp + 'incomeCentersKeep.npy')
# income_centers_init[income_centers_init == -np.inf] = np.nan
# We correct the level of incomes per job center when overall average
# income per income group changes with time (only used as an initial
# value for the algorithm)
incomeCenters = income_centers_init * incomeGroup / average_income
# Switch to hourly format (8 hours/day * 20 working days/month * 12 months)
# NB: costTime is already in hourly format
annualToHourly = 1 / (8*20*12)
monetaryCost = monetaryCost * annualToHourly
incomeCenters = incomeCenters * annualToHourly
for j in range(0, param["nb_of_income_classes"]):
# Household size, which scales transport costs, varies with income group
householdSize = param["household_size"][j]
if options["load_precal_param"] == 1:
whichCenters = incomeCenters[:, j] > -100000
elif options["load_precal_param"] == 0:
whichCenters = ~np.isnan(incomeCenters[:, j])
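# NB: the two filters above presumably reflect the two storage formats:
# the .mat calibration codes unselected job centers with a large negative
# value (cf. the commented -np.inf line above), whereas the .npy version
# stores them as NaN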
incomeCentersGroup = incomeCenters[whichCenters, j]
# We compute transport costs for each mode and per chosen mode
# (cost per hour)
(transportCostModes, transportCost, _, valueMax, minIncome
) = calcmp.compute_ODflows(
householdSize, monetaryCost, costTime, incomeCentersGroup,
whichCenters, param_lambda, options)
# NB: the value returned for ODflows would in fact correspond to
# ODflows[whichCenters, :, j], which we define explicitly below
# Modal shares: these come from the multinomial logit model implied by
# extreme value theory with Gumbel-distributed (generalized EV type-I)
# utility shocks, see math appendix
# if options["prevent_exp_overflow"] == 1:
# modalShares[whichCenters, :, :, j] = (np.exp(
# - param_lambda * transportCostModes + valueMax[:, :, None])
# / np.nansum(np.exp(- param_lambda * transportCostModes
# + valueMax[:, :, None]), 2)[:, :, None]
# )
modalShares[whichCenters, :, :, j] = (
np.exp(- param_lambda * transportCostModes)
/ np.nansum(
np.exp(- param_lambda * transportCostModes), 2)[:, :, None]
)
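# As a reading aid, the expression above implements the standard logit
# formula, with lambda the Gumbel scale parameter and cost_m the (hourly)
# transport cost of mode m:
# P(m|x,c,i) = exp(-lambda * cost_m) / sum_m' exp(-lambda * cost_m')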
# OD flows: corresponds to the probability, for a given household of
# income group i and living in x, to work in a job centre c,
# see math appendix
# NB: here, we also need to divide by the scale factor (typo in
# Pfeiffer et al.)
if options["prevent_exp_overflow"] == 1:
ODflows[whichCenters, :, j] = (
np.exp(param_lambda
* (incomeCentersGroup[:, None] - transportCost)
- minIncome)
/ np.nansum(
np.exp(param_lambda
* (incomeCentersGroup[:, None] - transportCost)
- minIncome),
0)[None, :]
)
elif options["prevent_exp_overflow"] == 0:
ODflows[whichCenters, :, j] = (
np.exp(
param_lambda
* (incomeCentersGroup[:, None] - transportCost))
/ np.nansum(
np.exp(param_lambda
* (incomeCentersGroup[:, None] - transportCost)),
0)[None, :]
)
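# As a reading aid, both branches above implement
# pi(c|x,i) = exp(lambda * (y_c - t_c(x)))
#             / sum_c' exp(lambda * (y_c' - t_c'(x))),
# where the minIncome term in the first branch rescales numerator and
# denominator alike: it cancels out while preventing exponential overflow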
# Expected income net of commuting costs
# NB: the commented block below tests a log-sum specification that we
# dropped. Instead, we use a weighted average, as is standard in the
# literature (see Ahlfeldt et al., 2015)
# if options["prevent_exp_overflow"] == 1:
# incomeNetOfCommuting[j, :] = (
# 1/param_lambda
# * (np.log(np.nansum(np.exp(
# param_lambda
# * (incomeCentersGroup[:, None] - transportCost)
# - minIncome), 0))
# + minIncome)
# )
# elif options["prevent_exp_overflow"] == 0:
# incomeNetOfCommuting[j, :] = (
# 1/param_lambda
# * np.log(np.nansum(np.exp(
# param_lambda
# * (incomeCentersGroup[:, None] - transportCost)),
# 0))
# )
incomeNetOfCommuting[j, :] = np.nansum(
ODflows[whichCenters, :, j]
* (incomeCentersGroup[:, None] - transportCost), 0)
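# As a reading aid, this is the weighted average
# E[y - t | x, i] = sum_c pi(c|x,i) * (y_c - t_c(x))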
# Average income (not net of commuting costs) earned per worker
# NB: here, we just take the weighted average as there is no error term
# Note that this should take unemployment into account
averageIncome[j, :] = np.nansum(
ODflows[whichCenters, :, j] * incomeCentersGroup[:, None], 0)
# We go back to yearly format before saving results
incomeNetOfCommuting = incomeNetOfCommuting / annualToHourly
averageIncome = averageIncome / annualToHourly
np.save(path_precalc_transp + dim + "_averageIncome_" + str(yearTraffic),
averageIncome)
np.save(path_precalc_transp + dim + "_incomeNetOfCommuting_"
+ str(yearTraffic), incomeNetOfCommuting)
np.save(path_precalc_transp + dim + "_modalShares_" + str(yearTraffic),
modalShares)
np.save(path_precalc_transp + dim + "_ODflows_" + str(yearTraffic),
ODflows)
return incomeNetOfCommuting, modalShares, ODflows, averageIncome
def import_sal_data(grid, path_folder, path_data, housing_type_data):
"""
Import SAL data for population density by housing type.
This is used for validation as Small-Area-Level estimates are more
precise than Small-Place level estimates. However, they only yield
the distribution across housing types, and not income groups.
Parameters
----------
grid : DataFrame
Table yielding, for each grid cell (24,014), its x and y
(centroid) coordinates, and its distance (in km) to the city centre
path_folder : str
Path towards the root data folder
path_data : str
Path towards data used in the model
housing_type_data : ndarray(int64)
Exogenous number of households per housing type (4: formal private,
informal backyards, informal settlements, formal subsidized), from
Small-Area-Level data
Returns
-------
housing_types_grid_sal : DataFrame
Table yielding the number of dwellings for 4 housing types
(formal private, informal backyards, formal backyards, and
informal settlements) for each grid cell (24,014), from SAL
estimates
"""
sal_data = pd.read_excel(
path_folder
+ "housing_types_sal_analysis.xlsx",
header=6, nrows=5339)
sal_data["informal"] = sal_data[
"Informal dwelling (shack; not in backyard; e.g. in an"
+ " informal/squatter settlement or on a farm)"]
sal_data["backyard_formal"] = sal_data["House/flat/room in backyard"]
sal_data["backyard_informal"] = sal_data[
"Informal dwelling (shack; in backyard)"]
# NB: we do not include traditional houses, granny flats, caravans, and
# others
sal_data["formal"] = np.nansum(sal_data.iloc[:, [3, 5, 6, 7, 8]], 1)
# We import information on intersection between SAL and grid cells
grid_intersect = pd.read_csv(
path_data + 'grid_SAL_intersect.csv', sep=';')
grid_intersect.rename(columns={"Area_inter": "Area"}, inplace=True)
# We then proceed to the disaggregation of SAL data
print("Informal settlements")
informal_grid = gen_small_areas_to_grid(
grid, grid_intersect, sal_data["informal"],
sal_data["Small Area Code"], 'SAL')
print("Formal backyards")
backyard_formal_grid = gen_small_areas_to_grid(
grid, grid_intersect, sal_data["backyard_formal"],
sal_data["Small Area Code"], 'SAL')
print("Informal backyards")
backyard_informal_grid = gen_small_areas_to_grid(
grid, grid_intersect, sal_data["backyard_informal"],
sal_data["Small Area Code"], 'SAL')
# NB: this does include RDP
print("Private formal + subsidized formal housing")
formal_grid = gen_small_areas_to_grid(
grid, grid_intersect, sal_data["formal"], sal_data["Small Area Code"],
'SAL')
# We correct the number of dwellings per pixel by reweighting with the
# ratio of the total original number over the total estimated number
# NB: this corrects for potential errors or imperfect information in the
# matching of small areas and grid pixels
informal_grid = (informal_grid * (np.nansum(sal_data["informal"])
/ np.nansum(informal_grid)))
backyard_formal_grid = (backyard_formal_grid
* (np.nansum(sal_data["backyard_formal"])
/ np.nansum(backyard_formal_grid)))
backyard_informal_grid = (backyard_informal_grid
* (np.nansum(sal_data["backyard_informal"])
/ np.nansum(backyard_informal_grid)))
formal_grid = (formal_grid * (np.nansum(sal_data["formal"])
/ np.nansum(formal_grid)))
housing_types_grid_sal = pd.DataFrame()
housing_types_grid_sal["informal_grid"] = informal_grid
housing_types_grid_sal["backyard_formal_grid"] = backyard_formal_grid
housing_types_grid_sal["backyard_informal_grid"] = backyard_informal_grid
housing_types_grid_sal["formal_grid"] = formal_grid
# Replace missing values by zero
housing_types_grid_sal[np.isnan(housing_types_grid_sal)] = 0
housing_types_grid_sal.to_excel(
path_folder + 'housing_types_grid_sal.xlsx')
return housing_types_grid_sal
def gen_small_areas_to_grid(grid, grid_intersect, small_area_data,
small_area_code, unit):
"""
Convert SAL/SP data to grid dimensions.
Parameters
----------
grid : DataFrame
Table yielding, for each grid cell (24,014), its x and y
(centroid) coordinates, and its distance (in km) to the city centre
grid_intersect : DataFrame
Table yielding the intersection areas between grid cells (24,014)
and Small Areas (5,339) or Small Places (1,046)
small_area_data : Series
Number of dwellings (or other variable) in each chosen geographic unit
for a given housing type
small_area_code : Series
Numerical code associated with each chosen geographic unit
unit : str
Code defining which geographic unit should be used: must be set to
either "SP" or "SAL"
Returns
-------
grid_data : ndarray(float64)
Number of dwellings (or other variable) in each grid cell (24,014)
for a given housing type
"""
grid_data = np.zeros(len(grid.dist))
# We loop over grid pixels
print("Looping over pixels")
for index in range(0, len(grid.dist)):
print(index)
# We define a list of SAL/SP codes that do intersect considered pixel
intersect = np.unique(
grid_intersect[unit + '_CODE'][grid_intersect.ID_grille
== grid.id[index]]
)
if len(intersect) == 0:
grid_data[index] = np.nan
else:
# If the intersection is not empty, we loop over intersecting
# SAL/SPs
for i in range(0, len(intersect)):
small_code = intersect[i]
# Then, we get the corresponding area for the sub-intersection
# and for the overall SAL/SP
small_area_intersect = np.nansum(
grid_intersect.Area[
(grid_intersect.ID_grille == grid.id[index])
& (grid_intersect[unit + '_CODE'] == small_code)
].squeeze())
small_area = np.nansum(
grid_intersect.Area[
(grid_intersect[unit + '_CODE'] == small_code)]
)
# If such SAL/SP code does indeed exist in the matching data,
# we store a weighted average of the info it contains
if len(small_area_data[
small_area_code == small_code]) > 0:
# More precisely, this yields the number of
# dwellings/households given by the intersection
add = (small_area_data[small_area_code == small_code]
* (small_area_intersect / small_area))
else:
add = 0
# Finally, we update our output data with each weighted info
# corresponding to each of the SAL/SP intersection with
# considered grid pixel
grid_data[index] = grid_data[index] + add
return grid_data
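# The loop above is easy to follow but slow over 24,014 pixels. Below is a
# minimal, purely illustrative vectorized sketch of the same area-weighted
# disaggregation (not used in the model); it assumes the grid_intersect
# layout used above (ID_grille, Area, and a SAL_CODE/SP_CODE column)
def _small_areas_to_grid_vectorized(grid, grid_intersect, small_area_data,
                                    small_area_code, unit):
    """Vectorized (illustrative) equivalent of gen_small_areas_to_grid."""
    lookup = pd.DataFrame({'code': small_area_code.values,
                           'value': small_area_data.values})
    inter = grid_intersect.rename(columns={unit + '_CODE': 'code'}).copy()
    # Weight = share of each small area covered by the sub-intersection
    inter['weight'] = (inter['Area']
                       / inter.groupby('code')['Area'].transform('sum'))
    inter = inter.merge(lookup, on='code', how='left')
    # Codes absent from the data contribute zero, as in the loop above
    inter['contrib'] = inter['value'].fillna(0) * inter['weight']
    # Sum contributions per pixel; pixels with no intersection come out as
    # NaN after reindexing on the full grid
    per_pixel = inter.groupby('ID_grille')['contrib'].sum()
    return per_pixel.reindex(grid.id).to_numpy()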
def convert_income_distribution(income_distribution, grid, path_data, data_sp):
"""
Convert SP data for income distribution into grid dimensions.
This is used for validation as Small-Area-Level estimates are not
available for population distribution across income groups.
Parameters
----------
income_distribution : ndarray(uint16, ndim=2)
Exogenous number of households in each Small Place (1,046) for each
income group in the model (4)
grid : DataFrame
Table yielding, for each grid cell (24,014), its x and y
(centroid) coordinates, and its distance (in km) to the city centre
path_data : str
Path towards data used in the model
data_sp : DataFrame
Table yielding, for each Small Place (1,046), the average dwelling size
(in m²), the average land price and annual income level (in rands),
the size of unconstrained area for construction (in m²), the total area
(in km²), the distance to the city centre (in km), whether or not the
location belongs to Mitchells Plain, and the SP code
Returns
-------
income_grid : ndarray(float64, ndim=2)
Exogenous number of households in each grid cell (24,014) for each
income group in the model (4)
"""
grid_intersect = pd.read_csv(
path_data + 'grid_SP_intersect.csv', sep=';')
income0_grid = gen_small_areas_to_grid(
grid, grid_intersect, income_distribution[:, 0],
data_sp["sp_code"], 'SP')
income1_grid = gen_small_areas_to_grid(
grid, grid_intersect, income_distribution[:, 1],
data_sp["sp_code"], 'SP')
income2_grid = gen_small_areas_to_grid(
grid, grid_intersect, income_distribution[:, 2],
data_sp["sp_code"], 'SP')
income3_grid = gen_small_areas_to_grid(
grid, grid_intersect, income_distribution[:, 3],
data_sp["sp_code"], 'SP')
# We correct the values per pixel by reweighting with the
# ratio of total original number over total estimated number
income0_grid = (income0_grid * (np.nansum(income_distribution[:, 0])
/ np.nansum(income0_grid)))
income1_grid = (income1_grid * (np.nansum(income_distribution[:, 1])
/ np.nansum(income1_grid)))
income2_grid = (income2_grid * (np.nansum(income_distribution[:, 2])
/ np.nansum(income2_grid)))
income3_grid = (income3_grid * (np.nansum(income_distribution[:, 3])
/ np.nansum(income3_grid)))
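# NB: by construction, the reweighting above ensures that the total number
# of households per income group summed over the grid matches the original
# SP totals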
income_grid = np.stack(
[income0_grid, income1_grid, income2_grid, income3_grid])
# Replace missing values by zero
income_grid[np.isnan(income_grid)] = 0
np.save(path_data + "income_distrib_grid.npy", income_grid)
return income_grid