Source code for _gettsim.policy_environment

import copy
import datetime
import operator
from functools import reduce
from typing import Callable

import numpy
import pandas as pd
import yaml

import _gettsim.functions  # Execute all decorators # noqa: F401
from _gettsim.config import INTERNAL_PARAMS_GROUPS, RESOURCE_DIR
from _gettsim.functions_loader import load_internal_functions
from _gettsim.piecewise_functions import (
    check_thresholds,
    get_piecewise_parameters,
    piecewise_polynomial,
)


def set_up_policy_environment(date):
    """Build the parameters and functions of the policy environment for a date.

    Parameters
    ----------
    date : int, str, datetime.date
        The date for which the policy system is set up.

    Returns
    -------
    params : dict
        A dictionary with parameters from the policy environment. For more
        information see the documentation of the :ref:`params_files`.
    functions : dict
        Dictionary mapping column names to functions creating the respective data.

    """
    # Bring the requested date into datetime.date form before anything else.
    policy_date = _parse_date(date)

    # One entry per parameter group, each already aligned to the structure used
    # by the piecewise polynomial functions.
    params = {
        group: _parse_piecewise_parameters(
            _load_parameter_group_from_yaml(policy_date, group)
        )
        for group in INTERNAL_PARAMS_GROUPS
    }

    # Date-specific values which do not need a function of their own; applied in
    # this fixed order.
    date_specific_parsers = (
        _parse_kinderzuschl_max,
        _parse_einführungsfaktor_vorsorgeaufw_alter_ab_2005,
        _parse_vorsorgepauschale_rv_anteil,
    )
    for parser in date_specific_parsers:
        params = parser(policy_date, params)

    functions = load_functions_for_date(policy_date)

    return params, functions
def _parse_date(date):
    """Check the policy date for different input formats.

    Parameters
    ----------
    date : datetime.date, datetime.datetime, str, int
        The date for which the policy system is set up. Strings are parsed with
        :func:`pandas.to_datetime`, integers are interpreted as a year and mapped
        to January 1st of that year, and ``datetime.datetime`` objects (which
        includes ``pandas.Timestamp``) are truncated to their calendar date.

    Returns
    -------
    date : datetime.date
        The date for which the policy system is set up. Inputs of any other type
        are returned unchanged.

    """
    if isinstance(date, str):
        date = pd.to_datetime(date).date()
    elif isinstance(date, int):
        date = datetime.date(year=date, month=1, day=1)
    elif isinstance(date, datetime.datetime):
        # A datetime.datetime cannot be order-compared with a datetime.date
        # (TypeError), but the loaders below compare the policy date against
        # datetime.date values. Reduce it to a plain date.
        date = date.date()

    return date


def _parse_piecewise_parameters(tax_data):
    """Check if parameters are stored in implicit structures and align to general
    structure.

    Parameters
    ----------
    tax_data : dict
        Loaded raw tax data. Modified in place.

    Returns
    -------
    tax_data : dict
        Parsed parameters ready to use in gettsim.

    """
    for param in tax_data:
        if isinstance(tax_data[param], dict):
            if "type" in tax_data[param]:
                if tax_data[param]["type"].startswith("piecewise"):
                    # A truthy 'progressionsfaktor' entry requests that missing
                    # quadratic rates are derived before the conversion.
                    if "progressionsfaktor" in tax_data[param]:
                        if tax_data[param]["progressionsfaktor"]:
                            tax_data[param] = add_progressionsfaktor(
                                tax_data[param], param
                            )
                    # The function type is the second '_'-separated token of
                    # the 'type' entry.
                    tax_data[param] = get_piecewise_parameters(
                        tax_data[param],
                        param,
                        func_type=tax_data[param]["type"].split("_")[1],
                    )
            # These keys only steer the parsing above; drop them if present.
            for key in ["type", "progressionsfaktor"]:
                tax_data[param].pop(key, None)

    return tax_data


def _parse_kinderzuschl_max(date, params):
    """Derive the maximum amount of the Kinderzuschlag where it is not a parameter.

    Prior to 2021, the maximum amount of the Kinderzuschlag was specified directly
    in the laws and directives.

    In 2021, 2022, and from 2024 on, this measure has been derived from
    subsistence levels. This function implements that calculation.

    For 2023 the amount is once again explicitly specified as a parameter.

    Parameters
    ----------
    date: datetime.date
        The date for which the policy parameters are set up.
    params: dict
        A dictionary with parameters from the policy environment. Modified in
        place.

    Returns
    -------
    params: dict
        Updated dictionary.

    """
    if (date.year >= 2024) or (2023 > date.year >= 2021):
        assert {"kinderzuschl", "kindergeld"} <= params.keys()
        existenzminimum = params["kinderzuschl"]["existenzminimum"]
        # Sum of the children's components divided by 12, minus the Kindergeld
        # entry stored under key 1.
        params["kinderzuschl"]["maximum"] = (
            existenzminimum["regelsatz"]["kinder"]
            + existenzminimum["kosten_der_unterkunft"]["kinder"]
            + existenzminimum["heizkosten"]["kinder"]
        ) / 12 - params["kindergeld"]["kindergeld"][1]

    return params


