Quarter-Internal/app/playgrounds/controller.py

427 lines
18 KiB
Python
Raw Normal View History

2025-03-24 01:45:24 +00:00
from csv import DictReader, DictWriter
from flask import current_app
from math import isclose, floor
import boto3
import requests
import base64
# Module-level placeholder for a Fannie Mae access token, initialised to None.
# NOTE(review): nothing in the visible part of this file reads or assigns it;
# the fetch helpers keep the token in current_app.config['FANNIE_AUTH_TOKEN']
# instead -- confirm whether this name is still needed.
_FANNIE_ACCESS_TOKEN = None
class Range:
    """Closed numeric interval ``[min, max]``.

    Two ranges are equal when both bounds are equal. Instances are hashable so
    they can be used as dictionary keys.
    """

    def __init__(self, min_val, max_val):
        """Store the inclusive lower bound ``min_val`` and upper bound ``max_val``."""
        self.min = min_val
        self.max = max_val

    def __eq__(self, other):
        """Return True when ``other`` exposes the same ``min`` and ``max``."""
        try:
            return self.max == other.max and self.min == other.min
        except AttributeError:
            # Not range-like: let Python fall back to its default comparison
            # instead of crashing on e.g. ``some_range == 5``.
            return NotImplemented

    def __str__(self):
        """Return the range formatted as ``'<min>-<max>'``."""
        return f'{self.min}-{self.max}'

    def __hash__(self):
        # Hash the bounds themselves rather than their string form so that
        # equal ranges always hash equally (Range(1, 2) == Range(1.0, 2.0)).
        return hash((self.min, self.max))

    def __contains__(self, item):
        """Return True when ``item`` lies within the range, bounds included."""
        return self.min <= item <= self.max

    def within_range(self, other):
        """Return True when this range lies entirely inside ``other``."""
        return self.min >= other.min and self.max <= other.max

    def contains_range(self, other):
        """Return True when ``other`` lies entirely inside this range."""
        return self.min <= other.min and self.max >= other.max

    def is_nested_or_contains(self, other):
        """Return True when either range lies entirely inside the other."""
        return self.within_range(other) or self.contains_range(other)
class FICORange(Range):
    """Integer FICO score range parsed from a CSV cell.

    Accepts either ``'<low>-<high>'`` or an open-ended ``'<low>+'`` form; the
    open-ended form is capped at 850.
    """

    def __init__(self, csv_fico_str):
        """Parse ``csv_fico_str`` and initialise the underlying range."""
        if '+' not in csv_fico_str:
            pieces = csv_fico_str.split('-', 1)
            bounds = (int(pieces[0]), int(pieces[1]))
        else:
            # Open-ended bucket: everything from the stated score up to 850.
            bounds = (int(csv_fico_str.replace('+', '')), 850)
        super().__init__(bounds[0], bounds[1])
class LTVMRERange(Range):
    """Float LTV/MRE range parsed from a CSV cell.

    Accepts either ``'<low>-<high>'`` or an upper-bounded ``'<=<high>'`` form;
    the upper-bounded form starts at 0.0.
    """

    def __init__(self, csv_ltv_mre_str):
        """Parse ``csv_ltv_mre_str`` and initialise the underlying range."""
        if '<=' not in csv_ltv_mre_str:
            pieces = csv_ltv_mre_str.split('-', 1)
            lower = float(pieces[0])
            upper = float(pieces[1])
        else:
            # "<=X" means everything from zero up to and including X.
            lower = 0.0
            upper = float(csv_ltv_mre_str.replace('<=', ''))
        super().__init__(lower, upper)
class DTIRange(Range):
    """Float DTI range parsed from a ``'<low>-<high>'`` CSV cell.

    When the cell contains a ``%`` sign, every ``%`` is stripped and both
    bounds are divided by 100; otherwise the numbers are used as written.
    """

    def __init__(self, csv_dti_str):
        """Parse ``csv_dti_str`` and initialise the underlying range."""
        scale = 100 if '%' in csv_dti_str else 1
        # Removing '%' is a no-op when the cell has none.
        pieces = csv_dti_str.replace('%', '').split('-', 1)
        lower = float(pieces[0]) / scale
        upper = float(pieces[1]) / scale
        super().__init__(lower, upper)
