Algorithme Forward-Backward - Exemple 3 - Itérations

par Joseph Razik, le 2019-10-18
Forward-Backward_exemple_3_iterations

Exemple donné dans le cours de traitement de la parole D32

HMM à densité continue, itération du système

In [1]:
%pylab inline
Populating the interactive namespace from numpy and matplotlib

Description du système:

Nombre d'observations: 21 possibles (entre 0 et 20, valeurs des observations possibles)

Nombre d'états: 3

Séquence d'bservations:

  • 11, 9, 10, 18, 19, 18, 14, 15, 14
In [2]:
# une suite d'observations
obs_1 = [11, 9, 10, 18, 19, 18, 14, 15, 14]

La différence entre les HMM à observation discrète et les HMM continus, est le fait que l'ensemble des observations n'est pas forcément limité à un ensemble de valeurs fixe et connu. On rencontre habituellement une modélisation par Gaussienne dans le cas continu. Ainsi, la probabilité d'une observersation est dorénavant donnée par sa loi de densité de probabilité:

$$N(\mu, \sigma^2) = \frac{1}{\sqrt{2\pi|\sigma|}} \exp^{-\frac{1}{2}\frac{(x-\mu)^2}{\sigma^2}} $$
In [3]:
# probabilités initiales des observations selons les états
def b_i_o(i, t):
    if mu_i is None:
        return 1.0
    else:
        return (1/sqrt(2*pi*abs(sigma2_i[i]))) * exp(-0.5*(obs[t]-mu_i[i])**2/sigma2_i[i])

Soit $\lambda$ le système initial défini par les probabilités d'observations $b_j(o_t)$, les probabilités de transition $a_{i,j}$ et les probabilités initiales $\pi_i$, $\alpha_t(i) = P(o_1o_2\cdots o_t, q_t=i|\lambda)$ et peut être calculé par récurrence tel que: $$\alpha_1(i) = \pi_i b_i(o_1)$$ $$\alpha_t(j) = \sum_{i=1}^N \alpha_{t-1}(i) a_{i,j} b_j(o_t) \, pour \, tout \, 1 < t \leq T$$

In [4]:
def calc_alpha(pi_i, a_ij, obs, N):
    """ Fonction qui calcul les alphas, sens forward, par récurence.
    (probabilité de la suite d'observations partielle se terminant à l'instant t et à l'état i)
    """
    # obs[t]-1 pour le décalage par rapport à la matrice
    T = len(obs)
    alpha = empty((T, N))
    # initialisation de la récurrence
    alpha[0, :] = [pi_i[i] * b_i_o(i, 0) for i in range(N)]
    # on avance petit à petit l'évaluation
    for t in range(1, T):
        alpha[t, :] = [b_i_o(i, t) * sum([a_ij[j, i] * alpha[t-1, j] for j in range(N)]) for i in range(N)]
    return alpha

Soit $\lambda$ le système initial défini par les probabilités d'observations $b_j(o_t)$, les probabilités de transition $a_{i,j}$ et les probabilités initiales $\pi_i$, $\beta_t(i) = P(o_{t+1}o_{t+2}\cdots o_T, q_t=i|\lambda)$ et peut être calculé par récurrence tel que: $$ \beta_T(i) = 1$$ $$ \beta_t(i) = \sum_{j=1}^N \beta_{t+1}(j) a_{i,j} b_j(o_{t+1}) \, pour \, tout \, 0 \leq t < T$$

In [5]:
def calc_beta(a_ij, obs, N):
    """ Fonction qui calcul les betas, sens backward, par récurence.
    (probabilité d'observer la suite partielle ot+1ot+2...o_T, de t+1 à T est dans l'état i à l'instant t)
    """
    # obs[t]-1 pour le décalage par rapport à la matrice
    T = len(obs)
    beta = empty((T, N))
    beta[T-1, :] = 1.0  # valeur d'nitialisation arbitraire mais égale pour tous les états (cf rabiner89)
    for t in range(T - 2, -1, -1):
        beta[t, :] = [sum([beta[t+1, j] * a_ij[i, j] * b_i_o(j, t+1) for j in range(N)]) for i in range(N)]
    return beta

Pour verifier que tout s'est bien passé on peut utiliser le fait que

$$P(O/\lambda) = \sum_{i=1}^N \alpha_T(i) = \sum_{i=1}^N \beta_1(i) = \sum_{i=1}^N \alpha_t(i) \beta_t(i) \,\, \forall t$$

Une fois les coefficients $\alpha_t(i)$ et $\beta_t(i)$ calculés, il est possible d'estimer les coefficients $\gamma_t(i)$, la probabilité d'être dans l'état $i$ à l'instant $t$ sachant les observations et le modèle $\lambda$.

$$\gamma_t(i) = \frac{\alpha_t(i) \beta_t(i)}{P(O/\lambda)}$$
In [6]:
def calc_gamma(alpha, beta, p_o_lambda, T, N):
    """ Fonction qui calcule les valeurs gamma en fonction des alphas et betas. """
    gamma = empty((T, N))
    for t in range(T):
        gamma[t, :] = (alpha[t, :]*beta[t, :]) / p_o_lambda[t]  
    return gamma