def _parse_einführungsfaktor_vorsorgeaufw_alter_ab_2005(date, params):
    """Calculate introductory factor for pension expense deductions which depends on
    the current year as follows:

    In the years 2005-2025 the share of deductible contributions increases by
    2 percentage points each year from 60% in 2005 to 100% in 2025.

    Reference: § 10 Abs. 1 Nr. 2 Buchst. a und b EStG

    Parameters
    ----------
    date: datetime.date
        The date for which the policy parameters are set up.
    params: dict
        A dictionary with parameters from the policy environment. Modified in
        place.

    Returns
    -------
    params: dict
        Updated dictionary. For years before 2005 it is returned unchanged.

    """
    jahr = float(date.year)
    if jahr >= 2005:
        spec = params["eink_st_abzuege"]["einführungsfaktor"]
        # ToDo: remove conversion to Series after moving to scalar
        out = piecewise_polynomial(
            pd.Series(jahr),
            thresholds=spec["thresholds"],
            rates=spec["rates"],
            intercepts_at_lower_thresholds=spec["intercepts_at_lower_thresholds"],
        )
        params["eink_st_abzuege"][
            "einführungsfaktor_vorsorgeaufw_alter_ab_2005"
        ] = out.loc[0]

    return params


def _parse_vorsorgepauschale_rv_anteil(date, params):
    """Calculate the share of pension contributions to be deducted for Lohnsteuer
    increases by year.

    The piecewise specification stored under
    ``params["eink_st_abzuege"]["vorsorgepauschale_rv_anteil"]`` is evaluated at
    the policy year and then replaced by the resulting value.

    Parameters
    ----------
    date: datetime.date
        The date for which the policy parameters are set up.
    params: dict
        A dictionary with parameters from the policy environment. Modified in
        place.

    Returns
    -------
    params: dict
        Updated dictionary. For years before 2005 it is returned unchanged.

    """
    jahr = float(date.year)
    if jahr >= 2005:
        spec = params["eink_st_abzuege"]["vorsorgepauschale_rv_anteil"]
        out = piecewise_polynomial(
            pd.Series(jahr),
            thresholds=spec["thresholds"],
            rates=spec["rates"],
            intercepts_at_lower_thresholds=spec["intercepts_at_lower_thresholds"],
        )
        params["eink_st_abzuege"]["vorsorgepauschale_rv_anteil"] = out.loc[0]

    return params


def load_functions_for_date(date):
    """Load time-dependent policy reforms.

    Parameters
    ----------
    date : datetime.date
        The date for which the policy system is set up.

    Returns
    -------
    functions : dict
        Dictionary mapping column names to functions creating the respective data.

    """
    # Using TIME_DEPENDENT_FUNCTIONS here leads to failing tests.
    functions = {
        f.__info__["dates_active_dag_key"]: f
        for f in load_internal_functions().values()
        if is_time_dependent(f) and is_active_at_date(f, date)
    }

    return functions


def is_time_dependent(f: Callable) -> bool:
    """Return whether ``f`` has an ``__info__`` with a ``dates_active_dag_key``."""
    return hasattr(f, "__info__") and "dates_active_dag_key" in f.__info__


def is_active_at_date(f: Callable, date: datetime.date) -> bool:
    """Return whether ``date`` lies within the active period of ``f``.

    Both ends of the period (``dates_active_start`` and ``dates_active_end`` in
    ``f.__info__``) are inclusive.
    """
    return f.__info__["dates_active_start"] <= date <= f.__info__["dates_active_end"]