class HomeGroupData:
    """Risk and valuation data for one FICO / LTV / DTI bucket of homes."""

    def __init__(self, fico_range, ltv_range, dti_range, net_loss_rate, loss_severity, pool_percentage=None, avg_home_value=None):
        """Record the bucket's ranges, loss figures and optional pool data.

        ``avg_home_value`` seeds both the purchase price and the current home
        value; ``pool_percentage`` is stored as ``pool_percent``.
        """
        # Bucket boundaries.
        self.fico_range, self.ltv_range, self.dti_range = fico_range, ltv_range, dti_range

        # Loss assumptions for the bucket.
        self.net_loss_rate, self.loss_severity = net_loss_rate, loss_severity

        # Share of the pool and the starting price/value of one home.
        self.pool_percent = pool_percentage
        self.nominal_purchase_price = self.nominal_home_value = avg_home_value

        # Per-instance accumulators and counters, each starting empty/neutral.
        self.hpi_accrual_per_month = []
        self.monthly_value = []
        self.rp_assets = 0.0
        self.home_count = 1
class RPAssets:
    """The three components of a risk pool's assets: HPI, asset tokens, cash."""

    def __init__(self, hpi_value, asset_tkn_value, cash_value):
        """Store the three asset components as given."""
        self.hpi_value, self.asset_tkn_value, self.cash_value = hpi_value, asset_tkn_value, cash_value

    @property
    def total_value(self):
        """Sum of the HPI, asset-token and cash components."""
        non_cash = self.hpi_value + self.asset_tkn_value
        return non_cash + self.cash_value
class RiskPool:
    """Empty placeholder class; it defines no attributes or behaviour."""
def find_nested_range_in_list(desired_range, listed_ranges):
    """Return the first entry of ``listed_ranges`` that nests with ``desired_range``.

    An entry matches when ``desired_range.is_nested_or_contains(entry)`` is
    truthy. Entries are tried in iteration order; ``None`` is returned when
    nothing matches.
    """
    matching = (
        candidate
        for candidate in listed_ranges
        if desired_range.is_nested_or_contains(candidate)
    )
    return next(matching, None)
def _percent_to_fraction(text):
    """Convert a percent cell such as ``'1.5%'`` into the fraction ``0.015``."""
    return float(text.replace('%', '')) / 100.0


def _read_csv_rows(csv_path, field_names):
    """Return the data rows of ``csv_path`` as dicts keyed by ``field_names``."""
    # newline='' lets the csv module handle line endings itself, as its
    # documentation requires for file objects passed to a reader.
    with open(csv_path, newline='') as csv_file:
        reader = DictReader(csv_file, field_names)
        next(reader, None)  # the first row is the file's own header line
        return list(reader)


def _build_range_lookup(rows, fico_key, ltv_key, dti_key, value_key):
    """Index ``rows`` as ``{FICORange: {LTVMRERange: {DTIRange: raw value}}}``."""
    lookup = {}
    for row in rows:
        fico_range = FICORange(row[fico_key])
        ltv_range = LTVMRERange(row[ltv_key])
        dti_range = DTIRange(row[dti_key])
        lookup.setdefault(fico_range, {}).setdefault(ltv_range, {})[dti_range] = row[value_key]
    return lookup


def _find_group_value(group, lookup):
    """Return the raw lookup value whose three ranges nest with ``group``'s, or None."""
    fico_range = find_nested_range_in_list(group.fico_range, lookup)
    if fico_range is None:
        return None
    ltv_to_dti = lookup[fico_range]
    ltv_range = find_nested_range_in_list(group.ltv_range, ltv_to_dti)
    if ltv_range is None:
        return None
    dti_to_value = ltv_to_dti[ltv_range]
    dti_range = find_nested_range_in_list(group.dti_range, dti_to_value)
    if dti_range is None:
        return None
    return dti_to_value[dti_range]


