"""Implements some useful functions
:author: Michel Bierlaire
:date: Sun Apr 14 10:46:10 2019
"""
# Too constraining
# pylint: disable=invalid-name, too-many-locals
import itertools
from collections import namedtuple
import numpy as np
import pandas as pd
from scipy.stats import chi2
from scipy.integrate import dblquad
import biogeme.messaging as msg
import biogeme.exceptions as excep
from biogeme.expressions import Expression
logger = msg.bioMessage()
LRTuple = namedtuple('LRTuple', 'message statistic threshold')
[docs]def findiff_g(theFunction, x):
    """Calculates the gradient of a function :math:`f` using finite differences
    :param theFunction: A function object that takes a vector as an
                        argument, and returns a tuple. The first
                        element of the tuple is the value of the
                        function :math:`f`. The other elements are not
                        used.
    :type theFunction: function
    :param x: argument of the function
    :type x: numpy.array
    :return: numpy vector, same dimension as x, containing the gradient
       calculated by finite differences.
    :rtype: numpy.array
    """
    x = x.astype(float)
    tau = 0.0000001
    n = len(x)
    g = np.zeros(n)
    f = theFunction(x)[0]
    for i in range(n):
        xi = x.item(i)
        xp = x.copy()
        if abs(xi) >= 1:
            s = tau * xi
        elif xi >= 0:
            s = tau
        else:
            s = -tau
        xp[i] = xi + s
        fp = theFunction(xp)[0]
        g[i] = (fp - f) / s
    return g 
[docs]def findiff_H(theFunction, x):
    """Calculates the hessian of a function :math:`f` using finite differences
    :param theFunction: A function object that takes a vector as an
                        argument, and returns a tuple. The first
                        element of the tuple is the value of the
                        function :math:`f`, and the second is the
                        gradient of the function.  The other elements
                        are not used.
    :type theFunction: function
    :param x: argument of the function
    :type x: numpy.array
    :return: numpy matrix containing the hessian calculated by
             finite differences.
    :rtype: numpy.array
    """
    tau = 1.0e-7
    n = len(x)
    H = np.zeros((n, n))
    g = theFunction(x)[1]
    eye = np.eye(n, n)
    for i in range(n):
        xi = x.item(i)
        if abs(xi) >= 1:
            s = tau * xi
        elif xi >= 0:
            s = tau
        else:
            s = -tau
        ei = eye[i]
        gp = theFunction(x + s * ei)[1]
        H[:, i] = (gp - g).flatten() / s
    return H 
[docs]def checkDerivatives(theFunction, x, names=None, logg=False):
    """Verifies the analytical derivatives of a function by comparing
    them with finite difference approximations.
    :param theFunction: A function object that takes a vector as an argument,
        and returns a tuple:
        - The first element of the tuple is the value of the
          function :math:`f`,
        - the second is the gradient of the function,
        - the third is the hessian.
    :type theFunction: function
    :param x: arguments of the function
    :type x: numpy.array
    :param names: the names of the entries of x (for reporting).
    :type names: list(string)
    :param logg: if True, messages will be displayed.
    :type logg: bool
    :return: tuple f, g, h, gdiff, hdiff where
          - f is the value of the function at x,
          - g is the analytical gradient,
          - h is the analytical hessian,
          - gdiff is the difference between the analytical gradient
            and the finite difference approximation
          - hdiff is the difference between the analytical hessian
            and the finite difference approximation
    :rtype: float, numpy.array,numpy.array,  numpy.array,numpy.array
    """
    x = np.array(x, dtype=float)
    f, g, h = theFunction(x)
    g_num = findiff_g(theFunction, x)
    gdiff = g - g_num
    if logg:
        if names is None:
            names = [f'x[{i}]' for i in range(len(x))]
        logger.detailed('x\t\tGradient\tFinDiff\t\tDifference')
        for k, v in enumerate(gdiff):
            logger.detailed(f'{names[k]:15}\t{g[k]:+E}\t{g_num[k]:+E}\t{v:+E}')
    h_num = findiff_H(theFunction, x)
    hdiff = h - h_num
    if logg:
        logger.detailed('Row\t\tCol\t\tHessian\tFinDiff\t\tDifference')
        for row in range(len(hdiff)):
            for col in range(len(hdiff)):
                logger.detailed(
                    f'{names[row]:15}\t{names[col]:15}\t{h[row,col]:+E}\t'
                    f'{h_num[row,col]:+E}\t{hdiff[row,col]:+E}'
                )
    return f, g, h, gdiff, hdiff 