def _load_parameter_group_from_yaml(
    date, group, parameters=None, yaml_path=RESOURCE_DIR / "parameters"
):
    """Load data from raw yaml group file.

    Parameters
    ----------
    date : datetime.date
        The date for which the policy system is set up.
    group : string
        Policy system compartment.
    parameters : list
        List of parameters to be loaded. Only relevant for in function calls.
    yaml_path : path
        Path to directory of yaml_file. (Used for testing of this function).

    Returns
    -------
    out_params : dict
        Dictionary of parameters loaded from raw yaml file and stripped of
        unnecessary keys.

    Raises
    ------
    ValueError
        If a parameter specifies an ``access_different_date`` other than
        ``"vorjahr"``.

    """

    def subtract_years_from_date(dt, years):
        """Subtract one or more years from a date object."""
        try:
            dt = dt.replace(year=dt.year - years)

        # Take care of leap years
        except ValueError:
            dt = dt.replace(year=dt.year - years, day=dt.day - 1)
        return dt

    # NOTE(security): CLoader / Loader are the full (non-safe) PyYAML loaders and
    # must only be pointed at trusted parameter files.
    # CLoader only exists when PyYAML was built against LibYAML; fall back to the
    # equivalent pure-Python loader otherwise.
    yaml_loader = getattr(yaml, "CLoader", yaml.Loader)
    raw_group_data = yaml.load(
        (yaml_path / f"{group}.yaml").read_text(encoding="utf-8"),
        Loader=yaml_loader,
    )

    # Load parameters (exclude 'rounding' parameters which are handled at the
    # end of this function)
    not_trans_keys = ["note", "reference", "deviation_from", "access_different_date"]
    out_params = {}
    if not parameters:
        parameters = [k for k in raw_group_data if k != "rounding"]

    # Load values of all parameters at the specified date
    for param in parameters:
        policy_dates = sorted(
            key for key in raw_group_data[param] if isinstance(key, datetime.date)
        )

        past_policies = [d for d in policy_dates if d <= date]

        if not past_policies:
            # If no policy exists, then we check if the policy maybe agrees right now
            # with another one.
            # Otherwise, do not create an entry for this parameter.
            future_policy = raw_group_data[param][min(policy_dates)]
            if "deviation_from" in future_policy:
                if "." in future_policy["deviation_from"]:
                    # 'group.parameter' reference into another group file.
                    path_list = future_policy["deviation_from"].split(".")
                    params_temp = _load_parameter_group_from_yaml(
                        date,
                        path_list[0],
                        parameters=[path_list[1]],
                        yaml_path=yaml_path,
                    )
                    if path_list[1] in params_temp:
                        out_params[param] = params_temp[path_list[1]]

        else:
            latest_policy_date = max(past_policies)
            policy_in_place = raw_group_data[param][latest_policy_date]
            if "scalar" in policy_in_place:
                if policy_in_place["scalar"] == "inf":
                    out_params[param] = numpy.inf
                else:
                    out_params[param] = policy_in_place["scalar"]
            else:
                out_params[param] = {}
                # Keys which if given are transferred
                add_trans_keys = ["type", "progressionsfaktor"]
                for key in add_trans_keys:
                    if key in raw_group_data[param]:
                        out_params[param][key] = raw_group_data[param][key]

                # Generator: consumed exactly once by one of the loops below.
                value_keys = (
                    key for key in policy_in_place if key not in not_trans_keys
                )
                if "deviation_from" in policy_in_place:
                    if policy_in_place["deviation_from"] == "previous":
                        # Start from the values valid on the day before the
                        # policy in place came into effect.
                        new_date = latest_policy_date - datetime.timedelta(days=1)
                        out_params[param] = _load_parameter_group_from_yaml(
                            new_date, group, parameters=[param], yaml_path=yaml_path
                        )[param]
                    elif "." in policy_in_place["deviation_from"]:
                        # Start from a 'group.parameter' reference.
                        path_list = policy_in_place["deviation_from"].split(".")
                        out_params[param] = _load_parameter_group_from_yaml(
                            date,
                            path_list[0],
                            parameters=[path_list[1]],
                            yaml_path=yaml_path,
                        )[path_list[1]]
                    # Overlay the values specified for this date on a copy of the
                    # base values.
                    for key in value_keys:
                        key_list = []
                        out_params[param][key] = transfer_dictionary(
                            policy_in_place[key],
                            copy.deepcopy(out_params[param][key]),
                            key_list,
                        )
                else:
                    for key in value_keys:
                        out_params[param][key] = policy_in_place[key]

            # Also load earlier parameter values if this is specified in yaml
            if "access_different_date" in raw_group_data[param]:
                if raw_group_data[param]["access_different_date"] == "vorjahr":
                    date_last_year = subtract_years_from_date(date, years=1)
                    params_last_year = _load_parameter_group_from_yaml(
                        date_last_year, group, parameters=[param], yaml_path=yaml_path
                    )
                    if param in params_last_year:
                        out_params[f"{param}_vorjahr"] = params_last_year[param]
                else:
                    raise ValueError(
                        "Currently, access_different_date is only implemented for "
                        "'vorjahr' (last year). "
                        f"For parameter {param} a different string is specified."
                    )

    out_params["datum"] = numpy.datetime64(date)

    # Load rounding parameters if they exist
    if "rounding" in raw_group_data:
        out_params["rounding"] = _load_rounding_parameters(
            date, raw_group_data["rounding"]
        )

    return out_params