def load_csv_data(default_risks_file, example_homes_file, pool_makeup_file, hpi_accrual_rate_file):
    """Load the four CSV inputs and merge them into per-bucket home data.

    Args:
        default_risks_file: CSV of default risk per FICO / LTV / DTI bucket;
            parsed by ``load_default_rate_file``.
        example_homes_file: CSV with columns FICO Range, LTV/MRE, DTI,
            Purchase Price.
        pool_makeup_file: CSV with columns FICO, LTV, DTI, Percentage of Pool.
        hpi_accrual_rate_file: CSV with columns Month, HPI (percent per month).

    Each file's first row is treated as a header and skipped.

    Returns:
        A ``(home_data, hpi_accrual)`` tuple: the list of ``HomeGroupData``
        built from the default-risk file, each updated with its pool share and
        home value, and the list of monthly HPI accrual fractions.
    """
    home_data = load_default_rate_file(default_risks_file)

    pool_makeup_lookup = _build_range_lookup(
        _read_csv_rows(pool_makeup_file, ['FICO', 'LTV', 'DTI', 'Percentage of Pool']),
        'FICO', 'LTV', 'DTI', 'Percentage of Pool')
    example_homes_lookup = _build_range_lookup(
        _read_csv_rows(example_homes_file, ['FICO Range', 'LTV/MRE', 'DTI', 'Purchase Price']),
        'FICO Range', 'LTV/MRE', 'DTI', 'Purchase Price')

    for group in home_data:
        # A bucket with no matching pool-makeup row contributes 0% of the pool.
        pool_makeup = 0
        raw_percent = _find_group_value(group, pool_makeup_lookup)
        if raw_percent is not None:
            pool_makeup = _percent_to_fraction(raw_percent)

        # A bucket with no matching example home is valued at 0.
        avg_home_value = 0
        raw_price = _find_group_value(group, example_homes_lookup)
        if raw_price is not None:
            avg_home_value = float(raw_price.replace('$', '').replace(',', '').strip())

        group.pool_percent = pool_makeup
        group.nominal_home_value = avg_home_value
        group.nominal_purchase_price = avg_home_value

    hpi_accrual = [
        _percent_to_fraction(row['HPI'])
        for row in _read_csv_rows(hpi_accrual_rate_file, ['Month', 'HPI'])
    ]
    return home_data, hpi_accrual
def set_home_count(home_data, total_home_count):
    """Give every group a whole number of homes matching its pool share.

    Each group's ``home_count`` is set in place so that
    ``home_count / total_home_count`` is within ``1 / total_home_count`` of the
    group's ``pool_percent``.

    Args:
        home_data: iterable of objects with ``pool_percent`` and a writable
            ``home_count``.
        total_home_count: total number of homes in the pool; must be positive.

    Raises:
        ValueError: if ``total_home_count`` is not positive.
    """
    if total_home_count <= 0:
        raise ValueError('total_home_count must be a positive number of homes.')

    # One home is the smallest possible step, so 1/total is the best accuracy
    # that can be reached.
    best_poss_acc = 1 / total_home_count
    for group in home_data:
        # Floor so the starting count never overshoots the target; from there
        # the count only ever needs to move upwards.
        group.home_count = floor(total_home_count * group.pool_percent)
        while not isclose(group.home_count / total_home_count, group.pool_percent, abs_tol=best_poss_acc):
            group.home_count += 1
def compute_group_monthly_payment(group_data, return_percent):
    """Return the monthly payment for one home of ``group_data``.

    Delegates to ``compute_monthly_payment`` using the group's purchase price
    and the annual ``return_percent``.
    """
    # The group class in this file defines nominal_purchase_price and
    # nominal_home_value but no ``nominal_home_price``, so the previous
    # attribute lookup always failed.
    # NOTE(review): the purchase price was chosen over the HPI-adjusted
    # nominal_home_value -- confirm that is the intended basis.
    return compute_monthly_payment(group_data.nominal_purchase_price, return_percent)
def compute_monthly_payment(home_price, return_percent):
    """Return one month's payment on ``home_price`` at ``return_percent`` per year.

    ``return_percent`` is compounded monthly over twelve months, and the
    resulting yearly rate is applied to ``home_price`` and split into twelve
    equal payments.
    """
    per_month_rate = return_percent / 12.0
    compounded_yearly_rate = ((1.0 + per_month_rate) ** 12.0) - 1.0
    return home_price * compounded_yearly_rate / 12.0