[docs]def countNumberOfGroups(df, column):
    """
    This function counts the number of groups of same value in a column.
    For instance: 1,2,2,3,3,3,4,1,1  would give 5.
    Example::
        >>>df = pd.DataFrame({'ID': [1, 1, 2, 3, 3, 1, 2, 3],
                              'value':[1000,
                                       2000,
                                       3000,
                                       4000,
                                       5000,
                                       5000,
                                       10000,
                                       20000]})
        >>>tools.countNumberOfGroups(df,'ID')
        6
        >>>tools.countNumberOfGroups(df,'value')
        7
    """
    df['_biogroups'] = (df[column] != df[column].shift(1)).cumsum()
    result = len(df['_biogroups'].unique())
    df.drop(columns=['_biogroups'], inplace=True)
    return result 
[docs]def likelihood_ratio_test(model1, model2, significance_level=0.05):
    """This function performs a likelihood ratio test between a
    restricted and an unrestricted model.
    :param model1: the final loglikelihood of one model, and the number of
                   estimated parameters.
    :type model1: tuple(float, int)
    :param model2: the final loglikelihood of the other model, and
                   the number of estimated parameters.
    :type model2: tuple(float, int)
    :param significance_level: level of significance of the test. Default: 0.05
    :type significance_level: float
    :return: a tuple containing:
                  - a message with the outcome of the test
                  - the statistic, that is minus two times the difference
                    between the loglikelihood  of the two models
                  - the threshold of the chi square distribution.
    :rtype: LRTuple(str, float, float)
    :raise biogemeError: if the unrestricted model has a lower log
        likelihood than the restricted model.
    """
    loglike_m1, df_m1 = model1
    loglike_m2, df_m2 = model2
    if loglike_m1 > loglike_m2:
        if df_m1 < df_m2:
            raise excep.biogemeError(
                f'The unrestricted model {model2} has a '
                f'lower log likelihood than the restricted one {model1}'
            )
        loglike_ur = loglike_m1
        loglike_r = loglike_m2
        df_ur = df_m1
        df_r = df_m2
    else:
        if df_m1 >= df_m2:
            raise excep.biogemeError(
                f'The unrestricted model {model1} has a '
                f'lower log likelihood than the restricted one {model2}'
            )
        loglike_ur = loglike_m2
        loglike_r = loglike_m1
        df_ur = df_m2
        df_r = df_m1
    stat = -2 * (loglike_r - loglike_ur)
    chi_df = df_ur - df_r
    threshold = chi2.ppf(1 - significance_level, chi_df)
    if stat <= threshold:
        final_msg = (
            f'H0 cannot be rejected at level {100*significance_level:.1f}%'
        )
    else:
        final_msg = (
            f'H0 can be rejected at level {100*significance_level:.1f}%'
        )
    return LRTuple(message=final_msg, statistic=stat, threshold=threshold) 