def _load_rounding_parameters(date, rounding_spec):
    """Load rounding parameters for a specific date from a dictionary.

    Parameters
    ----------
    date : datetime.date
        The date for which the policy system is set up.
    rounding_spec : dictionary
          - Keys: Functions to be rounded.
          - Values: Rounding parameters for all dates

    Returns
    -------
    dictionary
          - Keys: Functions to be rounded.
          - Values: Rounding parameters for the specified date

    """
    out = {}
    rounding_parameters = ["direction", "base"]

    # Load values of all parameters at the specified date.
    for function_name, rounding_spec_func in rounding_spec.items():
        # Find all specified policy dates before date.
        policy_dates_before_date = sorted(
            key
            for key in rounding_spec_func
            if isinstance(key, datetime.date) and key <= date
        )

        # If any rounding specs are defined for a date before the specified
        # date, copy them to params dictionary.
        # If no appropriate rounding specs are found for the requested date,
        # the function will not appear in the returned dictionary.
        # Note this will raise an error later unless the user adds an
        # appropriate rounding specification to the parameters dictionary.
        if policy_dates_before_date:
            policy_date_in_place = max(policy_dates_before_date)
            policy_in_place = rounding_spec_func[policy_date_in_place]
            out[function_name] = {
                key: policy_in_place[key]
                for key in policy_in_place
                if key in rounding_parameters
            }

    return out


def transfer_dictionary(remaining_dict, new_dict, key_list):
    """Write every scalar leaf of ``remaining_dict`` into ``new_dict``.

    The function walks ``remaining_dict`` recursively and stores each non-dict
    value in ``new_dict`` under the same sequence of keys. ``new_dict`` is
    modified in place; the intermediate levels of each key path must already
    exist in it.

    Parameters
    ----------
    remaining_dict : dict or scalar
        The (sub-)structure that still has to be transferred.
    new_dict : dict
        The dictionary receiving the values.
    key_list : list
        Keys leading from the root to ``remaining_dict``.

    Returns
    -------
    dict or scalar
        ``new_dict`` after the transfer, or ``remaining_dict`` itself if it is
        not a dictionary and ``key_list`` is empty.

    """
    # To call recursive, always check if object is a dict
    if isinstance(remaining_dict, dict):
        for key in remaining_dict:
            key_list_updated = [*key_list, key]
            new_dict = transfer_dictionary(
                remaining_dict[key], new_dict, key_list_updated
            )
    elif len(key_list) == 0:
        # A top-level scalar simply replaces the previous value.
        return remaining_dict
    else:
        # Now remaining dict is just a scalar
        set_by_path(new_dict, key_list, remaining_dict)

    return new_dict


def get_by_path(data_dict, key_list):
    """Access a nested object in root by item sequence."""
    return reduce(operator.getitem, key_list, data_dict)


def set_by_path(data_dict, key_list, value):
    """Set a value in a nested object in root by item sequence."""
    get_by_path(data_dict, key_list[:-1])[key_list[-1]] = value


def add_progressionsfaktor(params_dict, parameter):
    """Quadratic factor of tax tariff function.

    The German tax tariff is defined on several income intervals with distinct
    marginal tax rates at the thresholds. To ensure an almost linear increase of
    the average tax rate, the German tax tariff is defined as a quadratic function,
    where the quadratic rate is the so called linear Progressionsfaktor. For its
    calculation one needs the lower (low_thres) and upper (upper_thres) thresholds
    of the interval as well as the marginal tax rate of the interval (rate_iv) and
    of the following interval (rate_fiv). The formula is then given by:

    (rate_fiv - rate_iv) / (2 * (upper_thres - low_thres))

    Parameters
    ----------
    params_dict : dict
        Raw specification of the piecewise function; its integer keys denote the
        intervals. Not modified.
    parameter : str
        Name of the parameter, passed on to ``check_thresholds``.

    Returns
    -------
    out_dict : dict
        Deep copy of ``params_dict`` in which every interval has a
        ``rate_quadratic`` entry.

    """
    out_dict = copy.deepcopy(params_dict)
    interval_keys = sorted(key for key in out_dict if isinstance(key, int))

    # Check and extract lower thresholds.
    lower_thresholds, upper_thresholds, _ = check_thresholds(
        params_dict, parameter, interval_keys
    )

    for key in interval_keys:
        # Explicitly specified quadratic rates are kept. Deriving one needs the
        # linear rate of the following interval (key + 1).
        if "rate_quadratic" not in out_dict[key]:
            out_dict[key]["rate_quadratic"] = (
                out_dict[key + 1]["rate_linear"] - out_dict[key]["rate_linear"]
            ) / (2 * (upper_thresholds[key] - lower_thresholds[key]))

    return out_dict