def compute_risk_pool_assets(home_data, hpi_accrual_data, total_homes=200, risk_pool_allocation=0.01, occupancy_fee_rate=0.04):
    """Build the month-by-month report of risk pool assets and liabilities.

    Args:
        home_data: home groups; their ``home_count`` is set here via
            ``set_home_count`` and their values are advanced month by month.
        hpi_accrual_data: iterable of monthly HPI accrual fractions.
        total_homes: total number of homes to distribute across the groups.
        risk_pool_allocation: fraction of each value stream owned by the pool.
        occupancy_fee_rate: annual rate passed on as the monthly fee basis.

    Returns:
        A list of dict rows, one for "Month 1" plus one per HPI accrual entry,
        with string values for: Time Period, Asset Token Value,
        HPI Token Value, Cash in Pool, Total Pool Assets, Pool Liability,
        Ratio and Home Count. Ratio is ``'N/A'`` when liabilities are zero.
    """
    def _ratio_text(assets, liabilities):
        # Liabilities are zero when no group carries any loss exposure; report
        # that explicitly instead of failing with a division by zero.
        if liabilities == 0:
            return 'N/A'
        return f'{float(assets / liabilities):.4f}'

    set_home_count(home_data, total_homes)

    # First month: the pool only holds its share of the asset tokens.
    asset_tkn_val = 0.0
    first_month_liabilities = 0.0
    home_total = 0
    for group in home_data:
        asset_tkn_val += group.nominal_home_value * risk_pool_allocation * group.home_count
        first_month_liabilities += group.nominal_home_value * group.loss_severity * group.net_loss_rate * group.home_count
        home_total += group.home_count

    rp_assets_per_month = [{
        'Time Period': 'Month 1',
        'Asset Token Value': f'${asset_tkn_val:.2f}',
        'HPI Token Value': '$0.00',
        'Cash in Pool': '$0.00',
        'Total Pool Assets': f'${asset_tkn_val:.2f}',
        'Pool Liability': f'${first_month_liabilities:.2f}',
        'Ratio': _ratio_text(asset_tkn_val, first_month_liabilities),
        'Home Count': f'{home_total}',
    }]

    rp_hpi_val = 0.0
    cash_val = 0.0
    # Numbering starts at 2: HPI tokens only accrue to the following month and
    # month 1 was produced above.
    for month_number, hpi_accrual in enumerate(hpi_accrual_data, start=2):
        monthly_rp_assets = compute_monthly_risk_pool_assets(home_data, hpi_accrual, risk_pool_allocation, occupancy_fee_rate)

        rp_hpi_val += sum(a.hpi_value for a in monthly_rp_assets)
        # The value of asset tokens does not change until the high watermark
        # is implemented, so it is replaced rather than accumulated.
        asset_tkn_val = sum(a.asset_tkn_value for a in monthly_rp_assets)
        cash_val += sum(a.cash_value for a in monthly_rp_assets)

        pool_total_assets = cash_val + asset_tkn_val + rp_hpi_val
        monthly_liabilities = compute_pool_liabilities(home_data)

        rp_assets_per_month.append({
            'Time Period': f'Month {month_number}',
            'Asset Token Value': f'${asset_tkn_val:.2f}',
            'HPI Token Value': f'${rp_hpi_val:.2f}',
            'Cash in Pool': f'${cash_val:.2f}',
            # Dollar prefix added so this column matches month 1 and every
            # other money column.
            'Total Pool Assets': f'${pool_total_assets:.2f}',
            'Pool Liability': f'${monthly_liabilities:.2f}',
            'Ratio': _ratio_text(pool_total_assets, monthly_liabilities),
            'Home Count': f'{home_total}',
        })
    return rp_assets_per_month