[docs]def flatten_database(df, merge_id, row_name=None, identical_columns=None):
    """Combine several rows of a Pandas database into one.
    For instance, consider the following database::
           ID  Age  Cost   Name
        0   1   23    34  Item3
        1   1   23    45  Item4
        2   1   23    12  Item7
        3   2   45    65  Item3
        4   2   45    34  Item7
    If row_name is 'Name', the function generates the same data in the
    following format::
            Age  Item3_Cost  Item4_Cost  Item7_Cost
        ID
        1    23          34        45.0          12
        2    45          65         NaN          34
    If row_name is None, the function generates the same data in the
    following format::
            Age  1_Cost 1_Name  2_Cost 2_Name  3_Cost 3_Name
        ID
        1    23      34  Item3      45  Item4    12.0  Item7
        2    45      65  Item3      34  Item7     NaN    NaN
    :param df: initial data frame
    :type df: pandas.DataFrame
    :param merge_id: name of the column that identifies rows that
        should be merged. In the above example: 'ID'
    :type merge_id: str
    :param row_name: name of the columns that provides the name of the
        rows in the new dataframe. In the example above: 'Name'. If
        None, the rows are numbered sequentially.
    :type row_name: str
    :param identical_columns: name of the columns that contain
        identical values across the rows of a group. In the example
        above: ['Age']. If None, these columns are automatically
        detected. On large database, there may be a performance issue.
    :type identical_columns: list(str)
    :return: reformatted database
    :rtype: pandas.DataFrame
    """
    grouped = df.groupby(by=merge_id)
    all_columns = set(df.columns)
    def are_values_identical(col):
        """This function checks if all the values in a column
        are identical
        :param col: the column
        :type col: pandas.Series
        :return: True if all values are identical. False otherwise.
        :rtype: bool
        """
        return (col.iloc[0] == col).all(0)
    def get_varying_cols(g):
        """This functions returns the name of all columns
        that have constant values within each group of data.
        :param g: group of data
        :type g: pandas.DataFrame
        :return: name of all columns that have constant values
            within each group of data.
        :rtype: set(str)
        """
        return {
            colname
            for colname, col in g.iteritems()
            if not are_values_identical(col)
        }
    if identical_columns is None:
        all_varying_cols = grouped.apply(get_varying_cols)
        varying_columns = set.union(*all_varying_cols)
        identical_columns = list(all_columns - varying_columns)
        varying_columns = list(varying_columns)
    else:
        identical_columns = set(identical_columns)
        identical_columns.add(merge_id)
        varying_columns = list(all_columns - identical_columns)
    # Take the first row for columns that are identical
    if identical_columns:
        common_data = df[list(identical_columns)].drop_duplicates(
            merge_id, keep='first'
        )
        common_data.index = common_data[merge_id]
    # Treat the other columns
    grouped_varying = df[[merge_id] + list(varying_columns)].groupby(
        by=merge_id
    )
    def treat(x):
        """Treat a group of data.
        :param x: group of data
        :type x: pandas.DataFrame
        :return: the same data organized in one row, with proper column names
        :rtype: pandas.DataFrame
        :raise biogemeError:  if there are duplicates in the name of
        the row. Indeed, in that case, they cannot be used to name the
        new columns.
        """
        if not are_values_identical(x[merge_id]):
            err_msg = (
                f'Group has different IDs: {x[merge_id]}. '
                f'Rows id: {x.index}'
            )
            raise excep.biogemeError(err_msg)
        if row_name is not None and not x[row_name].is_unique:
            err_msg = (
                f'Entries in column [{row_name}] are not unique. '
                f'This column cannot be used to name the new '
                f'columns:\n{x[[row_name, merge_id]]}. '
            )
            raise excep.biogemeError(err_msg)
        the_columns = set(x.columns) - {merge_id}
        if row_name is not None:
            the_columns -= {row_name}
        sorted_list = sorted(list(the_columns))
        first = True
        i = 0
        for _, row in x.iterrows():
            i += 1
            if first:
                all_values = [row[merge_id]]
                all_columns = [merge_id]
                first = False
            name = f'{i}' if row_name is None else row[row_name]
            columns = [f'{name}_{c}' for c in sorted_list]
            all_values.extend([row[c] for c in sorted_list])
            all_columns.extend(columns)
        df = pd.DataFrame([all_values], columns=all_columns)
        return df
    flat_data = grouped_varying.apply(treat)
    flat_data.index = flat_data[merge_id]
    # We remove the column 'merge_id' as it is stored as index.
    if identical_columns:
        return pd.concat([common_data, flat_data], axis='columns').drop(
            columns=[merge_id]
        )
    return flat_data.drop(columns=[merge_id]) 