In [7]:
def calc_xi(alpha, beta, p_o_lambda, a_ij, obs, N):
    """ Fonction qui calcule les valeurs zeta à partir des alphas et des betas."""
    T = len(obs)
    xi = empty((T-1, N, N))
    for t in range(T-1):
        for i in range(N):
            for j in range(N):
                xi[t, i, j] = alpha[t, i] * a_ij[i, j] * beta[t+1, j] * b_i_o(j, t+1) / p_o_lambda[t]
            # -1 pour faire correspondre avec la matrice d'observation, les valeurs commençant à 1 et non à 0 (nb glaces)
    return xi
        

Les nouvelles probabilités d'observations dépendent des paramètres définissant le modèle gaussien: les moyennes et l'écart-type qui peuvent être calculées par la formule suivante:

$$\mu_i = \frac{\sum_{t=1}^T \gamma_t(i) o_t}{\sum_{t=1}^T \gamma_t(i)}$$$$\sigma^2_i = \frac{\sum_{t=1}^T \gamma_t(i) (o_t - \mu_i) (o_t - \mu_i)'}{\sum_{t=1}^T \gamma_t(i)} $$

Bien remarquer maintenant dans l'équation de ré-estimation de $\mu_i$ la multiplication par l'apparition de la valeur elle-même. Dans le cas discret, il y avait uniquement observation ou non observation (0 ou 1).

Dans l'équation de $\sigma^2$, le caractère "'" signifie "transposé" (dans le cas multidimensionnel).

In [8]:
def iteration(nb_iter, obs):
    global mu_i, sigma_i
    a_ij = array(
    [[0.5, 0.5, 0.0],
     [0.0, 0.5, 0.5],
     [0.0, 0.0, 1.0],
     ])
    pi_i = [1.0, 0.0, 0.0]
    mu_i = None  # [0.0, 0.0, 0.0]
    sigma2_i = None  # [1.0, 1.0, 1.0]
    N = 3
    T = len(obs)
    obs = array(obs)
    for iterate in range(nb_iter):
        alpha = calc_alpha(pi_i, a_ij, obs, N)
        beta = calc_beta(a_ij, obs, N)
        p_o_lambda = sum(alpha*beta, axis=1)
        gamma = calc_gamma(alpha, beta, p_o_lambda, T, N)
        xi = calc_xi(alpha, beta, p_o_lambda, a_ij, obs, N)
        # nouvelles estimations
        new_pi = gamma[0, :]
        new_a_ij = array([[sum(xi[:, i, j])/sum(gamma[:T-1, i]) for j in range(N)] for i in range(N)])
        new_mu_i = [ sum(gamma[:, i]*obs[:]) / sum(gamma[:, i]) for i in range(N)]
        new_sigma2_i = [ sum(gamma[:, i] * (obs[:] - new_mu_i[i])**2) / sum(gamma[:, i]) for i in range(N)]
    print('new_pi: ', new_pi)
    print('new_a_ij', new_a_ij)
    print('new_b_i_o', new_mu_i, new_sigma2_i)
In [9]:
iteration(1, obs_1)
new_pi:  [ 1.  0.  0.]
new_a_ij [[ 0.5  0.5  0. ]
 [ 0.   0.5  0.5]
 [ 0.   0.   1. ]]
new_b_i_o [11.217221135029353, 13.454183266932271, 15.710302091402014] [7.0702318082421556, 17.706068157648289, 5.6263772131559664]
In [10]:
iteration(5, obs_1)
new_pi:  [ 1.  0.  0.]
new_a_ij [[ 0.5  0.5  0. ]
 [ 0.   0.5  0.5]
 [ 0.   0.   1. ]]
new_b_i_o [11.217221135029353, 13.454183266932271, 15.710302091402014] [7.0702318082421556, 17.706068157648289, 5.6263772131559664]

Avec deux autres séquences d'observations

  • 10, 11, 19, 17, 19, 19, 13, 14
  • 9, 17, 18, 17, 17, 13, 15
In [11]:
obs_2 = [10, 11, 19, 17, 19, 19, 13, 14]
obs_3 = [9, 17, 18, 17, 17, 13, 15]
In [12]:
iteration(1, obs_2)
new_pi:  [ 1.  0.  0.]
new_a_ij [[ 0.5  0.5  0. ]
 [ 0.   0.5  0.5]
 [ 0.   0.   1. ]]
new_b_i_o [12.282352941176471, 16.105263157894736, 16.295019157088124] [12.226159169550172, 11.438312380140633, 6.7826954977172971]
In [13]:
iteration(1, obs_3)
new_pi:  [ 1.  0.  0.]
new_a_ij [[ 0.5  0.5  0. ]
 [ 0.   0.5  0.5]
 [ 0.   0.   1. ]]
new_b_i_o [13.015748031496063, 16.833333333333332, 15.477611940298507] [16.787153574307144, 1.7722222222222219, 3.0355684265240956]