def compute_monthly_risk_pool_assets(home_data, hpi_accrual, risk_pool_allocation=0.01, return_percent=0.04):
    """Compute one month of risk pool income per home group.

    For every group this appends the month's per-home appreciation to
    ``group.monthly_value`` and adds it to ``group.nominal_home_value``.

    Args:
        home_data: iterable of home groups.
        hpi_accrual: this month's HPI accrual as a fraction.
        risk_pool_allocation: fraction of each value stream owned by the pool.
        return_percent: annual rate used for the monthly payment.

    Returns:
        A list with one ``RPAssets`` per group holding the pool's HPI take,
        asset-token value and fee take for the month.
    """
    monthly_assets = []
    for group in home_data:
        # Appreciation of a single home this month, measured before the
        # group's value is advanced.
        month_appreciation = group.nominal_home_value * hpi_accrual
        risk_pool_hpi_take = month_appreciation * risk_pool_allocation * group.home_count

        group.monthly_value.append(float(month_appreciation))
        group.nominal_home_value += month_appreciation

        rp_asset_tkn_val = group.nominal_purchase_price * risk_pool_allocation * group.home_count

        # compute_monthly_payment expects a price, not the group object that
        # was previously passed in.
        # NOTE(review): the purchase price is used as the fee basis -- confirm
        # it should not be the HPI-adjusted nominal_home_value.
        monthly_payment = compute_monthly_payment(group.nominal_purchase_price, return_percent)
        risk_pool_tic_fee_take = float(monthly_payment * risk_pool_allocation) * group.home_count

        monthly_assets.append(RPAssets(risk_pool_hpi_take, rp_asset_tkn_val, risk_pool_tic_fee_take))
    return monthly_assets
def compute_pool_liabilities(home_data):
    """Return the pool's total expected loss across all home groups.

    Each group contributes
    ``loss_severity * nominal_home_value * net_loss_rate * home_count``.
    """
    weighted_losses = [
        grp.loss_severity * grp.nominal_home_value * grp.net_loss_rate * grp.home_count
        for grp in home_data
    ]
    # Accumulate explicitly, starting from 0.0, so the additions happen one
    # group at a time in input order.
    running_total = 0.0
    for loss in weighted_losses:
        running_total += loss
    return running_total
def _get_fannie_auth_token():
    """Request an access token with the client-credentials grant.

    The client id and secret are read from the Flask config keys
    ``FANNIE_CLIENT_ID`` and ``FANNIE_CLIENT_SECRET`` and sent as HTTP Basic
    credentials. The request timeout in seconds is taken from the optional
    ``FANNIE_REQUEST_TIMEOUT`` config key and defaults to 30.

    Returns:
        The ``access_token`` string from the JSON response on HTTP 200,
        otherwise ``None``.
    """
    fannie_auth_url = 'https://auth.pingone.com/4c2b23f9-52b1-4f8f-aa1f-1d477590770c/as/token'
    credentials = f'{current_app.config["FANNIE_CLIENT_ID"]}:{current_app.config["FANNIE_CLIENT_SECRET"]}'
    encoded_credentials = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
    headers = {
        'Authorization': f'Basic {encoded_credentials}',
        'Content-type': 'application/x-www-form-urlencoded'
    }
    # Without a timeout, requests waits indefinitely on an unresponsive server
    # and would hang the calling request handler.
    response = requests.post(
        fannie_auth_url,
        headers=headers,
        data='grant_type=client_credentials',
        timeout=current_app.config.get('FANNIE_REQUEST_TIMEOUT', 30),
    )
    if response.status_code != 200:
        return None
    return response.json()['access_token']
def _fetch_data_from_fannie(year):
    """Fetch single-family loan performance data for every quarter of ``year``.

    The access token is cached in the Flask config under
    ``FANNIE_AUTH_TOKEN``. It is requested when missing and refreshed once if
    the API answers 401. The request timeout in seconds is taken from the
    optional ``FANNIE_REQUEST_TIMEOUT`` config key and defaults to 30.

    Returns:
        The ``requests`` response object on HTTP 200, otherwise ``None``.
    """
    if not current_app.config.get('FANNIE_AUTH_TOKEN'):
        current_app.config['FANNIE_AUTH_TOKEN'] = _get_fannie_auth_token()

    def request_fannie_data():
        # HTTPS so the access token is never sent in clear text.
        fannie_single_family_home_url = f'https://api.fanniemae.com/v1/sf-loan-performance-data/years/{year}/quarters/All'
        fannie_headers = {
            # NOTE(review): the literal backticks around "Bearer" are sent as
            # part of the header value and look like leftover markup --
            # confirm the expected format against the API documentation.
            'x-public-access-token': f'`Bearer` {current_app.config.get("FANNIE_AUTH_TOKEN")}',
            'accept': 'application/json'
        }
        return requests.get(
            fannie_single_family_home_url,
            headers=fannie_headers,
            timeout=current_app.config.get('FANNIE_REQUEST_TIMEOUT', 30),
        )

    response = request_fannie_data()
    if response.status_code == 401:
        # The cached token was rejected: fetch a fresh one and retry once.
        current_app.config['FANNIE_AUTH_TOKEN'] = _get_fannie_auth_token()
        response = request_fannie_data()
    if response.status_code == 200:
        return response
    return None
