diff --git a/app/playgrounds/__init__.py b/app/playgrounds/__init__.py new file mode 100644 index 0000000..b791a2a --- /dev/null +++ b/app/playgrounds/__init__.py @@ -0,0 +1,5 @@ +from flask import Blueprint + +bp = Blueprint('playgrounds', __name__) + +from app.playgrounds import routes \ No newline at end of file diff --git a/app/playgrounds/controller.py b/app/playgrounds/controller.py new file mode 100644 index 0000000..81b439e --- /dev/null +++ b/app/playgrounds/controller.py @@ -0,0 +1,426 @@ +from csv import DictReader, DictWriter +from flask import current_app +from math import isclose, floor +import boto3 +import requests +import base64 + +_FANNIE_ACCESS_TOKEN = None + + +class Range: + def __init__(self, min_val, max_val): + self.min = min_val + self.max = max_val + + def __eq__(self, other): + return self.max == other.max and self.min == other.min + + def __str__(self): + return f'{self.min}-{self.max}' + + def __hash__(self): + return hash(self.__str__()) + + def __contains__(self, item): + return self.min <= item <= self.max + + def within_range(self, other): + return self.min >= other.min and self.max <= other.max + + def contains_range(self, other): + return self.max >= other.max and self.min >= other.min + + def is_nested_or_contains(self, other): + return self.within_range(other) or self.contains_range(other) + + +class FICORange(Range): + def __init__(self, csv_fico_str): + if '+' in csv_fico_str: + min_fico = int(csv_fico_str.replace('+', '')) + max_fico = 850 + else: + split_str = csv_fico_str.split('-', 1) + min_fico = int(split_str[0]) + max_fico = int(split_str[1]) + super(FICORange, self).__init__(min_fico, max_fico) + + +class LTVMRERange(Range): + def __init__(self, csv_ltv_mre_str): + if '<=' in csv_ltv_mre_str: + min_ltv = 0.0 + max_ltv = float(csv_ltv_mre_str.replace('<=', '')) + else: + split_str = csv_ltv_mre_str.split('-', 1) + min_ltv = float(split_str[0]) + max_ltv = float(split_str[1]) + super(LTVMRERange, self).__init__(min_ltv, max_ltv) + + +class DTIRange(Range): + def __init__(self, csv_dti_str): + div_factor = 1 + if '%' in csv_dti_str: + csv_dti_str = csv_dti_str.replace('%', '') + div_factor = 100 + + split_str = csv_dti_str.split('-', 1) + min_ltv = float(float(split_str[0])/div_factor) + max_ltv = float(float(split_str[1])/div_factor) + super(DTIRange, self).__init__(min_ltv, max_ltv) + + +class HomeGroupData: + + def __init__(self, fico_range, ltv_range, dti_range, net_loss_rate, loss_severity, pool_percentage=None, avg_home_value=None): + self.fico_range = fico_range + self.ltv_range = ltv_range + self.dti_range = dti_range + self.net_loss_rate = net_loss_rate + self.loss_severity = loss_severity + self.pool_percent = pool_percentage + self.nominal_purchase_price = avg_home_value + self.nominal_home_value = avg_home_value + self.hpi_accrual_per_month = [] + self.monthly_value = [] + self.rp_assets = 0.0 + self.home_count = 1 + + +class RPAssets: + def __init__(self, hpi_value, asset_tkn_value, cash_value): + self.hpi_value = hpi_value + self.asset_tkn_value = asset_tkn_value + self.cash_value = cash_value + + @property + def total_value(self): + return self.hpi_value + self.asset_tkn_value + self.cash_value + + +class RiskPool: + pass + + +def find_nested_range_in_list(desired_range, listed_ranges): + for possible_nested in listed_ranges: + if desired_range.is_nested_or_contains(possible_nested): + return possible_nested + return None + + +def load_csv_data(default_risks_file, example_homes_file, pool_makeup_file, hpi_accrual_rate_file): + + # Load all the file data + + def_risks_field_names = ['FICO Range', 'LTV/MRE', 'DTI', 'Net Loss Rate', 'Ever DQ180+', 'Severity'] + with open(default_risks_file) as def_file: + default_risks_data = DictReader(def_file, def_risks_field_names) + next(default_risks_data, None) + # Start remapping the data to home group data objects + home_data = [ + HomeGroupData( + FICORange(row['FICO Range']), + LTVMRERange(row['LTV/MRE']), + DTIRange(row['DTI']), + float(float(row['Net Loss Rate'].replace('%', ''))/100.00), + float(float(row['Severity'].replace('%', ''))/100.00) + ) for row in default_risks_data] + + makeup_field_names = ['FICO', 'LTV', 'DTI', 'Percentage of Pool'] + with open(pool_makeup_file) as makeup_file: + pool_makeup_data = DictReader(makeup_file, makeup_field_names) + next(pool_makeup_data, None) + # create a map of the ranges to the data + fico_ltv_dti_range_to_pool_makeup = {} + for row in pool_makeup_data: + fico_range = FICORange(row['FICO']) + ltv_range = LTVMRERange(row['LTV']) + dti_range = DTIRange(row['DTI']) + value = row['Percentage of Pool'] + if fico_range not in fico_ltv_dti_range_to_pool_makeup: + fico_ltv_dti_range_to_pool_makeup[fico_range] = {ltv_range: {dti_range: value}} + elif ltv_range not in fico_ltv_dti_range_to_pool_makeup[fico_range]: + fico_ltv_dti_range_to_pool_makeup[fico_range][ltv_range] = {dti_range: value} + else: + fico_ltv_dti_range_to_pool_makeup[fico_range][ltv_range][dti_range] = value + + examples_field_names = ['FICO Range', 'LTV/MRE', 'DTI', 'Purchase Price'] + with open(example_homes_file) as ex_file: + example_homes_data = DictReader(ex_file, examples_field_names) + next(example_homes_data, None) + # create a map of the ranges to the data + fico_ltv_dti_range_to_example_homes = {} + for row in example_homes_data: + fico_range = FICORange(row['FICO Range']) + ltv_range = LTVMRERange(row['LTV/MRE']) + dti_range = DTIRange(row['DTI']) + value = row['Purchase Price'] + if fico_range not in fico_ltv_dti_range_to_example_homes: + fico_ltv_dti_range_to_example_homes[fico_range] = {ltv_range: {dti_range: value}} + elif ltv_range not in fico_ltv_dti_range_to_example_homes[fico_range]: + fico_ltv_dti_range_to_example_homes[fico_range][ltv_range] = {dti_range: value} + else: + fico_ltv_dti_range_to_example_homes[fico_range][ltv_range][dti_range] = value + + # add everything to the home group data + for group in home_data: + # Try to find the Fico score range + fico_range = find_nested_range_in_list(group.fico_range, fico_ltv_dti_range_to_pool_makeup) + # if no appropriate fico range was found, then we need to set the pool makeup to 0 + pool_makeup = 0 + # if there was a good FICO range + if fico_range: + ltv_to_dti = fico_ltv_dti_range_to_pool_makeup[fico_range] + # look for an LTV range + ltv_range = find_nested_range_in_list(group.ltv_range, ltv_to_dti) + # if one is found + if ltv_range: + dti_to_res = ltv_to_dti[ltv_range] + # look for a DTI range + dti_range = find_nested_range_in_list(group.dti_range, dti_to_res) + # if we find everything + if dti_range: + # store the found value + pool_makeup = float(float(dti_to_res[dti_range].replace('%', ''))/100.0) + + fico_range = find_nested_range_in_list(group.fico_range, fico_ltv_dti_range_to_example_homes) + # if no appropriate fico range was found, then we need to set the pool makeup to 0 + avg_home_value = 0 + # if there was a good FICO range + if fico_range: + ltv_to_dti = fico_ltv_dti_range_to_example_homes[fico_range] + # look for an LTV range + ltv_range = find_nested_range_in_list(group.ltv_range, ltv_to_dti) + # if one is found + if ltv_range: + dti_to_res = ltv_to_dti[ltv_range] + # look for a DTI range + dti_range = find_nested_range_in_list(group.dti_range, dti_to_res) + # if we find everything + if dti_range: + # store the found value + avg_home_value = float(dti_to_res[dti_range].replace('$', '').replace(',', '').strip()) + + group.pool_percent = pool_makeup + group.nominal_home_value = avg_home_value + group.nominal_purchase_price = avg_home_value + + hpi_accrual = [] + + hpi_acc_field_names = ['Month', 'HPI'] + with open(hpi_accrual_rate_file) as hpi_acc_file: + hpi_accrual_data = DictReader(hpi_acc_file, hpi_acc_field_names) + next(hpi_accrual_data, None) + hpi_accrual = [ + float(float(row['HPI'].replace('%', ''))/100.0) for row in hpi_accrual_data] + + return home_data, hpi_accrual + + +def set_home_count(home_data, total_home_count): + # theoretically the best possible accuracy is 1/total_home_count, as that is the amount we can increment by + best_poss_acc = 1/total_home_count + for group in home_data: + # floor so that we know we are under shooting the target + group.home_count = floor(total_home_count * group.pool_percent) + computed_percent = float(group.home_count/total_home_count) + while not isclose(computed_percent, group.pool_percent, abs_tol=best_poss_acc): + # truncating with multiplication should only lower the possible home count + group.home_count += 1 + computed_percent = float(group.home_count/total_home_count) + close_enough = isclose(computed_percent, group.pool_percent, abs_tol=best_poss_acc) + print(close_enough) + + # # If there are too many homes, correct by looking for groups that have too many, changing home values by 1 + # # should be enough, since truncation should never cause a larger error than that + # while total_home_count - tollerance > total_homes > total_home_count + tollerance: + # for group in home_data: + # if group.pool_percent * total_home_count > group.home_count: + # group.home_count -= 1 + # total_homes -= 1 + # if total_homes <= total_home_count: + # break + + +def compute_group_monthly_payment(group_data, return_percent): + return compute_monthly_payment(group_data.nominal_home_price, return_percent) + + +def compute_monthly_payment(home_price, return_percent): + nominal_interest_rate = ((1.0 + (return_percent / 12.0)) ** 12.0) - 1.0 + monthly_payment = home_price * nominal_interest_rate / 12.0 + return monthly_payment + + +def compute_risk_pool_assets(home_data, hpi_accrual_data, total_homes=200, risk_pool_allocation=0.01, occupancy_fee_rate=0.04): + rp_assets_per_month = [] + first_month_asset_tkn_value = 0.0 + first_month_hpi_val = 0.0 + first_month_cash_val = 0.0 + first_month_liabilities = 0.0 + set_home_count(home_data, total_homes) + home_total = 0 + # get the first month data + for group in home_data: + first_month_asset_tkn_value += group.nominal_home_value * risk_pool_allocation * group.home_count + first_month_liabilities += group.nominal_home_value * group.loss_severity * group.net_loss_rate * group.home_count + home_total += group.home_count + + first_month = { + 'Time Period': 'Month 1', + 'Asset Token Value': f'${first_month_asset_tkn_value:.2f}', + 'HPI Token Value': '$0.00', + 'Cash in Pool': '$0.00', + 'Total Pool Assets': f'${first_month_asset_tkn_value:.2f}', + 'Pool Liability': f'${first_month_liabilities:.2f}', + 'Ratio': f'{float(first_month_asset_tkn_value/first_month_liabilities):0.4f}', + 'Home Count': f'{home_total}' + } + + rp_assets_per_month.append(first_month) + rp_hpi_val = first_month_hpi_val + asset_tkn_val = first_month_asset_tkn_value + cash_val = first_month_cash_val + # go through each month + for i in range(len(hpi_accrual_data)): + # Add the monthly take to the pool total + monthly_rp_assets = compute_monthly_risk_pool_assets(home_data, hpi_accrual_data[i], risk_pool_allocation, occupancy_fee_rate) + # get the totals for the month + rp_hpi_val += sum([a.hpi_value for a in monthly_rp_assets]) + # The value of asset tokens does not change until the high watermark is implemented. + asset_tkn_val = sum([a.asset_tkn_value for a in monthly_rp_assets]) + cash_val += sum([a.cash_value for a in monthly_rp_assets]) + # increase the total value + pool_total_assets = cash_val + asset_tkn_val + rp_hpi_val + + # get the liabilities + monthly_liabilities = compute_pool_liabilities(home_data) + total_liabilities = monthly_liabilities + # i+2 is used since the HPI tokens only accrue to the following month, and the first month is done manually + rp_assets_per_month.append({'Time Period': f'Month {i + 2}', + 'Asset Token Value': f'${asset_tkn_val:.2f}', + 'HPI Token Value': f'${rp_hpi_val:.2f}', + 'Cash in Pool': f'${cash_val:.2f}', + 'Total Pool Assets': f'{pool_total_assets:.2f}', + 'Pool Liability': f'${monthly_liabilities:.2f}', + 'Ratio': f'{float(pool_total_assets/total_liabilities):.4f}', + 'Home Count': f'{home_total}'}) + + return rp_assets_per_month + + +def compute_monthly_risk_pool_assets(home_data, hpi_accrual, risk_pool_allocation=0.01, return_percent=0.04): + monthly_assets = [] + for group in home_data: + # get the hpi take + risk_pool_hpi_take = group.nominal_home_value * hpi_accrual * risk_pool_allocation * group.home_count + # update the current monthly value + group.monthly_value.append(float(group.nominal_home_value * hpi_accrual)) + # reset the nominal home value + group.nominal_home_value += (group.nominal_home_value * hpi_accrual) + rp_asset_tkn_val = group.nominal_purchase_price * risk_pool_allocation * group.home_count + # add the risk pool monthly income take + risk_pool_tic_fee_take = float(compute_monthly_payment(group, return_percent) * risk_pool_allocation) * group.home_count + # get the total assets + monthly_assets.append(RPAssets(risk_pool_hpi_take, rp_asset_tkn_val, risk_pool_tic_fee_take)) + return monthly_assets + + +def compute_pool_liabilities(home_data): + rp_liabilities_tot = 0.0 + for group in home_data: + risk_pool_net_loss = group.loss_severity * group.nominal_home_value * group.net_loss_rate + risk_pool_weighted_loss = risk_pool_net_loss * group.home_count + rp_liabilities_tot += risk_pool_weighted_loss + + return rp_liabilities_tot + + +def _get_fannie_auth_token(): + fannie_auth_url = 'https://auth.pingone.com/4c2b23f9-52b1-4f8f-aa1f-1d477590770c/as/token' + fannie_client_id_secret = f'{current_app.config["FANNIE_CLIENT_ID"]}:{current_app.config["FANNIE_CLIENT_SECRET"]}' + b64_encoded = base64.b64encode(bytes(fannie_client_id_secret, 'utf-8')) + fannie_client_id_secret = b64_encoded.decode('utf-8') + headers = { + 'Authorization': f'Basic {fannie_client_id_secret}', + 'Content-type': 'application/x-www-form-urlencoded' + } + data = 'grant_type=client_credentials' + response = requests.post(fannie_auth_url, headers=headers, data=data) + if response.status_code == 200: + json_data = response.json() + return json_data['access_token'] + else: + return None + + +def _fetch_data_from_fannie(year): + if not current_app.config.get('FANNIE_AUTH_TOKEN'): + current_app.config['FANNIE_AUTH_TOKEN'] = _get_fannie_auth_token() + + def request_fannie_data(): + fannie_single_family_home_url = f'http://api.fanniemae.com/v1/sf-loan-performance-data/years/{year}/quarters/All' + fannie_headers = { + 'x-public-access-token': f'`Bearer` {current_app.config.get("FANNIE_AUTH_TOKEN")}', + 'accept': 'application/json' + } + return requests.get(fannie_single_family_home_url, headers=fannie_headers) + + response = request_fannie_data() + + if response.status_code == 401: + current_app.config['FANNIE_AUTH_TOKEN'] = _get_fannie_auth_token() + response = request_fannie_data() + + if response.status_code == 200: + return response + + +def compute_mre(risk_pool_data, occupant_income, occupant_fico, home_price, default_rate_data, recovery_rate=0.9, loss_severity=0.19): + if occupant_fico < 620: + raise ValueError('FICO score too low.') + + default_rate = 0.01 + used_loss_severity = loss_severity + for group in default_rate_data: + if occupant_fico in group.fico_range: + used_loss_severity = group.loss_severity + default_rate = group.net_loss_rate/2 # divide by 2 since we are including the 4 month buffer + break + + income_buffer_percent = (4 * compute_monthly_payment(home_price, 0.04))/home_price + loss_amount = home_price * used_loss_severity + default_risk = 1/default_rate + recovery_amt = 0.01 * home_price/recovery_rate + mre = (loss_amount - default_risk * recovery_amt)/home_price + if mre > 0.01: + return mre + income_buffer_percent + return 0.01 + income_buffer_percent + + +def load_default_rate_file(def_rate_file): + def_risks_field_names = ['FICO Range', 'LTV/MRE', 'DTI', 'Net Loss Rate', 'Ever DQ180+', 'Severity'] + with open(def_rate_file) as def_file: + default_risks_data = DictReader(def_file, def_risks_field_names) + next(default_risks_data, None) + # Start remapping the data to home group data objects + home_data = [ + HomeGroupData( + FICORange(row['FICO Range']), + LTVMRERange(row['LTV/MRE']), + DTIRange(row['DTI']), + float(float(row['Net Loss Rate'].replace('%', '')) / 100.00), + float(float(row['Severity'].replace('%', '')) / 100.00) + ) for row in default_risks_data] + + return home_data + + + + + + + diff --git a/app/playgrounds/exel EQ b/app/playgrounds/exel EQ new file mode 100644 index 0000000..4eded3b --- /dev/null +++ b/app/playgrounds/exel EQ @@ -0,0 +1 @@ +(purchase_price*loss_severity) - (1/(def_rate) * (purchase_price*0.01))/recovery_rate/home_price+income_buffer_percent,0.01+C7 \ No newline at end of file diff --git a/app/playgrounds/forms.py b/app/playgrounds/forms.py new file mode 100644 index 0000000..5398c70 --- /dev/null +++ b/app/playgrounds/forms.py @@ -0,0 +1,27 @@ +from flask_wtf import FlaskForm as Form +from flask_wtf.file import FileField +from wtforms.fields import IntegerField, FloatField +from wtforms import validators +from app_common.fields import PercentageField, DollarAmountField + + +class HpiValuePredictForm(Form): + default_risks = FileField('Default Risk CSV') + example_homes = FileField('Example Homes CSV') + pool_makeup = FileField('Pool Makeup CSV') + hpi_accrual_rate = FileField('HPI Accrual Rate CSV') + risk_pool_take_percent = PercentageField('Risk Pool Allocation', default=0.01, places=2, + validators=[validators.Optional(), validators.NumberRange(min=0.0025, max=0.10)]) + tic_fee = PercentageField('TIC Fee', default=0.04, places=2, + validators=[validators.Optional(), validators.NumberRange(min=0.01, max=0.10)]) + number_of_homes = IntegerField('Total Number of Homes', default=200, validators=[validators.NumberRange(min=100)]) + + +class MREForm(Form): + occupant_fico = IntegerField('Occupant PICO Score', default=640, validators=[validators.Required(), validators.NumberRange(min=620, max=850)]) + occupant_income = DollarAmountField('Occupant Income', default=50000, validators=[validators.Optional()]) + home_price = DollarAmountField('Home Purchase Price', default=250000.00, validators=[validators.Required()]) + default_rates = FileField('Default Rate File') + risk_pool_makeup = FileField('Risk Pool Makeup') + + diff --git a/app/playgrounds/routes.py b/app/playgrounds/routes.py new file mode 100644 index 0000000..590c2fd --- /dev/null +++ b/app/playgrounds/routes.py @@ -0,0 +1,85 @@ +import csv +import os +from flask import render_template, request, url_for, session, redirect, current_app, send_file +from flask_login import current_user +from app.playgrounds import bp +from app.playgrounds import forms +from werkzeug.utils import secure_filename +from app.playgrounds.controller import load_csv_data, compute_risk_pool_assets, _fetch_data_from_fannie, load_default_rate_file, compute_mre +from datetime import datetime +# import app.auth.google + + +@bp.route('/risk_pool_generator', methods=['GET', 'POST']) +def generate_risk_pool(): + # _fetch_data_from_fannie(2008) + form = forms.HpiValuePredictForm(request.form) + if form.validate_on_submit(): + default_risks_file = form.default_risks.name + saved_def_risks_file_path = os.path.join(current_app.config['UPLOAD_PATH'], + f'{secure_filename(default_risks_file)}.csv') + + example_homes_file = form.example_homes.name + saved_ex_homes_file_path = os.path.join(current_app.config['UPLOAD_PATH'], + f'{secure_filename(example_homes_file)}.csv') + + pool_makeup_file = form.pool_makeup.name + saved_makeup_file_path = os.path.join(current_app.config['UPLOAD_PATH'], + f'{secure_filename(pool_makeup_file)}.csv') + + hpi_accrual_file = form.hpi_accrual_rate.name + saved_hpi_acc_file_path = os.path.join(current_app.config['UPLOAD_PATH'], + f'{secure_filename(hpi_accrual_file)}.csv') + + for f in [hpi_accrual_file, pool_makeup_file, example_homes_file, default_risks_file]: + + if not os.path.exists(current_app.config['UPLOAD_PATH']): + os.makedirs(current_app.config['UPLOAD_PATH']) + + csv_data = request.files[f].read().decode('utf-8').replace('\r', '') + f = os.path.join(current_app.config['UPLOAD_PATH'], f'{secure_filename(f)}.csv') + open(f, 'w+').write(csv_data) + + home_data, hpi_accrual_data = load_csv_data(saved_def_risks_file_path, saved_ex_homes_file_path, saved_makeup_file_path, saved_hpi_acc_file_path) + + risk_pool_take = form.risk_pool_take_percent.data + occupancy_fee_rate = form.tic_fee.data + + rp_asset_liability_data = compute_risk_pool_assets(home_data, hpi_accrual_data, form.number_of_homes.data, risk_pool_take, occupancy_fee_rate) + + time_str = datetime.now().strftime('%Y%m%d_%H%M%S') + out_file_path = os.path.join(current_app.config['UPLOAD_PATH'], f'Risk_Pool_Data_{time_str}.csv') + + with open(out_file_path, 'w', newline='') as csv_out: + field_names = ['Time Period', 'Asset Token Value', 'HPI Token Value', 'Cash in Pool', 'Total Pool Assets', 'Pool Liability', 'Ratio', 'Home Count'] + csv_writer = csv.DictWriter(csv_out, fieldnames=field_names) + csv_writer.writeheader() + for row in rp_asset_liability_data: + csv_writer.writerow(row) + + return send_file(os.path.join('..', out_file_path), as_attachment=True) + + return render_template('risk_pool_generator.html', form=form) + + +@bp.route('/mre', methods=['GET', 'POST']) +def mre_playground(): + form = forms.MREForm(request.form) + + if form.validate_on_submit(): + # save the uploaded default rate file + default_risks_file = form.default_rates.name + saved_def_risks_file_path = os.path.join(current_app.config['UPLOAD_PATH'], + f'{secure_filename(default_risks_file)}.csv') + + csv_data = request.files[default_risks_file].read().decode('utf-8').replace('\r', '') + f = os.path.join(current_app.config['UPLOAD_PATH'], f'{secure_filename(default_risks_file)}.csv') + open(f, 'w+').write(csv_data) + + default_data = load_default_rate_file(saved_def_risks_file_path) + + mre = compute_mre({}, form.occupant_income.data, form.occupant_fico.data, form.home_price.data, default_data) + return render_template('mre_generator.html', form=form, mre=f'{mre * 100:.2f}%') + + return render_template('mre_generator.html', form=form, mre=None) +