Nathaniel Dake Blog

7. HMM's with Continuous Observations

At this point we are ready to look at the use application of Hidden Markov Model's to discrete observations. All that is meant by continuous observations is that what you observe is a number on a scale, rather than a symbol such as heads or tails, or words. This is an interesting topic in and of itself, because it allows us to think about:

  • What is continuous?
  • What is discrete?
  • What are we approximating as discrete that is really continuous?

Think of an audio signal for a moment. Audio is simply sound, and sound is just air pressure that vibrates. By vibrating I just mean that it is oscillating in time. So, when audio is stored on your computer, obviously your computer cannot store an infinite number of values, and hence the air pressure has to be quantized; it has to be given a number that it can represent. Now, there are actually two things to quantize:

  1. The amplitude of the sound pressure $\rightarrow$ 16 bit int.
  2. The time the amplitude occurred $\rightarrow$ 44100 Hz.

As has been mentioned before in other posts, sound is generally sampled at 44.1 KHz. That is far too fast for your brain to realize it is not continuous. So, what happens when we use a hidden markov model, but we assume the observed variable is a mixture of gaussians? Well, in essence:

The amplitude becomes continuous, and the time stays discrete.

Before we dive into the details of integrating HMM's with GMM's, I want to note that the following is assuming you are familiar with GMM's (if not please review my posts on unsupervised learning, they give a very thorough and intuitive overview of GMM's).

In [1]:
import numpy as np
from scipy.stats import bernoulli, binom, norm
import seaborn as sns
import matplotlib.pyplot as plt

from IPython.core.display import HTML
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

sns.set(style="white", palette="husl")
sns.set_context("talk")
sns.set_style("ticks")

1. GMM's and HMM's

Now, the question is: How can we relate the concepts of GMM's back to what we already know about HMM's (specifically, to allow our HMM to deal with continuous data)? Remember that a Hidden Markov Model is defined by three things: $\pi$, $A$, and $B$. Because we are now working with continuous emissions, it seems that $B$ is what needs to change, since it used to be an $MxV$ matrix, and there are no longer just $V$ possible symbols.

Take a moment to think about what $\pi$ and $A$ really represent. They both are entirely related to what hidden state, $M$, we can expect to be in at a given time. Once we are in a specific hidden state, we know that $B$ determines the probability of observing a specific emission. This actually relates to gaussian mixture models quite nicely! Recall that GMM's also have a their own latent variable (the specific gaussian that a data point was generated via, aka the cluster it belongs to). In other words, our HMM has the concept of being in a specific hidden state, $Z$, and then the GMM has the concept of being in a particular hidden gaussian. In combination these can be used to determine the probability of observation. A visualization should help make this more concrete:

Above, we can see that we still have our transition matrix $A$, that determines how we move between hidden states. However, we now see these hidden states, $Z$, are labeled as hidden state 1. This is because we have introduced a second hidden state, that of the gaussian that is generating the observation. So, we transition into a new hidden state $Z$, via our transition matrix $A$, and then we select a specific gaussian, for our second hidden state. From the gaussian we sample in order to generate our observed data point $x$. We can see below that our $B$ emission matrix is replaced with the GMM:

A sample path that would generate two $x$ observations is seen below:

So we have an idea of how this works in practice, let's now go through the mathematics required to utilize GMM's in our HMM. Recall, that there are three things that we need for a Gaussian Mixture Model:

  • The responsibilities, the probability of a specific gaussian: $R$
  • For each individual gaussian, the mean: $\mu$
  • For each individual gaussian, the covariance: $\Sigma$

So, we are replacing $B$ by three new parameters: $R$, $\mu$ and $\Sigma$; this is seen clearly in the visualization above. We will use the letter $K$ to be the number of second hidden states.

It is very important to keep in mind that we have two hidden states (latent variables); in standard GMM's, we view the hidden state to be one of $K$ gaussian's. Now, that changes slightly in our HMM/GMM model. To see why, let's look at how we will store our responsibilities. $R$ will be stored as an $MxK$ matrix, so there is one row for each hidden state, $Z$, and then for each state there are $K$ different probabilities. Since this is a probability matrix, each row must sum to one.

R = (M x K)

In english, an entry in this matrix can be thought of as the probability that you are in a specific hidden state, $Z$, and you then have an observation from a specific second hidden state, $K$.

Put another way, what we are really saying here is that based on the specific hidden state, $Z$, that we are in, there is a corresponding prior distribution to select one of the $K$ latent states, which will then generate our data point $x$. As a simple example, let's say that we are looking at household energy usage. The first hidden state, $Z$, is the time of day (morning, afternoon, evening), the second hidden state (for which we have a gaussian distribution) is what appliance is being utilized (washer, blow dryer, blender, etc), and the observed $x$ is the energy usage in watts. If we know that the hidden state, time, is morning, then the probability that the blow dryer or blender are being use is higher than evening (a person is more likely to blow dry their hair in the morning, and also more likely to make a smoothie for breakfast). So, the probability of being in a specific second hidden state is most certainly based on the first hidden state that we are in.