[docs]def covariance_cross_nested(i, j, nests):
    """Calculate the covariance between the error terms of two
    alternatives of a cross-nested logit model. It is assumed that
    the homogeneity parameter mu of the model has been normalized
    to one.
    :param i: first alternative
    :type i: int
    :param j: first alternative
    :type j: int
    :param nests: a tuple containing as many items as nests.
        Each item is also a tuple containing two items:
        - an object of type biogeme.expressions. expr.Expression
          representing the nest parameter,
        - a dictionary mapping the alternative ids with the cross-nested
          parameters for the corresponding nest. If an alternative is
          missing in the dictionary, the corresponding alpha is set to zero.
        Example::
            alphaA = {1: alpha1a,
                      2: alpha2a,
                      3: alpha3a,
                      4: alpha4a,
                      5: alpha5a,
                      6: alpha6a}
            alphaB = {1: alpha1b,
                      2: alpha2b,
                      3: alpha3b,
                      4: alpha4b,
                      5: alpha5b,
                      6: alpha6b}
            nesta = MUA, alphaA
            nestb = MUB, alphaB
            nests = nesta, nestb
    :type nests: tuple
    :return: value of the correlation
    :rtype: float
    :raise biogemeError: if the requested number is non positive or a float
    """
    set_of_alternatives = {alt for m in nests for alt in m[1]}
    if i not in set_of_alternatives:
        raise excep.biogemeError(f'Unknown alternative: {i}')
    if j not in set_of_alternatives:
        raise excep.biogemeError(f'Unknown alternative: {j}')
    if i == j:
        return np.pi * np.pi / 6.0
    def integrand(z_i, z_j):
        """Function to be integrated to calculate the correlation between
        alternative i and alternative j.
        :param z_i: argument corresponding to alternative i
        :type z_i: float
        :param z_j: argument corresponding to alternative j
        :type z_j: float
        """
        y_i = -np.log(z_i)
        y_j = -np.log(z_j)
        xi_i = -np.log(y_i)
        xi_j = -np.log(y_j)
        dy_i = -1 / z_i
        dy_j = -1 / z_j
        dxi_i = -dy_i / y_i
        dxi_j = -dy_j / y_j
        G_sum = 0.0
        Gi_sum = 0.0
        Gj_sum = 0.0
        Gij_sum = 0.0
        for m in nests:
            mu_m = m[0]
            alphas = m[1]
            alpha_i = alphas.get(i, 0)
            if alpha_i != 0:
                term_i = (alpha_i * y_i) ** mu_m
            else:
                term_i = 0
            alpha_j = alphas.get(j, 0)
            if alpha_j != 0:
                term_j = (alpha_j * y_j) ** mu_m
            else:
                term_j = 0
            the_sum = term_i + term_j
            p1 = (1.0 / mu_m) - 1
            p2 = (1.0 / mu_m) - 2
            G_sum += the_sum ** (1.0 / mu_m)
            if alpha_i != 0:
                Gi_sum += alpha_i**mu_m * y_i ** (mu_m - 1) * the_sum**p1
            if alpha_j != 0:
                Gj_sum += alpha_j**mu_m * y_j ** (mu_m - 1) * the_sum**p1
            if mu_m != 1.0 and alpha_i != 0 and alpha_j != 0:
                Gij_sum += (
                    (1 - mu_m)
                    * the_sum**p2
                    * (alpha_i * alpha_j) ** mu_m
                    * (y_i * y_j) ** (mu_m - 1)
                )
        F = np.exp(-G_sum)
        F_second = F * y_i * y_j * (Gi_sum * Gj_sum - Gij_sum)
        return xi_i * xi_j * F_second * dxi_i * dxi_j
    integral, _ = dblquad(integrand, 0, 1, lambda x: 0, lambda x: 1)
    return integral - np.euler_gamma * np.euler_gamma 
[docs]def correlation_nested(nests):
    """Calculate the correlation matrix of the error terms of all
    alternatives of a nested logit model. It is assumed that the
    homogeneity parameter mu of the model has been normalized to one.
    :param nests: A tuple containing as many items as nests.
        Each item is also a tuple containing two items:
        - an object of type biogeme.expressions.expr.Expression representing
          the nest parameter,
        - a list containing the list of identifiers of the alternatives
          belonging to the nest.
        Example::
            nesta = MUA ,[1, 2, 3]
            nestb = MUB ,[4, 5, 6]
            nests = nesta, nestb
    :type nests: tuple
    :return: correlation matrix
    :rtype: pd.DataFrame
    """
    set_of_alternatives = {alt for m in nests for alt in m[1]}
    list_of_alternatives = sorted(set_of_alternatives)
    index = {alt: i for i, alt in enumerate(list_of_alternatives)}
    J = len(list_of_alternatives)
    correlation = np.identity(J)
    for m in nests:
        mu_m = m[0]
        alt_m = m[1]
        for i, j in itertools.combinations(alt_m, 2):
            correlation[index[i]][index[j]] = correlation[index[j]][
                index[i]
            ] = 1.0 - 1.0 / (mu_m * mu_m)
    return pd.DataFrame(
        correlation, index=list_of_alternatives, columns=list_of_alternatives
    ) 
