Algorithme Forward-Backward - Exemple 3 - Séquences d'observations

par Joseph Razik, le 2019-10-18
Forward-Backward_exemple_3_iterations_observations

Exemple donné dans le cours de traitement de la parole D32

HMM à densité continue, plusieurs séquences d'observations

In [1]:
%pylab inline
Populating the interactive namespace from numpy and matplotlib

Description du système:

Nombre d'observations: 21 possibles (entre 0 et 20, valeurs des observations possibles)

Nombre d'états: 3

Séquences d'bservations:

  • 11, 9, 10, 18, 19, 18, 14, 15, 14
  • 10, 11, 19, 17, 19, 19, 13, 14
  • 9, 17, 18, 17, 17, 13, 15

Nous avons donc 3 séquences d'observations différentes.

In [2]:
# une suite d'observations
obs_1 = [11, 9, 10, 18, 19, 18, 14, 15, 14]
obs_2 = [10, 11, 19, 17, 19, 19, 13, 14]
obs_3 = [9, 17, 18, 17, 17, 13, 15]

La différence entre les HMM à observation discrète et les HMM continus, est le fait que l'ensemble des observations n'est pas forcément limité à un ensemble de valeurs fixe et connu. On rencontre habituellement une modélisation par Gaussienne dans le cas continu. Ainsi, la probabilité d'une observersation est dorénavant donnée par sa loi de densité de probabilité:

$$N(\mu, \sigma^2) = \frac{1}{\sqrt{2\pi|\sigma|}} \exp^{-\frac{1}{2}\frac{(x-\mu)^2}{\sigma^2}} $$
In [3]:
# probabilités initiales des observations selons les états
def b_i_o(mu_i, sigma2_i, i, t, obs):
    if mu_i is None:
        return 1.0
    else:
        return (1/sqrt(2*pi*abs(sigma2_i[i]))) * exp(-0.5 * (obs[t] - mu_i[i])**2 / sigma2_i[i])

Soit $\lambda$ le système initial défini par les probabilités d'observations $b_j(o_t)$, les probabilités de transition $a_{i,j}$ et les probabilités initiales $\pi_i$, $\alpha_t(i) = P(o_1o_2\cdots o_t, q_t=i|\lambda)$ et peut être calculé par récurrence tel que: $$\alpha_1(i) = \pi_i b_i(o_1)$$ $$\alpha_t(j) = \sum_{i=1}^N \alpha_{t-1}(i) a_{i,j} b_j(o_t) \, pour \, tout \, 1 < t \leq T$$

In [4]:
def calc_alpha(pi_i, a_ij, mu_i, sigma_i, obs, N):
    """ Fonction qui calcul les alphas, sens forward, par récurence.
    (probabilité de la suite d'observations partielle se terminant à l'instant t et à l'état i)
    """
    # obs[t]-1 pour le décalage par rapport à la matrice
    T = len(obs)
    alpha = empty((T, N))
    # initialisation de la récurrence
    alpha[0, :] = [pi_i[i] * b_i_o(mu_i, sigma_i, i, 0, obs) for i in range(N)]
    # on avance petit à petit l'évaluation
    for t in range(1, T):
        alpha[t, :] = [b_i_o(mu_i, sigma_i, i, t, obs) * 
                       sum([a_ij[j, i] * alpha[t-1, j] for j in range(N)])
                       for i in range(N)]
    return alpha

Soit $\lambda$ le système initial défini par les probabilités d'observations $b_j(o_t)$, les probabilités de transition $a_{i,j}$ et les probabilités initiales $\pi_i$, $\beta_t(i) = P(o_{t+1}o_{t+2}\cdots o_T, q_t=i|\lambda)$ et peut être calculé par récurrence tel que: $$ \beta_T(i) = 1$$ $$ \beta_t(i) = \sum_{j=1}^N \beta_{t+1}(j) a_{i,j} b_j(o_{t+1}) \, pour \, tout \, 0 \leq t < T$$