If you did in fact go through my post on GMM's, you may recall that we defined the probability of an observation $x$ as:

$$P(x) = \pi_1 N(x \mid \mu_1, \Sigma_1) + \pi_2 N(x \mid \mu_2, \Sigma_2) + \pi_3 N(x \mid \mu_3, \Sigma_3)$$

Here the probability of ending up in a specific latent state (gaussian) was encapsulated entirely by $\pi$ (not the initial state probability that we deal with in HMM's). In our current scenario, the probability of ending up in a specific gaussian is based on our responsibilities, $R$ (which $\pi$ above heavily related to-see GMM post), as well our state transition matrix $A$, and our initial state probabilities, $\pi$ (or HMM $\pi$).

Moving on the gaussians themselves, Each individual $\mu$ is $D$ dimensional, and there are $K$ of them. We need one for each of the $M$ states; hence, $\mu$ will be an $MxKxD$ matrix:

mu = (M x K x D)

$\Sigma$ will be $MxKxDxD$:

sigma = (M x K x D x D)

Now, if you did in fact go through my post on GMM's, the above may seem slightly confusing/incorrect. In a standard GMM, we have our latent variable, of which there are $K$ possibilities. For each $K$ there is a specific corresponding gaussian, for which the means are held in a K x D matrix. Why do we suddenly have $M$ different gaussians for each of the $K$ latent states? The easiest way to explain this is via an extension of our energy example.

In the energy example, the second latent variable was appliance, of which there are $K$. The question is, based on the first hidden state (time of day), will the distribution of the appliance usage change? The answer is most certainly yes! You can imagine that the mean value of a blowdryer usage in the morning may be 20 watts, while in the evening it may have a mean value of 5 watts. Regardless of the actual numbers, based on the time of day our distribution is going to change. In order to account for this, each latent variable, $k$, must have a distribution at each particular time (first hidden state, $M$). The simplest way to achieve this goal is to hold our means as mu = (M x K x D) and our covariances as: sigma = (M x K x D x D).

Create $B$ observation matrix

Once a state ($j$) has been chosen ($\pi$ and $A$ have done their job), we create a "$B$" observation probability matrix of size $MxT$:

$$B(j,t) = \sum_{k=1}^K \overbrace{R(j,k) N\Big( x(t), \mu(j,k), \Sigma(j,k)\Big)}^\text{Gaussian Mixture Model}$$

Even though we don't technically have a $B$ matrix anymore, we can still make one by calculating it for each sequence. And we will store the individual mixture components as well:

$$Comp(j, k, t) = \overbrace{R(j,k) N\Big( x(t), \mu(j,k), \Sigma(j,k)\Big)} ^\text{Individual Mixture Component}$$

So, based on a the current hidden state, we transition/select one of multiple gaussians. The $comp$ is one of these gaussians. $B(j,t)$ is their linear combination (the definition of a gaussian mixture model).

I want to add a bit more detail to the above procedure. Remember that what we start with is a set of $X = X_1,...,X_T$ observations. What we are saying is that each individual sequence will have it's own unique emissions matrix $B$ based on it's unique set of observations. For instance, with standard HMM's if you wanted the probability of a specific observation given a hidden state you would find the specific entry in the $B$ matrix (think back to the coin toss example). Now, it is essentially the same thing! The $B$ matrix will be created based on the unique observations in $x$, and the probability is based on the sum of the mixture components.

Visually we this can be represented by the image below:

Based on a given hidden state and time, we must account for all of the ways that we can observed $x_1$; in this case the red, blue, and green paths (each being an entry in $comp$). We then sum up all of those possibilties and record it in our $B$ matrix as $B(j=1, t=1)$, for sequence one.

Expectation Step

With our $B$ matrix defined, we can move onto our expectation step. First, we can calculate a new $\gamma$, where $\gamma$ is the same variable defined in my prior HMM post:

$$\gamma(j,k,t) = \frac{\alpha(t, j) \beta(t, j)}{\sum_{j'=1}^M \alpha(t, j') \beta(t, j')} \frac{R(j,k) N\Big( x(t), \mu(j,k), \Sigma(j,k)\Big)}{\sum_{k'=1}^K R(j,k') N\Big( x(t), \mu(j,k'), \Sigma(j,k')\Big)} $$$$\gamma(j,k,t) = \frac{\alpha(t, j) \beta(t, j)}{\sum_{j'=1}^M \alpha(t, j') \beta(t, j')} \frac{Comp(j, k, t)}{B(j,t)} $$

Maximization Step

Moving on to our maximization step, we can now define the updates for $R$, $\mu$ and $\Sigma$:

$$R(j, k) = \frac{\sum_{t=1}^T \gamma(j, k, t)}{\sum_{t=1}^T \sum_{k'=1}^K \gamma(j, k', t)}$$$$\mu(j, k) = \frac{\sum_{t=1}^T \gamma(j,k,t)x(t)}{\sum_{t=1}^T \gamma(j,k,t)}$$$$\Sigma(j,k) = \frac{\sum_{t=1}^T \gamma(j, k, t)\big(x(t) - \mu(j,k)\big) \big(x(t) - \mu(j, k) \big)^T}{\sum_{t=1}^T \gamma(j, k, t)}$$

1.1 Continuous HMM's vs Regular HMM's $\rightarrow$ Comparison

I want to take a quick moment to compare the continuous HMM update rules above with those for the regular HMM from past posts. The update rule for $\gamma$ for the regular HMM looked like:

Original HMM
$$\phi(t, i ,j) = \frac{\alpha(t,i)A(i,j)B\big(j, x(t+1)\big)\beta(t+1, j)}{\sum_{i=1}^M \sum_{j=1}^M \alpha(t,i)A(i,j)B\big(j, x(t+1)\big)\beta(t+1, j)}$$

$$\gamma(t,i) = \sum_{j=1}^M \phi(t,i,j)$$

And, to reinforce the new update rule for the continuous HMM:

Continuous HMM
$$\gamma(j,k,t) = \frac{\alpha(t, j) \beta(t, j)}{\sum_{j'=1}^M \alpha(t, j') \beta(t, j')} \frac{R(j,k) N\Big( x(t), \mu(j,k), \Sigma(j,k)\Big)}{\sum_{k'=1}^K R(j,k') N\Big( x(t), \mu(j,k'), \Sigma(j,k')\Big)} $$

$$\gamma(j,k,t) = \frac{\alpha(t, j) \beta(t, j)}{\sum_{j'=1}^M \alpha(t, j') \beta(t, j')} \frac{Comp(j, k, t)}{B(j,t)} $$

The key differences are as follows:

  1. In the original HMM update the $A$ transition matrix was utilized. In the updated case, it is no longer present; rather the indexing has changed for $\alpha$ and $\beta$ leaving $A$ accounted for.
  2. The original $B$ matrix is replaced by the gaussian mixture.

1.2 Continuous HMM's vs GMM's $\rightarrow$ Comparison

Now let's quickly contrast vanilla GMM's with continuous HMM's. With GMM's, the goal was to learn the responsibilities, and means/covariances of each $k$ gaussian. The idea remains the same when dealing with continuous HMM's, however, we now must account for the fact that we are dealing with sequences. Hence, we must also account for the hidden state, $M$, and the transition matrix $A$.

Original GMM
So, our original GMM looked like:

  1. Find responsibilities
  2. Recalculate parameters, $\pi$, $\mu$, $\Sigma$

Continuous HMM
Meanwhile, our continuous HMM functions as follows:

  1. For every sequence (expectation step):

    • Iterate over time, state, gaussian, find component probability
    • Sum component probability to find $B$
    • Find forward and backward variables, $\alpha$ and $\beta$
    • Find $\gamma$
  2. For every sequence (maximization step):

    • Update $A$
    • Update mixture components ($R$, $\mu$, $\Sigma$)

2. Generating Continuous HMM Data

I want to now take a moment to actually dissect the data generation process of continuous HMM's, since I feel that it will provide clarity to their internal mechanics. Recall that with the discrete HMM, such as the magician's coin toss or a sentence, we had observation sequences:

coin toss -> x = [heads, heads, tails]

sentence -> x = "The red fox ran away"

Now, as we transition to continuous HMM's how does this change; what will our continuous observation sequences look like? Well, time will remain discrete, but the actual value at a unique time step will be continuous. For instance, $X$ may look like:

hmm continuous -> x = [5.1234324, 9.4452562, 1.875468, ... , 7.246524]

Where each index of $x$ represents a specific discrete time step, but each value at a given index is a continuous variable. How do we actually generate this data? Well, let's say that we have 5 hidden states, 3 latent variables (represented via gaussians), and 2 dimensions for $X$. The key pieces to our continuous HMM model in this case are as follows; we still have:

  • $\pi \rightarrow$ Initial hidden state probability
  • $A \rightarrow$ State transition matrix

And our $B$ emission matrix is replaced by:

  • $R \rightarrow$ Mixture responsibilities
  • $\mu \rightarrow$ Means of gaussians at each hidden state
  • $\Sigma \rightarrow$ Covariance of gaussians at each hidden state

The visualization of a sample path from earlier this post should give nice intuition for the process of generating the data. In pseudocode it will look like:

for sequence in all_sequences:
    for t in len(sequence):
        # Find next hidden state, sample from A
        # Find next gaussians based on current hidden state, sample from R
        # Calculate probability of x[t], randomly sample from selected gaussian

A full implementation can be seen below:

In [16]:
def simple_init():
    # 1 state, 1 gaussian, 1 dimension
    M = 1
    K = 1
    D = 1

    pi = np.array([1])
    A = np.array([[1]])
    R = np.array([[1]])
    mu = np.array([[[0]]])
    sigma = np.array([[[[1]]]])

    return M, K, D, pi, A, R, mu, sigma


def big_init():
    # 5 hidden states, 3 different gaussians, 2 dimensions
    M = 5
    K = 3
    D = 2

    # initial state distribution
    pi = np.array([1, 0, 0, 0, 0])

    # State transition matrix - likes to stay where it is (0.9 across diagonals)
    A = np.array([
        [0.9, 0.025, 0.025, 0.025, 0.025],
        [0.025, 0.9, 0.025, 0.025, 0.025],
        [0.025, 0.025, 0.9, 0.025, 0.025],
        [0.025, 0.025, 0.025, 0.9, 0.025],
        [0.025, 0.025, 0.025, 0.025, 0.9],
    ])

    # Mixture responsibilities -> Uniform distribution
    R = np.ones((M, K)) / K

    # Gaussian means --> M x K x D
    mu = np.array([
        [[0, 0], [1, 1], [2, 2]],
        [[5, 5], [6, 6], [7, 7]],
        [[10, 10], [11, 11], [12, 12]],
        [[15, 15], [16, 16], [17, 17]],
        [[20, 20], [21, 21], [22, 22]],
    ])

    # Gaussian Covariances
    sigma = np.zeros((M, K, D, D))

    for m in range(M):
        for k in range(K):
            sigma[m, k] = np.eye(D)

    return M, K, D, pi, A, R, mu, sigma


def get_signals(N=20, T=100, init=big_init):
    """
    Get signals from HMM with GMM

    Args:
        - N: number of sequences
        - T: length of sequence
        - init: which init to call

    Pseudocode:
        for n in number_of_sequences:
            for t in length_of_sequence:
                Find next hidden state (sample from A)
                Find next gaussian based on current hidden state (sample from R)
                Calculate probability of x (randomly sample from selected gaussian)
    """

    M, K, D, pi, A, R, mu, sigma = init()

    X = []

    # Loop through every N
    for n in range(N):
        x = np.zeros((T, D))
        s = 0 # initial state must be 0, based on pi definition
        r = np.random.choice(K, p=R[s])
        x[0] = np.random.multivariate_normal(mu[s][r], sigma[s][r])

        # Loop through all time steps after time t = 0
        for t in range(1, T):
            s = np.random.choice(M, p=A[s])  # sample from A[s] to select next state using A
            r = np.random.choice(K, p=R[s])  # sample from R[s] to select next gaussian (choose mixture)
            x[t] = np.random.multivariate_normal(mu[s][r], sigma[s][r])  #  generate data point

        X.append(x)
    return X


if __name__ == "__main__":
    T = 500
    x = get_signals(1, T)[0]
    axis = range(T)
    
    fig = plt.figure(figsize=(12,5))
    plt.plot(axis, x[:, 0], c=sns.xkcd_rgb["red"], lw=1.5, alpha=0.5)
    plt.plot(axis, x[:, 1], c=sns.xkcd_rgb["blue"], lw=1.5, alpha=0.5)
    plt.xlabel("Time step")
    plt.ylabel("Continuous Observation")
    plt.legend(["Dimension 1", "Dimension 2"], fontsize=10)
    plt.show()

Here we can see our continuous data plotted above; we have 2 unique dimensions (red and blue) each plotted against time. Below, we can see how our observation point $x$ moves through 2 dimensional space over time:

In [54]:
from matplotlib import rc, animation

fig = plt.figure(figsize=(8, 6), dpi=150)       
ax1 = fig.add_subplot(1, 1, 1)

marker1, = ax1.plot(15, 15, 'og')

def animate(i):
    ax1.clear()
    ax1.set_xlim(-5, 25)                               
    ax1.set_ylim(-5, 25) 
    marker1 = ax1.plot(x[i, 0], x[i, 1], 'og')
    ax1.annotate('T = {}'.format(i), [-1,22], size=20)
    ax1.set_xlabel("Dimension 1")
    ax1.set_ylabel("Dimension 2")
    
def init():
    ax1.set_xlim(-5, 25)                               
    ax1.set_ylim(-5, 25) 

plt.tight_layout()
plt.subplots_adjust(wspace=0.3, bottom=0.15, top=0.9)

# For rendering html video in cell
# html_video = HTML(
#     animation.FuncAnimation(
#         fig,
#         animate,
#         frames=500,
#         interval=200,
#         init_func=init,
#     ).to_html5_video()
# )
# display(html_video)
# gif_video = animation.FuncAnimation(
#         fig,
#         animate,
#         frames=500,
#         interval=200,
#         init_func=init,
#     )

# gif_video.save('hmm_continuous.gif', writer='imagemagick')

plt.close()

3. Continuous HMM's in Code

Below is HMM implemented in code.

In [22]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal as mvn

from hmm.generate_hmm_continuous import get_signals, big_init, simple_init
from hmm.utils import random_normalized


class HMM:
    def __init__(self, M, K):
        """
        Continuous HMM class. Note that there are 2 hidden states now, M and K.
        M is the hidden state that was used in regular HMMs. K is a new hidden
        state that is represented by a gaussian distribution.

        :param M: Number of hidden states
        :param K: Number of 2nd hidden states
        """
        self.M = M
        self.K = K

    def fit(self, X, max_iter=30, eps=10e-1):
        N = len(X)
        D = X[0].shape[1]

        self.pi = np.ones(self.M) / self.M # Uniform distribution
        self.A = random_normalized(self.M, self.M)

        # GMM parameters. mu is set similar to how it is set in kmeans -> 
        # randomly select points from dataset
        # R, responsibilities --> Uniform distribution
        self.R = np.ones((self.M, self.K)) / self.K
        self.mu = np.zeros((self.M, self.K, D))
        for i in range(self.M):
            for k in range(self.K):
                # For all states and all gaussians, get a random index, choose a random 
                # sequence, get a random time, and set mu[i,k] to be whatever point
                # was at that time
                random_idx = np.random.choice(N)
                x = X[random_idx]
                random_time_idx = np.random.choice(len(x))
                self.mu[i,k] = x[random_time_idx]

        self.sigma = np.zeros((self.M, self.K, D, D))
        for j in range(self.M):
            for k in range(self.K):
                self.sigma[j,k] = np.eye(D)

        costs = []
        for it in range(max_iter):

            alphas = []
            betas = []
            gammas = []
            Bs = []
            P = np.zeros(N) # Sequence probabilities

            # ----------- Expectation Step -----------
            # Iterate over every sequence
            for n in range(N):
                x = X[n]
                T = len(x)

                B = np.zeros((self.M, T))
                component = np.zeros((self.M, self.K, T))

                # Iterate over every state, every time, and every gaussian
                for j in range(self.M):
                    for t in range(T):
                        for k in range(self.K):
                            # Component probability
                            p = self.R[j,k] * mvn.pdf(x[t], self.mu[j,k], self.sigma[j,k]) 
                            component[j,k,t] = p
                            B[j,t] += p
                Bs.append(B)

                # Just like discrete case
                alpha = np.zeros((T, self.M))
                alpha[0] = self.pi * B[:, 0]
                for t in range(1, T):
                    alpha[t] = alpha[t-1].dot(self.A) * B[:,t]
                P[n] = alpha[-1].sum()
                assert(P[n] <= 1)
                alphas.append(alpha)

                beta = np.zeros((T, self.M))
                beta[-1] = 1
                for t in range(T-2, -1, -1):
                    beta[t] = self.A.dot(B[:, t+1] * beta[t+1])
                betas.append(beta)

                # This was not needed in the discrete case
                gamma = np.zeros((T, self.M, self.K))
                for t in range(T):
                    # Denominator only depends on t
                    alphabeta = (alphas[n][t,:] * betas[n][t,:]).sum()
                    for j in range(self.M):
                        # Now loop through every state and calculate alpha beta factor
                        factor = alphas[n][t,j] * betas[n][t,j] / alphabeta
                        for k in range(self.K):
                            # loop through all gaussians
                            gamma[t,j,k] = factor * component[j,k,t] / B[j,t]
                gammas.append(gamma)

            cost = np.log(P).sum()
            costs.append(cost)

            # ----------- Maximization Step -----------
            self.pi = np.sum((alphas[n][0] * betas[n][0]) / P[n] for n in range(N)) / N

            # Define numerators and denominators, since all updates formulas involve division
            a_den = np.zeros((self.M, 1))
            a_num = 0
            r_num = np.zeros((self.M, self.K))
            r_den = np.zeros(self.M)
            mu_num = np.zeros((self.M, self.K, D))
            sigma_num = np.zeros((self.M, self.K, D, D))
            # Note the denominator for mu and sigma is just r_num

            for n in range(N):
                # iterate over all sequences
                x = X[n]
                T = len(x)
                B = Bs[n]
                gamma = gammas[n]

                a_den += (alphas[n][:-1] * betas[n][:-1]).sum(axis=0, keepdims=True).T / P[n]

                # Update A -> This is the same update that was performed in the discrete case!
                a_num_n = np.zeros((self.M, self.M))
                for i in range(self.M):
                    for j in range(self.M):
                        for t in range(T-1):
                            a_num_n[i,j] += (alphas[n][t,i] * self.A[i,j] * 
                                             B[j,t+1] * betas[n][t+1,j])
                a_num += a_num_n / P[n]

                # Update mixture components
                r_num_n = np.zeros((self.M, self.K))
                r_den_n = np.zeros(self.M)
                for j in range(self.M):
                    for k in range(self.K):
                        for t in range(T):
                            r_num_n[j,k] += gamma[t,j,k]
                            r_den_n[j] += gamma[t,j,k]
                r_num += r_num_n / P[n]
                r_den += r_den_n / P[n]

                mu_num_n = np.zeros((self.M, self.K, D))
                sigma_num_n = np.zeros((self.M, self.K, D, D))
                for j in range(self.M):
                    for k in range(self.K):
                        for t in range(T):
                            mu_num_n[j,k] += gamma[t, j, k] * x[t]
                            sigma_num_n[j,k] += (
                                gamma[t,j,k]*np.outer(x[t] -self.mu[j,k], x[t] - self.mu[j,k])
                            )
                mu_num += mu_num_n / P[n]
                sigma_num += sigma_num_n / P[n]

            # Updates
            self.A = a_num / a_den
            for j in range(self.M):
                for k in range(self.K):
                    self.R[j,k] = r_num[j,k] / r_den[j]
                    self.mu[j,k] = mu_num[j,k] / r_num[j,k]
                    self.sigma[j,k] = sigma_num[j,k] / r_num[j,k]

        print("A:\n", self.A)
        print("mu:\n", self.mu)
        print("sigma:\n", self.sigma)
        print("R:\n", self.R)
        print("pi:\n", self.pi)

        plt.figure(figsize=(8,5))
        plt.plot(costs, color="blue")
        plt.xlabel("Iteration Number")
        plt.ylabel("Cost")
        plt.show()

    def likelihood(self, x):
        # returns log P(x | model)
        # using the forward part of the forward-backward algorithm
        T = len(x)
        alpha = np.zeros((T, self.M))

        B = np.zeros((self.M, T))
        for j in range(self.M):
            for t in range(T):
                for k in range(self.K):
                    p = self.R[j,k] * mvn.pdf(x[t], self.mu[j,k], self.sigma[j,k])
                    B[j,t] += p

        alpha[0] = self.pi*B[:,0]
        for t in range(1, T):
            alpha[t] = alpha[t-1].dot(self.A) * B[:,t]
        return alpha[-1].sum()

    def likelihood_multi(self, X):
        return np.array([self.likelihood(x) for x in X])

    def log_likelihood_multi(self, X):
        return np.log(self.likelihood_multi(X))

    def set(self, pi, A, R, mu, sigma):
        self.pi = pi
        self.A = A
        self.R = R
        self.mu = mu
        self.sigma = sigma
        M, K = R.shape
        self.M = M
        self.K = K

    def get_state_sequence(self, x):
        # returns the most likely state sequence given observed sequence x
        # using the Viterbi algorithm
        T = len(x)

        # make the emission matrix B
        B = np.zeros((self.M, T))
        for j in range(self.M):
            for t in range(T):
                for k in range(self.K):
                    p = self.R[j,k] * mvn.pdf(x[t], self.mu[j,k], self.sigma[j,k])
                    B[j,t] += p

        # perform Viterbi as usual
        delta = np.zeros((T, self.M))
        psi = np.zeros((T, self.M))
        delta[0] = self.pi*B[:,0]
        for t in range(1, T):
            for j in range(self.M):
                delta[t,j] = np.max(delta[t-1]*self.A[:,j]) * B[j,t]
                psi[t,j] = np.argmax(delta[t-1]*self.A[:,j])

        # backtrack
        states = np.zeros(T, dtype=np.int32)
        states[T-1] = np.argmax(delta[T-1])
        for t in range(T-2, -1, -1):
            states[t] = psi[t+1, states[t+1]]
        return states

def fake_signal(init=simple_init):
    signals = get_signals(N=10, T=10, init=init)

    hmm = HMM(2, 2)
    hmm.fit(signals)
    L = hmm.log_likelihood_multi(signals).sum()
    print("LL for fitted params:", L)
    

if __name__ == '__main__':
    fake_signal(init=simple_init)
A:
 [[0.64628237 0.35371763]
 [0.09076443 0.90923557]]
mu:
 [[[-1.55208313]
  [ 0.78623125]]

 [[ 0.66671194]
  [-0.74887296]]]
sigma:
 [[[[0.14990438]]

  [[0.09902498]]]


 [[[0.57063062]]

  [[0.40497775]]]]
R:
 [[0.59921946 0.40078054]
 [0.62778379 0.37221621]]
pi:
 [0.03075221 0.96924779]
LL for fitted params: -137.6845611905689

4. HMM Continuous with Theano

And below is an implementation with Theano. Keep in mind that that vast majority is similar to the discrete HMM implementation with Theano; we still have a recurrence block that is used to calculate the probability sequence/cost and we still have the update block that is used to update our HMM variables. The difference is that our update block now updates $R$, $\mu$ and $\Sigma$.

Additionally, we also have a portion prior to the recurrence block where we must calculate our emissions matrix $B$. This is done for each individual sequence, and $B$ is then used in finding the probability of the specific sequence (cost).

In [5]:
import wave
import theano
import theano.tensor as T
import numpy as np
import matplotlib.pyplot as plt

from Machine_Learning.hmm.utils import random_normalized


class HMM:
    def __init__(self, M, K):
        self.M = M
        self.K = K

    def fit(self, X, learning_rate=10e-3, max_iter=10):
        """Train continuous HMM model using stochastic gradient descent."""

        N = len(X) # Number of sequences
        D = X[0].shape[1] # Dimensionality

        # ---------- 1. Initialize continuous HMM model parameters ----------
        # Original HMM parameters
        pi0 = np.ones(self.M) / self.M # Uniform distribution
        A0 = random_normalized(self.M, self.M)

        # Continuous HMM (GMM) parameters.
        R0 = np.ones((self.M, self.K)) / self.K # Uniform distribution

        # mu is set similar to how it is set in kmeans -> randomly select points from dataset
        mu0 = np.zeros((self.M, self.K, D))
        for i in range(self.M):
            for k in range(self.K):
                # For all states and all gaussians, get a random index, choose a random sequence,
                # get a random time, and set mu[i,k] to be whatever point was at that time
                random_idx = np.random.choice(N)
                x = X[random_idx]
                random_time_idx = np.random.choice(len(x))
                mu0[i,k] = x[random_time_idx]

        # setting sigma so that all gaussians are initialized as spherical
        sigma0 = np.zeros((self.M, self.K, D, D))
        for j in range(self.M):
            for k in range(self.K):
                sigma0[j,k] = np.eye(D)

        thx, cost = self.set(pi0, A0, R0, mu0, sigma0)

        # ---------- 2. Perform updates on continuous HMM Model parameters via stochastic gradient descent ----------
        # This is why theano is so powerful. By linking all variables together via a computational
        # graph, we can define our cost, which is really just the probability of a sequence,
        # and we can then find the gradient of the cost with respect to our parameters, pi, A, B
        # R, mu, and sigma.
        pi_update = self.pi - learning_rate*T.grad(cost, self.pi)
        pi_update = pi_update / pi_update.sum()  # Normalizing to ensure it stays a probability

        A_update = self.A - learning_rate*T.grad(cost, self.A)
        A_update = A_update / A_update.sum(axis=1).dimshuffle(0, 'x')

        R_update = self.R - learning_rate*T.grad(cost, self.R)
        R_update = R_update / R_update.sum(axis=1).dimshuffle(0, 'x')

        updates = [
            (self.pi, pi_update),
            (self.A, A_update),
            (self.R, R_update),
            (self.mu, self.mu - learning_rate*T.grad(cost, self.mu)),
            (self.sigma, self.sigma - learning_rate*T.grad(cost, self.sigma)),
        ]

        train_op = theano.function(
            inputs=[thx],
            updates=updates,
        )

        costs = []
        for it in range(max_iter):
            print("it:", it)

            for n in range(N):
                c = self.log_likelihood_multi(X).sum()
                print("c:", c)
                costs.append(c)
                train_op(X[n])

        print("A:", self.A.get_value())
        print("mu:", self.mu.get_value())
        print("sigma:", self.sigma.get_value())
        print("R:", self.R.get_value())
        print("pi:", self.pi.get_value())

        plt.plot(costs)
        plt.show()

    def set(self, pi, A, R, mu, sigma):
        self.pi = theano.shared(pi)
        self.A = theano.shared(A)
        self.R = theano.shared(R)
        self.mu = theano.shared(mu)
        self.sigma = theano.shared(sigma)
        M, K = R.shape
        self.M = M
        self.K = K

        D = self.mu.shape[2]
        twopiD = (2*np.pi)**D # Needed to calculate gaussian

        thx = T.matrix("X") # Represents T x D matrix of sequential observations

        # ---------- Calculate B emission matrix ----------
        # We need to find B for a particular sequence.
        # Recall that B is an M x T matrix
        # For each hidden state, M, at each tim, there is a probability of observing whatever
        # value was in our sequence
        # We have the following below:
        # - component_pdf -> finds a single value of B, at given hidden state and time
        # - state_pdfs ----> finds a column in B; for all hidden states, and a single time
        # - gmm_pdf -------> finds a full B matrix, M x T. Loops over all time steps and
        #                    determines each column of B, resulting in a final B matrix

        def mvn_pdf(x, mu, sigma):
            """Need to define our own mvn because it does not exist in theano."""
            k  = 1 / T.sqrt(twopiD * T.nlinalg.det(sigma))
            e = T.exp(-0.5*(x - mu).T.dot(T.nlinalg.matrix_inverse(sigma).dot(x-mu)))
            return k * e

        def gmm_pdf(x):
            def state_pdfs(xt):
                """Create the B observation matrix.

                Takes in x at a particular time t. Will perform a theano scan, iterating
                over all hidden states M, finding their respective B(j,t). """
                def component_pdf(j, xt):
                    """Takes in hidden state and x at a particular time t. This
                    function is going to calculate B(j, t). Note, this must be defined as a
                    function that so it can be passed into theano scan.

                    Args:
                        - j: current sequence in sequences pass in from scan below
                        - xt: non sequence argument passed in from scan below
                    """
                    Bj_t = 0
                    for k in range(self.K):
                        Bj_t += self.R[j, k] * mvn_pdf(xt, self.mu[j,k], self.sigma[j,k])
                    return Bj_t

                Bt, _ = theano.scan(
                    fn=component_pdf,
                    sequences=T.arange(self.M),
                    n_steps=self.M,
                    outputs_info=None,
                    non_sequences=[xt]
                )
                return Bt

            # Calculate full B matrix
            B, _ = theano.scan(
                fn=state_pdfs,
                sequences=x,
                n_steps=x.shape[0],
                outputs_info=None
            )

            return B.T

        B = gmm_pdf(thx)

        def recurrence_to_find_alpha(t, old_a, B):
            """Forward algorithm."""
            a = old_a.dot(self.A) * B[:, t]
            s = a.sum()
            return (a / s), s

        [alpha, scale], _ = theano.scan(
            fn=recurrence_to_find_alpha,
            sequences=T.arange(1, thx.shape[0]),
            outputs_info=[self.pi*B[:,0], None],
            n_steps=thx.shape[0] - 1,
            non_sequences=[B]
        )

        cost = -T.log(scale).sum()
        self.cost_op = theano.function(
            inputs=[thx],
            outputs=cost
        )
        return thx, cost

    def log_likelihood_multi(self, X):
        return np.array([self.cost_op(x) for x in X])


def real_signal():
    """
    Extracts raw audio from Wav file.

    Right click on the file and go to "get info", you will see that:
        - sampling rate = 16000 Hz
        - bits per sample = 16
        - The first is quanitization in time
        - The second is quantization in amplitude

    This is also done for images. 2^16 = 65536 is the number of different sound levels
    that we have.
    """
    spf = wave.open("helloworld.wav")

    signal = spf.readframes(-1)
    signal = np.fromstring(signal, "Int16")
    T = len(signal)
    signal = (signal - signal.mean()) / signal.std() # Normalize

    hmm = HMM(5, 3)

    # Signal needs to be of shape N x T(n) x D
    hmm.fit(signal.reshape(1, T, 1), learning_rate=10e-6, max_iter=20)


if __name__ == "__main__":
    real_signal()
it: 0
c: 27824.79083213155
it: 1
c: 27660.673833612313
it: 2
c: 27528.88987659902
it: 3
c: 27420.318300164225
it: 4
c: 27329.129198825565
it: 5
c: 27251.342886919523
it: 6
c: 27184.109671725775
it: 7
c: 27125.314911724774
it: 8
c: 27073.346644460322
it: 9
c: 27026.95078286172
it: 10
c: 26985.136369534233
it: 11
c: 26947.110922732496
it: 12
c: 26912.234700028483
it: 13
c: 26879.98735187515
it: 14
c: 26849.942999043513
it: 15
c: 26821.75123233511
it: 16
c: 26795.122398342668
it: 17
c: 26769.81606327324
it: 18
c: 26745.631880192486
it: 19
c: 26722.40230258052
A: [[0.22630072 0.18112922 0.19844639 0.20126953 0.19285414]
 [0.21022374 0.20710353 0.19521904 0.19870029 0.1887534 ]
 [0.22114463 0.19219237 0.20721744 0.18678923 0.19265634]
 [0.21889915 0.18118594 0.18559644 0.20789528 0.20642319]
 [0.20858331 0.17691975 0.18470723 0.21260964 0.21718007]]
mu: [[[-1.35614813e-02]
  [-5.68437598e-01]
  [ 1.12338485e-03]]

 [[-1.08992905e-02]
  [ 1.90512211e+00]
  [-1.04312503e+00]]

 [[-1.45410778e+00]
  [ 8.08365304e-01]
  [-1.62854413e-02]]

 [[-1.92239295e-01]
  [ 1.44981052e+00]
  [ 2.02866563e-01]]

 [[ 5.90041513e-01]
  [ 9.69933049e-01]
  [ 1.48403921e-04]]]
sigma: [[[[0.92937454]]

  [[0.97355304]]

  [[0.92922015]]]


 [[[0.91822568]]

  [[1.04845774]]

  [[1.0186508 ]]]


 [[[1.03280142]]

  [[0.98484553]]

  [[0.9223032 ]]]


 [[[0.9218524 ]]

  [[1.03078632]]

  [[0.92024199]]]


 [[[0.96336242]]

  [[0.9983649 ]]

  [[0.92801658]]]]
R: [[0.33933272 0.32149752 0.33916975]
 [0.4063751  0.24353848 0.35008641]
 [0.30208399 0.31723683 0.38067918]
 [0.37808764 0.24831756 0.3735948 ]
 [0.33510298 0.29588412 0.3690129 ]]
pi: [0.200041   0.19995993 0.1999805  0.20000982 0.20000874]
In [ ]:
 
In [ ]:
 

© 2018 Nathaniel Dake