[docs]def correlation_cross_nested(nests):
    """Calculate the correlation matrix of the error terms of all
    alternatives of a cross-nested logit model. It is assumed that
    the homogeneity parameter mu of the model has been normalized
    to one.
    :param nests: a tuple containing as many items as nests.
        Each item is also a tuple containing two items:
        - an object of type biogeme.expressions. expr.Expression
          representing the nest parameter,
        - a dictionary mapping the alternative ids with the cross-nested
          parameters for the corresponding nest. If an alternative is
          missing in the dictionary, the corresponding alpha is set to zero.
        Example::
            alphaA = {1: alpha1a,
                      2: alpha2a,
                      3: alpha3a,
                      4: alpha4a,
                      5: alpha5a,
                      6: alpha6a}
            alphaB = {1: alpha1b,
                      2: alpha2b,
                      3: alpha3b,
                      4: alpha4b,
                      5: alpha5b,
                      6: alpha6b}
            nesta = MUA, alphaA
            nestb = MUB, alphaB
            nests = nesta, nestb
    :type nests: tuple
    :return: value of the correlation
    :rtype: float
    :raise biogemeError: if the requested number is non positive or a float
    :return: correlation matrix
    :rtype: pd.DataFrame
    """
    set_of_alternatives = {alt for m in nests for alt in m[1]}
    list_of_alternatives = sorted(set_of_alternatives)
    J = len(list_of_alternatives)
    covar = np.empty((J, J))
    for i, alt_i in enumerate(list_of_alternatives):
        for j, alt_j in enumerate(list_of_alternatives):
            covar[i][j] = covariance_cross_nested(alt_i, alt_j, nests)
            if i != j:
                covar[j][i] = covar[i][j]
    v = np.sqrt(np.diag(covar))
    outer_v = np.outer(v, v)
    correlation = covar / outer_v
    correlation[covar == 0] = 0
    return pd.DataFrame(
        correlation, index=list_of_alternatives, columns=list_of_alternatives
    ) 
[docs]def calculate_correlation(nests, results, alternative_names=None):
    """Calculate the correlation matrix of a nested or cross-nested
    logit model.
    :param nests:  A tuple containing as many items as nests.
        Each item is also a tuple containing two items:
        - an object of type biogeme.expressions. expr.Expression
          representing the nest parameter,
        - for the nested logit model, a list containing the list of
          identifiers of the alternatives belonging to the nest.
        - for the cross-nested logit model, a dictionary mapping the
          alternative ids with the cross-nested parameters for the
          corresponding nest. If an alternative is missing in the
          dictionary, the corresponding alpha is set to zero.
        Example for the nested logit::
            nesta = MUA ,[1, 2, 3]
            nestb = MUB ,[4, 5, 6]
            nests = nesta, nestb
        Example for the cross-nested logit::
            alphaA = {1: alpha1a,
                      2: alpha2a,
                      3: alpha3a,
                      4: alpha4a,
                      5: alpha5a,
                      6: alpha6a}
            alphaB = {1: alpha1b,
                      2: alpha2b,
                      3: alpha3b,
                      4: alpha4b,
                      5: alpha5b,
                      6: alpha6b}
            nesta = MUA, alphaA
            nestb = MUB, alphaB
            nests = nesta, nestb
    :type nests: tuple(tuple(biogeme.expressions.Expression, list(int))), or
                 tuple(tuple(biogeme.Expression, dict(int:biogeme.expressions.Expression)))
    :param results: estimation results
    :type results: biogeme.results.bioResults
    :param alternative_names: a dictionary mapping the alternative IDs
        with their name. If None, the IDs are used as names.
    :type alternative_names: dict(int: str)
    """
    betas = results.getBetaValues()
    cnl = isinstance(nests[0][1], dict)
    def get_estimated_expression(expr):
        """Returns the estimated value of the nest parameter.
        :param expr: expression to calculate
        :type expr: biogeme.expressions.Expression or float.
        :return: calculated value
        :rtype: float
        :raise biogemeError: if the input value is not an expression
            or a float.
        """
        if isinstance(expr, Expression):
            expr.changeInitValues(betas)
            return expr.getValue_c(prepareIds=True)
        if isinstance(expr, (int, float)):
            return expr
        raise excep.biogemeError(f'Invalid type: {type(expr)}')
    def numerical_tuple(the_tuple):
        mu_m = get_estimated_expression(the_tuple[0])
        alpha_m = the_tuple[1]
        if isinstance(alpha_m, dict):
            # Cross-nested logit
            estimated_alpha_m = {
                alt
                if alternative_names is None
                else alternative_names[alt]: get_estimated_expression(e)
                for alt, e in alpha_m.items()
            }
            return mu_m, estimated_alpha_m
        return mu_m, [
            alt if alternative_names is None else alternative_names[alt]
            for alt in alpha_m
        ]
    estimated_nests = tuple(numerical_tuple(m) for m in nests)
    if cnl:
        return correlation_cross_nested(estimated_nests)
    return correlation_nested(estimated_nests)