Source code for _gettsim.piecewise_functions

import numpy


[docs]def piecewise_polynomial( x, thresholds, rates, intercepts_at_lower_thresholds, rates_multiplier=None ): """Calculate value of the piecewise function at `x`. Parameters ---------- x : pd.Series Series with values which piecewise polynomial is applied to. thresholds : numpy.array A one-dimensional array containing the thresholds for all intervals. rates : numpy.ndarray A two-dimensional array where columns are interval sections and rows correspond to the nth polynomial. intercepts_at_lower_thresholds : numpy.ndarray The intercepts at the lower threshold of each interval. rates_multiplier : pd.Series, float Multiplier to create individual or scaled rates. If given and not equal to 1, the function also calculates new intercepts. Returns ------- out : float The value of `x` under the piecewise function. """ num_intervals = len(thresholds) - 1 degree_polynomial = rates.shape[0] # Check in which interval each individual is. The thresholds are not exclusive on # the right side. selected_bin = numpy.searchsorted(thresholds, x, side="right") - 1 # Calc last threshold for each individual threshold = thresholds[selected_bin] # Increment for each individual in the corresponding interval. increment_to_calc = x - threshold # If each individual has its own rates or the rates are scaled, we can't use the # intercept, which was generated in the parameter loading. if rates_multiplier is not None: # Initialize Series containing 0 for all individuals. out = intercepts_at_lower_thresholds[0] # Go through all intervals except the first and last. for i in range(2, num_intervals): threshold_incr = thresholds[i] - thresholds[i - 1] for pol in range(1, degree_polynomial + 1): # We only calculate the intercepts for individuals who are in this or # higher interval. Hence we have to use the individual rates. if selected_bin >= i: out += ( rates_multiplier * rates[pol - 1, i - 1] * threshold_incr**pol ) # If rates remain the same, everything is a lot easier. else: # We assign each individual the pre-calculated intercept. out = intercepts_at_lower_thresholds[selected_bin] # Intialize a multiplyer for 1 if it is not given. rates_multiplier = 1 if rates_multiplier is None else rates_multiplier if selected_bin > 0: # Now add the evaluation of the increment for pol in range(1, degree_polynomial + 1): out += ( rates[pol - 1][selected_bin] * rates_multiplier * (increment_to_calc**pol) ) return out
def get_piecewise_parameters(parameter_dict, parameter, func_type): """Create the objects for piecewise polynomial. Parameters ---------- parameter_dict parameter func_type Returns ------- """ # Get all interval keys. keys = sorted(key for key in parameter_dict if isinstance(key, int)) # Check if keys are consecutive numbers and starting at 0. if keys != list(range(len(keys))): raise ValueError( f"The keys of {parameter} do not start with 0 or are not consecutive" f" numbers." ) # Extract lower thresholds. lower_thresholds, upper_thresholds, thresholds = check_thresholds( parameter_dict, parameter, keys ) # Create and fill rates-array rates = check_rates(parameter_dict, parameter, keys, func_type) # Create and fill interecept-array intercepts = check_intercepts( parameter_dict, parameter, lower_thresholds, upper_thresholds, rates, keys ) piecewise_elements = { "thresholds": numpy.array(thresholds), "rates": rates, "intercepts_at_lower_thresholds": intercepts, } return piecewise_elements def check_thresholds(parameter_dict, parameter, keys): """Check and transfer raw threshold data. Transfer and check raw threshold data, which needs to be specified in a piecewise_polynomial layout in the yaml file. Parameters ---------- parameter_dict parameter keys Returns ------- """ lower_thresholds = numpy.zeros(len(keys)) upper_thresholds = numpy.zeros(len(keys)) # Check if lowest threshold exists. if "lower_threshold" not in parameter_dict[0]: raise ValueError( f"The first piece of {parameter} needs to contain a lower_threshold value." ) lower_thresholds[0] = parameter_dict[0]["lower_threshold"] # Check if highest upper_threshold exists. if "upper_threshold" not in parameter_dict[keys[-1]]: raise ValueError( f"The last piece of {parameter} needs to contain an upper_threshold value." ) upper_thresholds[keys[-1]] = parameter_dict[keys[-1]]["upper_threshold"] # Check if the function is defined on the complete real line if (upper_thresholds[keys[-1]] != numpy.inf) | (lower_thresholds[0] != -numpy.inf): raise ValueError(f"{parameter} needs to be defined on the entire real line.") for interval in keys[1:]: if "lower_threshold" in parameter_dict[interval]: lower_thresholds[interval] = parameter_dict[interval]["lower_threshold"] elif "upper_threshold" in parameter_dict[interval - 1]: lower_thresholds[interval] = parameter_dict[interval - 1]["upper_threshold"] else: raise ValueError( f"In {interval} of {parameter} is no lower upper threshold or an upper" f" in the piece before." ) for interval in keys[:-1]: if "upper_threshold" in parameter_dict[interval]: upper_thresholds[interval] = parameter_dict[interval]["upper_threshold"] elif "lower_threshold" in parameter_dict[interval + 1]: upper_thresholds[interval] = parameter_dict[interval + 1]["lower_threshold"] else: raise ValueError( f"In {interval} of {parameter} is no upper threshold or a lower" f" threshold in the piece after." ) if not numpy.allclose(lower_thresholds[1:], upper_thresholds[:-1]): raise ValueError( f"The lower and upper thresholds of {parameter} have to coincide" ) thresholds = sorted([lower_thresholds[0], *upper_thresholds]) return lower_thresholds, upper_thresholds, thresholds def check_rates(parameter_dict, parameter, keys, func_type): """Check and transfer raw rates data. Transfer and check raw rates data, which needs to be specified in a piecewise_polynomial layout in the yaml file. Parameters ---------- parameter_dict parameter keys func_type Returns ------- """ options_dict = { "quadratic": { "necessary_keys": ["rate_linear", "rate_quadratic"], "rates_size": 2, }, "cubic": { "necessary_keys": ["rate_linear", "rate_quadratic", "rate_cubic"], "rates_size": 3, }, } # Allow for specification of rate with "rate" and "rate_linear" if func_type == "linear": rates = numpy.zeros((1, len(keys))) for interval in keys: if "rate" in parameter_dict[interval]: rates[0, interval] = parameter_dict[interval]["rate"] elif "rate_linear" in parameter_dict[interval]: rates[0, interval] = parameter_dict[interval]["rate_linear"] else: raise ValueError( f"In {interval} of {parameter} there is no rate specified." ) elif func_type in options_dict: rates = numpy.zeros((options_dict[func_type]["rates_size"], len(keys))) for i, rate_type in enumerate(options_dict[func_type]["necessary_keys"]): for interval in keys: if rate_type in parameter_dict[interval]: rates[i, interval] = parameter_dict[interval][rate_type] else: raise ValueError( f"In {interval} of {parameter} {rate_type} is missing." ) else: raise ValueError(f"Piecewise function {func_type} not specified.") return rates def check_intercepts( parameter_dict, parameter, lower_thresholds, upper_thresholds, rates, keys ): """Check and transfer raw intercepte data. If necessary create intercepts. Transfer and check raw rates data, which needs to be specified in a piecewise_polynomial layout in the yaml file. Parameters ---------- parameter_dict parameter lower_thresholds upper_thresholds rates keys Returns ------- """ intercepts = numpy.zeros(len(keys)) count_intercepts_supplied = 1 if "intercept_at_lower_threshold" not in parameter_dict[0]: raise ValueError(f"The first piece of {parameter} needs an intercept.") else: intercepts[0] = parameter_dict[0]["intercept_at_lower_threshold"] # Check if all intercepts are supplied. for interval in keys[1:]: if "intercept_at_lower_threshold" in parameter_dict[interval]: count_intercepts_supplied += 1 intercepts[interval] = parameter_dict[interval][ "intercept_at_lower_threshold" ] if (count_intercepts_supplied > 1) & (count_intercepts_supplied != len(keys)): raise ValueError( "More than one, but not all intercepts are supplied. " "The dictionaries should contain either only the lowest intercept " "or all intercepts." ) else: intercepts = create_intercepts( lower_thresholds, upper_thresholds, rates, intercepts[0] ) return intercepts def create_intercepts( lower_thresholds, upper_thresholds, rates, intercept_at_lowest_threshold ): """Create intercepts from raw data. Parameters ---------- lower_thresholds : numpy.array The lower thresholds defining the intervals upper_thresholds : numpy.array The upper thresholds defining the intervals rates : numpy.array The slope in the interval below the corresponding element of *upper_thresholds*. intercept_at_lowest_threshold : numpy.array Intercept at the lowest threshold fun: function handle (currently only piecewise_linear, will need to think about whether we can have a generic function with a different interface or make it specific ) Returns ------- """ intercepts_at_lower_thresholds = numpy.full_like(upper_thresholds, numpy.nan) intercepts_at_lower_thresholds[0] = intercept_at_lowest_threshold for i, up_thr in enumerate(upper_thresholds[:-1]): intercepts_at_lower_thresholds[i + 1] = calculate_intercepts( x=up_thr, lower_thresholds=lower_thresholds, upper_thresholds=upper_thresholds, rates=rates, intercepts_at_lower_thresholds=intercepts_at_lower_thresholds, ) return intercepts_at_lower_thresholds def calculate_intercepts( x, lower_thresholds, upper_thresholds, rates, intercepts_at_lower_thresholds ): """Calculate the intercepts from the raw data. Parameters ---------- x : float The value that the function is applied to. lower_thresholds : numpy.ndarray A one-dimensional array containing lower thresholds of each interval. upper_thresholds : numpy.ndarray A one-dimensional array containing upper thresholds each interval. rates : numpy.ndarray A two-dimensional array where columns are interval sections and rows correspond to the nth polynomial. intercepts_at_lower_thresholds : numpy.ndarray The intercepts at the lower threshold of each interval. Returns ------- out : float The value of `x` under the piecewise function. """ # Check if value lies within the defined range. if (x < lower_thresholds[0]) or (x > upper_thresholds[-1]) or numpy.isnan(x): return numpy.nan index_interval = numpy.searchsorted(upper_thresholds, x, side="left") intercept_interval = intercepts_at_lower_thresholds[index_interval] # Select threshold and calculate corresponding increment into interval lower_threshold_interval = lower_thresholds[index_interval] if lower_threshold_interval == -numpy.inf: return intercept_interval increment_to_calc = x - lower_threshold_interval out = intercept_interval for pol in range(1, rates.shape[0] + 1): out += rates[pol - 1, index_interval] * (increment_to_calc**pol) return out