In [5]:
def calc_beta(a_ij, mu_i, sigma_i, obs, N):
    """ Fonction qui calcul les betas, sens backward, par récurence.
    (probabilité d'observer la suite partielle ot+1ot+2...o_T, de t+1 à T est dans l'état i à l'instant t)
    """
    # obs[t]-1 pour le décalage par rapport à la matrice
    T = len(obs)
    beta = empty((T, N))
    beta[T-1, :] = 1.0  # valeur d'nitialisation arbitraire mais égale pour tous les états (cf rabiner89)
    for t in range(T - 2, -1, -1):
        beta[t, :] = [sum([beta[t+1, j] * a_ij[i, j] * b_i_o(mu_i, sigma_i, j, t+1, obs) for j in range(N)])
                      for i in range(N)]
    return beta

Pour verifier que tout s'est bien passé on peut utiliser le fait que

$$P(O/\lambda) = \sum_{i=1}^N \alpha_T(i) = \sum_{i=1}^N \beta_1(i) = \sum_{i=1}^N \alpha_t(i) \beta_t(i) \,\, \forall t$$

Une fois les coefficients $\alpha_t(i)$ et $\beta_t(i)$ calculés, il est possible d'estimer les coefficients $\gamma_t(i)$, la probabilité d'être dans l'état $i$ à l'instant $t$ sachant les observations et le modèle $\lambda$.

$$\gamma_t(i) = \frac{\alpha_t(i) \beta_t(i)}{P(O/\lambda)}$$
In [6]:
def calc_gamma(alpha, beta, p_o_lambda, T, N):
    """ Fonction qui calcule les valeurs gamma en fonction des alphas et betas. """
    gamma = empty((T, N))
    for t in range(T):
        gamma[t, :] = (alpha[t, :]*beta[t, :]) / p_o_lambda[t]  
    return gamma
In [7]:
def calc_xi(alpha, beta, p_o_lambda, a_ij, mu_i, sigma_i, obs, N):
    """ Fonction qui calcule les valeurs zeta à partir des alphas et des betas."""
    T = len(obs)
    xi = empty((T-1, N, N))
    for t in range(T-1):
        for i in range(N):
            for j in range(N):
                xi[t, i, j] = alpha[t, i] * a_ij[i, j] * beta[t+1, j] * b_i_o(mu_i, sigma_i, j, t+1, obs) / p_o_lambda[t]
            # -1 pour faire correspondre avec la matrice d'observation, les valeurs commençant à 1 et non à 0 (nb glaces)
    return xi
        

Dans le cas de plusieurs séquence d'observations, il "suffit" de généraliser sur l'ensemble des séquences car $P(O|\lambda) = \prod_{k=1}^K P(O^k|\lambda) = \sum_{k=1}^K P_k$.

Les formules de ré-estimation deviennent alors

$$a_{i,j} = \frac{\sum_{k=1}^K \frac{1}{P_k} \sum_{t=1}^{T_k-1} \alpha_t^k(i) a_{i,j} b_j(O^k_{t+1}) \beta_{t+1}^k(j)}{\sum_{k=1}^K \frac{1}{P_k} \sum_{t=1}^{T_k-1} \alpha_t^k(i) \beta_t^k(j)}$$

En utilisant les variables $\gamma$ et $\xi$ habituelles:

$$\gamma_t^k(i) = \sum_{j=1}^N \xi_t^k(i, j)$$$$\xi_t^k(i, j) = \frac{\sum_{t=1}^{T_k-1} \alpha_t^k(i) a_{i,j} b_j(O^k_{t+1}) \beta_{t+1}^k(j) }{P_k}$$

L'écriture des ré-estimations peuvent se simplfier ainsi:

$$a_{i,j} = \frac{\sum_{k=1}^K \sum_{t=1}^{T_k-1} \xi_t^k(i, j)}{\sum_{k=1}^K \sum_{t=1}^{T_k-1} \gamma_t^k(i)}$$$$\mu_i = \frac{\sum_{k=1}^K \sum_{t=1}^{T_k} \gamma_t^k(i) o^k_t}{\sum_{k=1}^K \sum_{t=1}^{T_k} \gamma_t^k(i)}$$$$\sigma^2_i = \frac{\sum_{k=1}^K \sum_{t=1}^{T_k} \gamma_t^k(i) (o_t^k - \mu_i) (o_t^k - \mu_i)'}{\sum_{k=1}^K \sum_{t=1}^{T_k} \gamma_t^k(i)} $$

Il nous faut donc calculer et conserver pour chaque séquence $k$ les valeurs de $\gamma_t^k(i)$ et $\xi_t^k(i,j)$ et les recombiner à la fin pour estimer les paramètres du système selon toutes les observations.

In [8]:
def iteration(nb_iter, sequences):
    a_ij = array(
    [[0.5, 0.5, 0.0],
     [0.0, 0.5, 0.5],
     [0.0, 0.0, 0.5],
     ])
    pi_i = [1.0, 0.0, 0.0]
    mu_i = None
    sigma2_i = None
    N = 3
    
    for iterate in range(nb_iter):
        gamma = list()
        xi = list()
        for obs in sequences:
            # on calcul les gammas et xis pour chaque séquence d'observation (gamma et xi sont suffisant)
            T = len(obs)
            obs = array(obs)
            alpha = calc_alpha(pi_i, a_ij, mu_i, sigma2_i, obs, N)
            beta = calc_beta(a_ij, mu_i, sigma2_i, obs, N)
            p_o_lambda = sum(alpha*beta, axis=1)
            gamma.append(calc_gamma(alpha, beta, p_o_lambda, T, N))
            xi.append(calc_xi(alpha, beta, p_o_lambda, a_ij, mu_i, sigma2_i, obs, N))

        # nouvelles estimations
        pi_i = sum(g[0, :] for g in gamma) / len(sequences)
        
        a_ij = array([[
                    sum([sum(x[:, i, j]) for x in xi])
                    /
                    sum([sum(g[:-1, i]) for g in gamma])
                    for j in range(N)]
                    for i in range(N)])
        
        mu_i = [
                sum([sum(g[:, i]*obs[:]) for g, obs in zip(gamma, sequences)])
                /
                sum([sum(g[:, i]) for g in gamma])
                for i in range(N)
                ]
        
        sigma2_i = [
                    sum([sum(g[:, i] * (obs[:] - mu_i[i])**2) for g, obs in zip(gamma, sequences)])
                    /
                    sum([sum(g[:, i]) for g in gamma])
                    for i in range(N)
                    ]
    return pi_i, a_ij, mu_i, sigma2_i
In [9]:
new_pi, new_a_ij, mu_i, sigma2_i = iteration(1, [obs_1, obs_2, obs_3])
print('pi final: \n', new_pi)
print('a_ij final: \n', new_a_ij)
print('mu_i final: \n', mu_i)
print('sigma2_i final: \n', sigma2_i)
pi final: 
 [1. 0. 0.]
a_ij final: 
 [[0.69273927 0.30726073 0.        ]
 [0.         0.72812913 0.27187087]
 [0.         0.         1.        ]]
mu_i final: 
 [13.455435986636354, 16.089687061027, 15.214764757077765]
sigma2_i final: 
 [15.38294502843168, 7.880047381388694, 4.10504152612364]
In [10]:
new_pi, new_a_ij, mu_i, sigma2_i = iteration(5, [obs_1, obs_2, obs_3])
print('pi final: \n', new_pi)
print('a_ij final: \n', new_a_ij)
print('mu_i final: \n', mu_i)
print('sigma2_i final: \n', sigma2_i)
pi final: 
 [1. 0. 0.]
a_ij final: 
 [[0.5       0.5       0.       ]
 [0.        0.7132842 0.2867158]
 [0.        0.        1.       ]]
mu_i final: 
 [9.999999999995403, 18.04226836504268, 14.226153001400226]
sigma2_i final: 
 [0.6666666666651371, 0.7177620653677123, 1.2119881945778672]
In [12]:
les_mus = array([iteration(i, [obs_1, obs_2, obs_3])[2] for i in range(1,11)]).T
figure(figsize=(8,8))
for mu in les_mus:
    plot(range(1, 11), mu)
grid()
In [13]:
les_mus
Out[13]:
array([[13.45543599, 11.65173508, 10.36718925, 10.00141631, 10.        ,
        10.        , 10.        , 10.        , 10.        , 10.        ],
       [16.08968706, 17.01750179, 17.59994342, 18.00381508, 18.04226837,
        18.0095087 , 18.00050311, 18.00011392, 18.00010527, 18.00010508],
       [15.21476476, 15.16995742, 14.90488805, 14.53566001, 14.226153  ,
        14.0456477 , 14.00239899, 14.00055659, 14.00051571, 14.00051483]])
In [ ]: