At this point we are ready to look at the use application of Hidden Markov Model's to discrete observations. All that is meant by continuous observations is that what you observe is a number on a scale, rather than a symbol such as heads or tails, or words. This is an interesting topic in and of itself, because it allows us to think about:
Think of an audio signal for a moment. Audio is simply sound, and sound is just air pressure that vibrates. By vibrating I just mean that it is oscillating in time. So, when audio is stored on your computer, obviously your computer cannot store an infinite number of values, and hence the air pressure has to be quantized; it has to be given a number that it can represent. Now, there are actually two things to quantize:
As has been mentioned before in other posts, sound is generally sampled at 44.1 KHz. That is far too fast for your brain to realize it is not continuous. So, what happens when we use a hidden markov model, but we assume the observed variable is a mixture of gaussians? Well, in essence:
The amplitude becomes continuous, and the time stays discrete.
Before we dive into the details of integrating HMM's with GMM's, I want to note that the following is assuming you are familiar with GMM's (if not please review my posts on unsupervised learning, they give a very thorough and intuitive overview of GMM's).
import numpy as np
from scipy.stats import bernoulli, binom, norm
import seaborn as sns
import matplotlib.pyplot as plt
from IPython.core.display import HTML
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
sns.set(style="white", palette="husl")
sns.set_context("talk")
sns.set_style("ticks")
Now, the question is: How can we relate the concepts of GMM's back to what we already know about HMM's (specifically, to allow our HMM to deal with continuous data)? Remember that a Hidden Markov Model is defined by three things: $\pi$, $A$, and $B$. Because we are now working with continuous emissions, it seems that $B$ is what needs to change, since it used to be an $MxV$ matrix, and there are no longer just $V$ possible symbols.
Take a moment to think about what $\pi$ and $A$ really represent. They both are entirely related to what hidden state, $M$, we can expect to be in at a given time. Once we are in a specific hidden state, we know that $B$ determines the probability of observing a specific emission. This actually relates to gaussian mixture models quite nicely! Recall that GMM's also have a their own latent variable (the specific gaussian that a data point was generated via, aka the cluster it belongs to). In other words, our HMM has the concept of being in a specific hidden state, $Z$, and then the GMM has the concept of being in a particular hidden gaussian. In combination these can be used to determine the probability of observation. A visualization should help make this more concrete:
Above, we can see that we still have our transition matrix $A$, that determines how we move between hidden states. However, we now see these hidden states, $Z$, are labeled as hidden state 1. This is because we have introduced a second hidden state, that of the gaussian that is generating the observation. So, we transition into a new hidden state $Z$, via our transition matrix $A$, and then we select a specific gaussian, for our second hidden state. From the gaussian we sample in order to generate our observed data point $x$. We can see below that our $B$ emission matrix is replaced with the GMM:
A sample path that would generate two $x$ observations is seen below:
So we have an idea of how this works in practice, let's now go through the mathematics required to utilize GMM's in our HMM. Recall, that there are three things that we need for a Gaussian Mixture Model:
So, we are replacing $B$ by three new parameters: $R$, $\mu$ and $\Sigma$; this is seen clearly in the visualization above. We will use the letter $K$ to be the number of second hidden states.
It is very important to keep in mind that we have two hidden states (latent variables); in standard GMM's, we view the hidden state to be one of $K$ gaussian's. Now, that changes slightly in our HMM/GMM model. To see why, let's look at how we will store our responsibilities. $R$ will be stored as an $MxK$ matrix, so there is one row for each hidden state, $Z$, and then for each state there are $K$ different probabilities. Since this is a probability matrix, each row must sum to one.
R = (M x K)
In english, an entry in this matrix can be thought of as the probability that you are in a specific hidden state, $Z$, and you then have an observation from a specific second hidden state, $K$.
Put another way, what we are really saying here is that based on the specific hidden state, $Z$, that we are in, there is a corresponding prior distribution to select one of the $K$ latent states, which will then generate our data point $x$. As a simple example, let's say that we are looking at household energy usage. The first hidden state, $Z$, is the time of day (morning, afternoon, evening), the second hidden state (for which we have a gaussian distribution) is what appliance is being utilized (washer, blow dryer, blender, etc), and the observed $x$ is the energy usage in watts. If we know that the hidden state, time, is morning, then the probability that the blow dryer or blender are being use is higher than evening (a person is more likely to blow dry their hair in the morning, and also more likely to make a smoothie for breakfast). So, the probability of being in a specific second hidden state is most certainly based on the first hidden state that we are in.
If you did in fact go through my post on GMM's, you may recall that we defined the probability of an observation $x$ as:
$$P(x) = \pi_1 N(x \mid \mu_1, \Sigma_1) + \pi_2 N(x \mid \mu_2, \Sigma_2) + \pi_3 N(x \mid \mu_3, \Sigma_3)$$Here the probability of ending up in a specific latent state (gaussian) was encapsulated entirely by $\pi$ (not the initial state probability that we deal with in HMM's). In our current scenario, the probability of ending up in a specific gaussian is based on our responsibilities, $R$ (which $\pi$ above heavily related to-see GMM post), as well our state transition matrix $A$, and our initial state probabilities, $\pi$ (or HMM $\pi$).
Moving on the gaussians themselves, Each individual $\mu$ is $D$ dimensional, and there are $K$ of them. We need one for each of the $M$ states; hence, $\mu$ will be an $MxKxD$ matrix:
mu = (M x K x D)
$\Sigma$ will be $MxKxDxD$:
sigma = (M x K x D x D)
Now, if you did in fact go through my post on GMM's, the above may seem slightly confusing/incorrect. In a standard GMM, we have our latent variable, of which there are $K$ possibilities. For each $K$ there is a specific corresponding gaussian, for which the means are held in a K x D
matrix. Why do we suddenly have $M$ different gaussians for each of the $K$ latent states? The easiest way to explain this is via an extension of our energy example.
In the energy example, the second latent variable was appliance, of which there are $K$. The question is, based on the first hidden state (time of day), will the distribution of the appliance usage change? The answer is most certainly yes! You can imagine that the mean value of a blowdryer usage in the morning may be 20 watts, while in the evening it may have a mean value of 5 watts. Regardless of the actual numbers, based on the time of day our distribution is going to change. In order to account for this, each latent variable, $k$, must have a distribution at each particular time (first hidden state, $M$). The simplest way to achieve this goal is to hold our means as mu = (M x K x D)
and our covariances as: sigma = (M x K x D x D)
.
Once a state ($j$) has been chosen ($\pi$ and $A$ have done their job), we create a "$B$" observation probability matrix of size $MxT$:
$$B(j,t) = \sum_{k=1}^K \overbrace{R(j,k) N\Big( x(t), \mu(j,k), \Sigma(j,k)\Big)}^\text{Gaussian Mixture Model}$$Even though we don't technically have a $B$ matrix anymore, we can still make one by calculating it for each sequence. And we will store the individual mixture components as well:
$$Comp(j, k, t) = \overbrace{R(j,k) N\Big( x(t), \mu(j,k), \Sigma(j,k)\Big)} ^\text{Individual Mixture Component}$$So, based on a the current hidden state, we transition/select one of multiple gaussians. The $comp$ is one of these gaussians. $B(j,t)$ is their linear combination (the definition of a gaussian mixture model).
I want to add a bit more detail to the above procedure. Remember that what we start with is a set of $X = X_1,...,X_T$ observations. What we are saying is that each individual sequence will have it's own unique emissions matrix $B$ based on it's unique set of observations. For instance, with standard HMM's if you wanted the probability of a specific observation given a hidden state you would find the specific entry in the $B$ matrix (think back to the coin toss example). Now, it is essentially the same thing! The $B$ matrix will be created based on the unique observations in $x$, and the probability is based on the sum of the mixture components.
Visually we this can be represented by the image below:
Based on a given hidden state and time, we must account for all of the ways that we can observed $x_1$; in this case the red, blue, and green paths (each being an entry in $comp$). We then sum up all of those possibilties and record it in our $B$ matrix as $B(j=1, t=1)$, for sequence one.
With our $B$ matrix defined, we can move onto our expectation step. First, we can calculate a new $\gamma$, where $\gamma$ is the same variable defined in my prior HMM post:
$$\gamma(j,k,t) = \frac{\alpha(t, j) \beta(t, j)}{\sum_{j'=1}^M \alpha(t, j') \beta(t, j')} \frac{R(j,k) N\Big( x(t), \mu(j,k), \Sigma(j,k)\Big)}{\sum_{k'=1}^K R(j,k') N\Big( x(t), \mu(j,k'), \Sigma(j,k')\Big)} $$$$\gamma(j,k,t) = \frac{\alpha(t, j) \beta(t, j)}{\sum_{j'=1}^M \alpha(t, j') \beta(t, j')} \frac{Comp(j, k, t)}{B(j,t)} $$Moving on to our maximization step, we can now define the updates for $R$, $\mu$ and $\Sigma$:
$$R(j, k) = \frac{\sum_{t=1}^T \gamma(j, k, t)}{\sum_{t=1}^T \sum_{k'=1}^K \gamma(j, k', t)}$$$$\mu(j, k) = \frac{\sum_{t=1}^T \gamma(j,k,t)x(t)}{\sum_{t=1}^T \gamma(j,k,t)}$$$$\Sigma(j,k) = \frac{\sum_{t=1}^T \gamma(j, k, t)\big(x(t) - \mu(j,k)\big) \big(x(t) - \mu(j, k) \big)^T}{\sum_{t=1}^T \gamma(j, k, t)}$$I want to take a quick moment to compare the continuous HMM update rules above with those for the regular HMM from past posts. The update rule for $\gamma$ for the regular HMM looked like:
Original HMM
$$\phi(t, i ,j) = \frac{\alpha(t,i)A(i,j)B\big(j, x(t+1)\big)\beta(t+1, j)}{\sum_{i=1}^M \sum_{j=1}^M \alpha(t,i)A(i,j)B\big(j, x(t+1)\big)\beta(t+1, j)}$$
And, to reinforce the new update rule for the continuous HMM:
Continuous HMM
$$\gamma(j,k,t) = \frac{\alpha(t, j) \beta(t, j)}{\sum_{j'=1}^M \alpha(t, j') \beta(t, j')}
\frac{R(j,k) N\Big( x(t), \mu(j,k), \Sigma(j,k)\Big)}{\sum_{k'=1}^K R(j,k') N\Big( x(t), \mu(j,k'), \Sigma(j,k')\Big)}
$$
The key differences are as follows:
Now let's quickly contrast vanilla GMM's with continuous HMM's. With GMM's, the goal was to learn the responsibilities, and means/covariances of each $k$ gaussian. The idea remains the same when dealing with continuous HMM's, however, we now must account for the fact that we are dealing with sequences. Hence, we must also account for the hidden state, $M$, and the transition matrix $A$.
Original GMM
So, our original GMM looked like:
Continuous HMM
Meanwhile, our continuous HMM functions as follows:
For every sequence (expectation step):
For every sequence (maximization step):
I want to now take a moment to actually dissect the data generation process of continuous HMM's, since I feel that it will provide clarity to their internal mechanics. Recall that with the discrete HMM, such as the magician's coin toss or a sentence, we had observation sequences:
coin toss -> x = [heads, heads, tails]
sentence -> x = "The red fox ran away"
Now, as we transition to continuous HMM's how does this change; what will our continuous observation sequences look like? Well, time will remain discrete, but the actual value at a unique time step will be continuous. For instance, $X$ may look like:
hmm continuous -> x = [5.1234324, 9.4452562, 1.875468, ... , 7.246524]
Where each index of $x$ represents a specific discrete time step, but each value at a given index is a continuous variable. How do we actually generate this data? Well, let's say that we have 5 hidden states, 3 latent variables (represented via gaussians), and 2 dimensions for $X$. The key pieces to our continuous HMM model in this case are as follows; we still have:
And our $B$ emission matrix is replaced by:
The visualization of a sample path from earlier this post should give nice intuition for the process of generating the data. In pseudocode it will look like:
for sequence in all_sequences:
for t in len(sequence):
# Find next hidden state, sample from A
# Find next gaussians based on current hidden state, sample from R
# Calculate probability of x[t], randomly sample from selected gaussian
A full implementation can be seen below:
def simple_init():
# 1 state, 1 gaussian, 1 dimension
M = 1
K = 1
D = 1
pi = np.array([1])
A = np.array([[1]])
R = np.array([[1]])
mu = np.array([[[0]]])
sigma = np.array([[[[1]]]])
return M, K, D, pi, A, R, mu, sigma
def big_init():
# 5 hidden states, 3 different gaussians, 2 dimensions
M = 5
K = 3
D = 2
# initial state distribution
pi = np.array([1, 0, 0, 0, 0])
# State transition matrix - likes to stay where it is (0.9 across diagonals)
A = np.array([
[0.9, 0.025, 0.025, 0.025, 0.025],
[0.025, 0.9, 0.025, 0.025, 0.025],
[0.025, 0.025, 0.9, 0.025, 0.025],
[0.025, 0.025, 0.025, 0.9, 0.025],
[0.025, 0.025, 0.025, 0.025, 0.9],
])
# Mixture responsibilities -> Uniform distribution
R = np.ones((M, K)) / K
# Gaussian means --> M x K x D
mu = np.array([
[[0, 0], [1, 1], [2, 2]],
[[5, 5], [6, 6], [7, 7]],
[[10, 10], [11, 11], [12, 12]],
[[15, 15], [16, 16], [17, 17]],
[[20, 20], [21, 21], [22, 22]],
])
# Gaussian Covariances
sigma = np.zeros((M, K, D, D))
for m in range(M):
for k in range(K):
sigma[m, k] = np.eye(D)
return M, K, D, pi, A, R, mu, sigma
def get_signals(N=20, T=100, init=big_init):
"""
Get signals from HMM with GMM
Args:
- N: number of sequences
- T: length of sequence
- init: which init to call
Pseudocode:
for n in number_of_sequences:
for t in length_of_sequence:
Find next hidden state (sample from A)
Find next gaussian based on current hidden state (sample from R)
Calculate probability of x (randomly sample from selected gaussian)
"""
M, K, D, pi, A, R, mu, sigma = init()
X = []
# Loop through every N
for n in range(N):
x = np.zeros((T, D))
s = 0 # initial state must be 0, based on pi definition
r = np.random.choice(K, p=R[s])
x[0] = np.random.multivariate_normal(mu[s][r], sigma[s][r])
# Loop through all time steps after time t = 0
for t in range(1, T):
s = np.random.choice(M, p=A[s]) # sample from A[s] to select next state using A
r = np.random.choice(K, p=R[s]) # sample from R[s] to select next gaussian (choose mixture)
x[t] = np.random.multivariate_normal(mu[s][r], sigma[s][r]) # generate data point
X.append(x)
return X
if __name__ == "__main__":
T = 500
x = get_signals(1, T)[0]
axis = range(T)
fig = plt.figure(figsize=(12,5))
plt.plot(axis, x[:, 0], c=sns.xkcd_rgb["red"], lw=1.5, alpha=0.5)
plt.plot(axis, x[:, 1], c=sns.xkcd_rgb["blue"], lw=1.5, alpha=0.5)
plt.xlabel("Time step")
plt.ylabel("Continuous Observation")
plt.legend(["Dimension 1", "Dimension 2"], fontsize=10)
plt.show()
Here we can see our continuous data plotted above; we have 2 unique dimensions (red and blue) each plotted against time. Below, we can see how our observation point $x$ moves through 2 dimensional space over time:
from matplotlib import rc, animation
fig = plt.figure(figsize=(8, 6), dpi=150)
ax1 = fig.add_subplot(1, 1, 1)
marker1, = ax1.plot(15, 15, 'og')
def animate(i):
ax1.clear()
ax1.set_xlim(-5, 25)
ax1.set_ylim(-5, 25)
marker1 = ax1.plot(x[i, 0], x[i, 1], 'og')
ax1.annotate('T = {}'.format(i), [-1,22], size=20)
ax1.set_xlabel("Dimension 1")
ax1.set_ylabel("Dimension 2")
def init():
ax1.set_xlim(-5, 25)
ax1.set_ylim(-5, 25)
plt.tight_layout()
plt.subplots_adjust(wspace=0.3, bottom=0.15, top=0.9)
# For rendering html video in cell
# html_video = HTML(
# animation.FuncAnimation(
# fig,
# animate,
# frames=500,
# interval=200,
# init_func=init,
# ).to_html5_video()
# )
# display(html_video)
# gif_video = animation.FuncAnimation(
# fig,
# animate,
# frames=500,
# interval=200,
# init_func=init,
# )
# gif_video.save('hmm_continuous.gif', writer='imagemagick')
plt.close()
Below is HMM implemented in code.
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal as mvn
from hmm.generate_hmm_continuous import get_signals, big_init, simple_init
from hmm.utils import random_normalized
class HMM:
def __init__(self, M, K):
"""
Continuous HMM class. Note that there are 2 hidden states now, M and K.
M is the hidden state that was used in regular HMMs. K is a new hidden
state that is represented by a gaussian distribution.
:param M: Number of hidden states
:param K: Number of 2nd hidden states
"""
self.M = M
self.K = K
def fit(self, X, max_iter=30, eps=10e-1):
N = len(X)
D = X[0].shape[1]
self.pi = np.ones(self.M) / self.M # Uniform distribution
self.A = random_normalized(self.M, self.M)
# GMM parameters. mu is set similar to how it is set in kmeans ->
# randomly select points from dataset
# R, responsibilities --> Uniform distribution
self.R = np.ones((self.M, self.K)) / self.K
self.mu = np.zeros((self.M, self.K, D))
for i in range(self.M):
for k in range(self.K):
# For all states and all gaussians, get a random index, choose a random
# sequence, get a random time, and set mu[i,k] to be whatever point
# was at that time
random_idx = np.random.choice(N)
x = X[random_idx]
random_time_idx = np.random.choice(len(x))
self.mu[i,k] = x[random_time_idx]
self.sigma = np.zeros((self.M, self.K, D, D))
for j in range(self.M):
for k in range(self.K):
self.sigma[j,k] = np.eye(D)
costs = []
for it in range(max_iter):
alphas = []
betas = []
gammas = []
Bs = []
P = np.zeros(N) # Sequence probabilities
# ----------- Expectation Step -----------
# Iterate over every sequence
for n in range(N):
x = X[n]
T = len(x)
B = np.zeros((self.M, T))
component = np.zeros((self.M, self.K, T))
# Iterate over every state, every time, and every gaussian
for j in range(self.M):
for t in range(T):
for k in range(self.K):
# Component probability
p = self.R[j,k] * mvn.pdf(x[t], self.mu[j,k], self.sigma[j,k])
component[j,k,t] = p
B[j,t] += p
Bs.append(B)
# Just like discrete case
alpha = np.zeros((T, self.M))
alpha[0] = self.pi * B[:, 0]
for t in range(1, T):
alpha[t] = alpha[t-1].dot(self.A) * B[:,t]
P[n] = alpha[-1].sum()
assert(P[n] <= 1)
alphas.append(alpha)
beta = np.zeros((T, self.M))
beta[-1] = 1
for t in range(T-2, -1, -1):
beta[t] = self.A.dot(B[:, t+1] * beta[t+1])
betas.append(beta)
# This was not needed in the discrete case
gamma = np.zeros((T, self.M, self.K))
for t in range(T):
# Denominator only depends on t
alphabeta = (alphas[n][t,:] * betas[n][t,:]).sum()
for j in range(self.M):
# Now loop through every state and calculate alpha beta factor
factor = alphas[n][t,j] * betas[n][t,j] / alphabeta
for k in range(self.K):
# loop through all gaussians
gamma[t,j,k] = factor * component[j,k,t] / B[j,t]
gammas.append(gamma)
cost = np.log(P).sum()
costs.append(cost)
# ----------- Maximization Step -----------
self.pi = np.sum((alphas[n][0] * betas[n][0]) / P[n] for n in range(N)) / N
# Define numerators and denominators, since all updates formulas involve division
a_den = np.zeros((self.M, 1))
a_num = 0
r_num = np.zeros((self.M, self.K))
r_den = np.zeros(self.M)
mu_num = np.zeros((self.M, self.K, D))
sigma_num = np.zeros((self.M, self.K, D, D))
# Note the denominator for mu and sigma is just r_num
for n in range(N):
# iterate over all sequences
x = X[n]
T = len(x)
B = Bs[n]
gamma = gammas[n]
a_den += (alphas[n][:-1] * betas[n][:-1]).sum(axis=0, keepdims=True).T / P[n]
# Update A -> This is the same update that was performed in the discrete case!
a_num_n = np.zeros((self.M, self.M))
for i in range(self.M):
for j in range(self.M):
for t in range(T-1):
a_num_n[i,j] += (alphas[n][t,i] * self.A[i,j] *
B[j,t+1] * betas[n][t+1,j])
a_num += a_num_n / P[n]
# Update mixture components
r_num_n = np.zeros((self.M, self.K))
r_den_n = np.zeros(self.M)
for j in range(self.M):
for k in range(self.K):
for t in range(T):
r_num_n[j,k] += gamma[t,j,k]
r_den_n[j] += gamma[t,j,k]
r_num += r_num_n / P[n]
r_den += r_den_n / P[n]
mu_num_n = np.zeros((self.M, self.K, D))
sigma_num_n = np.zeros((self.M, self.K, D, D))
for j in range(self.M):
for k in range(self.K):
for t in range(T):
mu_num_n[j,k] += gamma[t, j, k] * x[t]
sigma_num_n[j,k] += (
gamma[t,j,k]*np.outer(x[t] -self.mu[j,k], x[t] - self.mu[j,k])
)
mu_num += mu_num_n / P[n]
sigma_num += sigma_num_n / P[n]
# Updates
self.A = a_num / a_den
for j in range(self.M):
for k in range(self.K):
self.R[j,k] = r_num[j,k] / r_den[j]
self.mu[j,k] = mu_num[j,k] / r_num[j,k]
self.sigma[j,k] = sigma_num[j,k] / r_num[j,k]
print("A:\n", self.A)
print("mu:\n", self.mu)
print("sigma:\n", self.sigma)
print("R:\n", self.R)
print("pi:\n", self.pi)
plt.figure(figsize=(8,5))
plt.plot(costs, color="blue")
plt.xlabel("Iteration Number")
plt.ylabel("Cost")
plt.show()
def likelihood(self, x):
# returns log P(x | model)
# using the forward part of the forward-backward algorithm
T = len(x)
alpha = np.zeros((T, self.M))
B = np.zeros((self.M, T))
for j in range(self.M):
for t in range(T):
for k in range(self.K):
p = self.R[j,k] * mvn.pdf(x[t], self.mu[j,k], self.sigma[j,k])
B[j,t] += p
alpha[0] = self.pi*B[:,0]
for t in range(1, T):
alpha[t] = alpha[t-1].dot(self.A) * B[:,t]
return alpha[-1].sum()
def likelihood_multi(self, X):
return np.array([self.likelihood(x) for x in X])
def log_likelihood_multi(self, X):
return np.log(self.likelihood_multi(X))
def set(self, pi, A, R, mu, sigma):
self.pi = pi
self.A = A
self.R = R
self.mu = mu
self.sigma = sigma
M, K = R.shape
self.M = M
self.K = K
def get_state_sequence(self, x):
# returns the most likely state sequence given observed sequence x
# using the Viterbi algorithm
T = len(x)
# make the emission matrix B
B = np.zeros((self.M, T))
for j in range(self.M):
for t in range(T):
for k in range(self.K):
p = self.R[j,k] * mvn.pdf(x[t], self.mu[j,k], self.sigma[j,k])
B[j,t] += p
# perform Viterbi as usual
delta = np.zeros((T, self.M))
psi = np.zeros((T, self.M))
delta[0] = self.pi*B[:,0]
for t in range(1, T):
for j in range(self.M):
delta[t,j] = np.max(delta[t-1]*self.A[:,j]) * B[j,t]
psi[t,j] = np.argmax(delta[t-1]*self.A[:,j])
# backtrack
states = np.zeros(T, dtype=np.int32)
states[T-1] = np.argmax(delta[T-1])
for t in range(T-2, -1, -1):
states[t] = psi[t+1, states[t+1]]
return states
def fake_signal(init=simple_init):
signals = get_signals(N=10, T=10, init=init)
hmm = HMM(2, 2)
hmm.fit(signals)
L = hmm.log_likelihood_multi(signals).sum()
print("LL for fitted params:", L)
if __name__ == '__main__':
fake_signal(init=simple_init)
And below is an implementation with Theano. Keep in mind that that vast majority is similar to the discrete HMM implementation with Theano; we still have a recurrence block that is used to calculate the probability sequence/cost and we still have the update block that is used to update our HMM variables. The difference is that our update block now updates $R$, $\mu$ and $\Sigma$.
Additionally, we also have a portion prior to the recurrence block where we must calculate our emissions matrix $B$. This is done for each individual sequence, and $B$ is then used in finding the probability of the specific sequence (cost).
import wave
import theano
import theano.tensor as T
import numpy as np
import matplotlib.pyplot as plt
from Machine_Learning.hmm.utils import random_normalized
class HMM:
def __init__(self, M, K):
self.M = M
self.K = K
def fit(self, X, learning_rate=10e-3, max_iter=10):
"""Train continuous HMM model using stochastic gradient descent."""
N = len(X) # Number of sequences
D = X[0].shape[1] # Dimensionality
# ---------- 1. Initialize continuous HMM model parameters ----------
# Original HMM parameters
pi0 = np.ones(self.M) / self.M # Uniform distribution
A0 = random_normalized(self.M, self.M)
# Continuous HMM (GMM) parameters.
R0 = np.ones((self.M, self.K)) / self.K # Uniform distribution
# mu is set similar to how it is set in kmeans -> randomly select points from dataset
mu0 = np.zeros((self.M, self.K, D))
for i in range(self.M):
for k in range(self.K):
# For all states and all gaussians, get a random index, choose a random sequence,
# get a random time, and set mu[i,k] to be whatever point was at that time
random_idx = np.random.choice(N)
x = X[random_idx]
random_time_idx = np.random.choice(len(x))
mu0[i,k] = x[random_time_idx]
# setting sigma so that all gaussians are initialized as spherical
sigma0 = np.zeros((self.M, self.K, D, D))
for j in range(self.M):
for k in range(self.K):
sigma0[j,k] = np.eye(D)
thx, cost = self.set(pi0, A0, R0, mu0, sigma0)
# ---------- 2. Perform updates on continuous HMM Model parameters via stochastic gradient descent ----------
# This is why theano is so powerful. By linking all variables together via a computational
# graph, we can define our cost, which is really just the probability of a sequence,
# and we can then find the gradient of the cost with respect to our parameters, pi, A, B
# R, mu, and sigma.
pi_update = self.pi - learning_rate*T.grad(cost, self.pi)
pi_update = pi_update / pi_update.sum() # Normalizing to ensure it stays a probability
A_update = self.A - learning_rate*T.grad(cost, self.A)
A_update = A_update / A_update.sum(axis=1).dimshuffle(0, 'x')
R_update = self.R - learning_rate*T.grad(cost, self.R)
R_update = R_update / R_update.sum(axis=1).dimshuffle(0, 'x')
updates = [
(self.pi, pi_update),
(self.A, A_update),
(self.R, R_update),
(self.mu, self.mu - learning_rate*T.grad(cost, self.mu)),
(self.sigma, self.sigma - learning_rate*T.grad(cost, self.sigma)),
]
train_op = theano.function(
inputs=[thx],
updates=updates,
)
costs = []
for it in range(max_iter):
print("it:", it)
for n in range(N):
c = self.log_likelihood_multi(X).sum()
print("c:", c)
costs.append(c)
train_op(X[n])
print("A:", self.A.get_value())
print("mu:", self.mu.get_value())
print("sigma:", self.sigma.get_value())
print("R:", self.R.get_value())
print("pi:", self.pi.get_value())
plt.plot(costs)
plt.show()
def set(self, pi, A, R, mu, sigma):
self.pi = theano.shared(pi)
self.A = theano.shared(A)
self.R = theano.shared(R)
self.mu = theano.shared(mu)
self.sigma = theano.shared(sigma)
M, K = R.shape
self.M = M
self.K = K
D = self.mu.shape[2]
twopiD = (2*np.pi)**D # Needed to calculate gaussian
thx = T.matrix("X") # Represents T x D matrix of sequential observations
# ---------- Calculate B emission matrix ----------
# We need to find B for a particular sequence.
# Recall that B is an M x T matrix
# For each hidden state, M, at each tim, there is a probability of observing whatever
# value was in our sequence
# We have the following below:
# - component_pdf -> finds a single value of B, at given hidden state and time
# - state_pdfs ----> finds a column in B; for all hidden states, and a single time
# - gmm_pdf -------> finds a full B matrix, M x T. Loops over all time steps and
# determines each column of B, resulting in a final B matrix
def mvn_pdf(x, mu, sigma):
"""Need to define our own mvn because it does not exist in theano."""
k = 1 / T.sqrt(twopiD * T.nlinalg.det(sigma))
e = T.exp(-0.5*(x - mu).T.dot(T.nlinalg.matrix_inverse(sigma).dot(x-mu)))
return k * e
def gmm_pdf(x):
def state_pdfs(xt):
"""Create the B observation matrix.
Takes in x at a particular time t. Will perform a theano scan, iterating
over all hidden states M, finding their respective B(j,t). """
def component_pdf(j, xt):
"""Takes in hidden state and x at a particular time t. This
function is going to calculate B(j, t). Note, this must be defined as a
function that so it can be passed into theano scan.
Args:
- j: current sequence in sequences pass in from scan below
- xt: non sequence argument passed in from scan below
"""
Bj_t = 0
for k in range(self.K):
Bj_t += self.R[j, k] * mvn_pdf(xt, self.mu[j,k], self.sigma[j,k])
return Bj_t
Bt, _ = theano.scan(
fn=component_pdf,
sequences=T.arange(self.M),
n_steps=self.M,
outputs_info=None,
non_sequences=[xt]
)
return Bt
# Calculate full B matrix
B, _ = theano.scan(
fn=state_pdfs,
sequences=x,
n_steps=x.shape[0],
outputs_info=None
)
return B.T
B = gmm_pdf(thx)
def recurrence_to_find_alpha(t, old_a, B):
"""Forward algorithm."""
a = old_a.dot(self.A) * B[:, t]
s = a.sum()
return (a / s), s
[alpha, scale], _ = theano.scan(
fn=recurrence_to_find_alpha,
sequences=T.arange(1, thx.shape[0]),
outputs_info=[self.pi*B[:,0], None],
n_steps=thx.shape[0] - 1,
non_sequences=[B]
)
cost = -T.log(scale).sum()
self.cost_op = theano.function(
inputs=[thx],
outputs=cost
)
return thx, cost
def log_likelihood_multi(self, X):
return np.array([self.cost_op(x) for x in X])
def real_signal():
"""
Extracts raw audio from Wav file.
Right click on the file and go to "get info", you will see that:
- sampling rate = 16000 Hz
- bits per sample = 16
- The first is quanitization in time
- The second is quantization in amplitude
This is also done for images. 2^16 = 65536 is the number of different sound levels
that we have.
"""
spf = wave.open("helloworld.wav")
signal = spf.readframes(-1)
signal = np.fromstring(signal, "Int16")
T = len(signal)
signal = (signal - signal.mean()) / signal.std() # Normalize
hmm = HMM(5, 3)
# Signal needs to be of shape N x T(n) x D
hmm.fit(signal.reshape(1, T, 1), learning_rate=10e-6, max_iter=20)
if __name__ == "__main__":
real_signal()