def compute_mre(risk_pool_data, occupant_income, occupant_fico, home_price, default_rate_data, recovery_rate=0.9, loss_severity=0.19):
    """Compute the MRE for an occupant as a fraction of ``home_price``.

    Args:
        risk_pool_data: accepted for interface compatibility; not used.
        occupant_income: accepted for interface compatibility; not used.
        occupant_fico: occupant FICO score; must be at least 620.
        home_price: price of the home.
        default_rate_data: iterable of groups with ``fico_range``,
            ``loss_severity`` and ``net_loss_rate``. The first group whose
            FICO range contains ``occupant_fico`` supplies the loss figures.
        recovery_rate: divisor applied to the 1% recovery amount.
        loss_severity: loss severity used when no group matches.

    Returns:
        The larger of the computed MRE and 0.01, plus a four-month payment
        buffer expressed as a fraction of ``home_price``.

    Raises:
        ValueError: if ``occupant_fico`` is below 620.
    """
    if occupant_fico < 620:
        raise ValueError('FICO score too low.')

    # Fallbacks used when no group covers the occupant's score.
    default_rate = 0.01
    used_loss_severity = loss_severity
    for group in default_rate_data:
        if occupant_fico in group.fico_range:
            used_loss_severity = group.loss_severity
            default_rate = group.net_loss_rate / 2  # divide by 2 since we are including the 4 month buffer
            break

    income_buffer_percent = (4 * compute_monthly_payment(home_price, 0.04)) / home_price

    if default_rate == 0:
        # With a positive recovery amount, the MRE formula below falls without
        # bound as the default rate approaches zero, so the 1% floor is the
        # result; returning it directly avoids dividing by zero.
        return 0.01 + income_buffer_percent

    loss_amount = home_price * used_loss_severity
    default_risk = 1 / default_rate
    recovery_amt = 0.01 * home_price / recovery_rate
    mre = (loss_amount - default_risk * recovery_amt) / home_price
    if mre > 0.01:
        return mre + income_buffer_percent
    return 0.01 + income_buffer_percent
def load_default_rate_file(def_rate_file):
    """Load the default-risk CSV into a list of ``HomeGroupData``.

    The file is read with the fixed columns FICO Range, LTV/MRE, DTI,
    Net Loss Rate, Ever DQ180+ and Severity; its first row is treated as a
    header and skipped. Net Loss Rate and Severity are percent cells and are
    converted to fractions.

    Args:
        def_rate_file: path of the CSV file to read.

    Returns:
        One ``HomeGroupData`` per data row, in file order.
    """
    def_risks_field_names = ['FICO Range', 'LTV/MRE', 'DTI', 'Net Loss Rate', 'Ever DQ180+', 'Severity']

    def percent_to_fraction(text):
        # '1.5%' -> 0.015
        return float(text.replace('%', '')) / 100.0

    # newline='' lets the csv module handle line endings itself, as its
    # documentation requires for file objects passed to a reader.
    with open(def_rate_file, newline='') as def_file:
        default_risks_data = DictReader(def_file, def_risks_field_names)
        next(default_risks_data, None)  # skip the file's own header line
        # The rows must be consumed while the file is still open.
        return [
            HomeGroupData(
                FICORange(row['FICO Range']),
                LTVMRERange(row['LTV/MRE']),
                DTIRange(row['DTI']),
                percent_to_fraction(row['Net Loss Rate']),
                percent_to_fraction(row['Severity']),
            )
            for row in default_risks_data
        ]