We have now covered the vast majority of the theory behind HMMs. We have seen how they model the probability of a sequence, and how they handle sequences driven by latent states. We also saw how they can be extended to continuous observations by replacing the $B$ emission matrix with a Gaussian mixture model. In this post I want to go over a few real-world applications of HMMs and leave you with a few ideas of what else is possible.
Recall from the introduction that even though the HMM is an unsupervised model (it models a probability distribution), we can use it effectively as a classifier by applying Bayes' rule. There is a name for this type of classifier: a generative model. The name comes from the fact that we model the probability density of each class and treat that density as the process that generated the data we observe.
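To make this concrete, the decision rule of a generative classifier is just Bayes' rule with the class-conditional likelihood plugged in; the HMMClassifier implemented below uses exactly this rule, working with log-likelihoods and log-priors:

$$p(C = k \mid x) = \frac{p(x \mid C = k) \, p(C = k)}{\sum_j p(x \mid C = j) \, p(C = j)} \propto p(x \mid C = k) \, p(C = k)$$

$$\hat{k} = \underset{k}{\text{argmax}} \Big[ \log p(x \mid C = k) + \log p(C = k) \Big]$$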
Compare this to discriminative models, such as logistic regression or deep neural networks. These models give you the probability of the class given the input directly; there is no modeling of the likelihood.
Interestingly, these two types of models sometimes intersect. In my posts on logistic regression I discuss how we usually cannot solve for the weights directly and must use gradient descent. There is an exception to this rule, however: if we assume each class is Gaussian with the same covariance but a different mean, then the weights can be solved for in closed form. And if we know that both classes are Gaussian, and we know their means and covariances, then we know their likelihoods, so the model is generative as well. In this particular setup, generative and discriminative models intersect.
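As a quick sketch of that closed-form solution: for two Gaussian classes with means $\mu_0$ and $\mu_1$ and a shared covariance $\Sigma$, pushing the Gaussian densities through Bayes' rule yields a posterior of exactly the logistic form $p(C = 1 \mid x) = \frac{1}{1 + e^{-(w^T x + b)}}$, with

$$w = \Sigma^{-1} (\mu_1 - \mu_0)$$

$$b = -\frac{1}{2} \mu_1^T \Sigma^{-1} \mu_1 + \frac{1}{2} \mu_0^T \Sigma^{-1} \mu_0 + \log \frac{p(C = 1)}{p(C = 0)}$$

In other words, in this case the "learned" weights are completely determined by the class statistics.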
Now, you may be wondering what the benefits and drawbacks of each model type are. Generative models tend to have a great deal of theory behind them and are very principled. However, discriminative models often simply work better in practice (deep learning being the obvious example). With that said, one of the drawbacks of deep learning is that it is very hard to explain exactly what is going on, or what features are being learned. There is no graphical model that you can point to and say: "here is a strong connection between these two nodes which represents something in physical reality". That is often fine, because what we usually care about is accuracy and functionality, not just simple relationships between input variables. The strength of neural networks is that they model complex relationships; unfortunately, that complexity means they can't be explained in terms that would satisfy a statistician.
Now, let's take a chance to implement an HMM classifier; specifically, we will try to classify poetry as being written by either Robert Frost or Edgar Allan Poe.
import string
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import bernoulli, binom, norm
from sklearn.utils import shuffle
from nltk import pos_tag, word_tokenize
from IPython.core.display import HTML

from hmm.hmm_theano import HMM
from hmm.utils import get_obj_s3
# from hmm.generate_hmm_continuous import get_signals, big_init, simple_init
# from hmm.utils import random_normalized

%matplotlib inline
%config InlineBackend.figure_format = 'retina'
sns.set(style="white", palette="husl")
sns.set_context("talk")
sns.set_style("ticks")
class HMMClassifier():
    def __init__(self):
        pass

    def fit(self, X, Y, V):
        K = len(set(Y))  # Number of classes
        N = len(Y)
        self.models = []
        self.priors = []  # Log priors for Bayes' rule
        for k in range(K):
            # Gather all x's that belong to class k
            thisX = [x for x, y in zip(X, Y) if y == k]
            C = len(thisX)  # Number of elements of class k
            self.priors.append(np.log(C) - np.log(N))  # Log prior = log(C / N)
            hmm = HMM(5)  # Create an HMM with 5 hidden states
            hmm.fit(thisX, V=V, p_cost=0.1, print_period=1, learning_rate=10e-5, max_iter=100)
            self.models.append(hmm)

    def score(self, X, Y):
        N = len(Y)
        correct = 0
        for x, y in zip(X, Y):
            # Posterior (up to a constant) = log-likelihood + log prior, for each class
            posteriors = [hmm.log_likelihood(x) + prior for hmm, prior in zip(self.models, self.priors)]
            p = np.argmax(posteriors)
            if p == y:
                correct += 1
        return float(correct) / N


def get_tags(s):
    """Determines parts-of-speech tags for a given string."""
    word_tag_tuples = pos_tag(word_tokenize(s))
    return [tag for word, tag in word_tag_tuples]


def get_data():
    """Gathers lines of text for each author and determines the POS tags for each."""
    word2idx = {}
    current_idx = 0
    X = []  # Sequences
    Y = []  # Labels
    for file_name, label in zip(("robert_frost.txt", "edgar_allan_poe.txt"), (0, 1)):
        count = 0
        for line in get_obj_s3(file_name).read().decode("utf-8").split("\n"):
            line = line.rstrip()
            if line:
                tokens = get_tags(line)
                if len(tokens) > 1:
                    for token in tokens:
                        if token not in word2idx:
                            word2idx[token] = current_idx
                            current_idx += 1
                    sequence = np.array([word2idx[w] for w in tokens])
                    X.append(sequence)
                    Y.append(label)
                    count += 1
                    if count >= 50:
                        break
    print("Vocabulary: ", word2idx.keys())
    return X, Y, current_idx


def main():
    # We will not be using the words directly because there are so many of them;
    # rather, each sequence is the line's parts-of-speech tags
    X, Y, V = get_data()
    X, Y = shuffle(X, Y)
    N = 20  # Number of test samples
    Xtrain, Ytrain = X[:-N], Y[:-N]
    Xtest, Ytest = X[-N:], Y[-N:]
    model = HMMClassifier()
    model.fit(Xtrain, Ytrain, V)
    print("Score: ", model.score(Xtest, Ytest))


if __name__ == "__main__":
    main()
We are now going to go through an example that deals with parts-of-speech tagging. Below is a preview of what the data looks like:
Confidence NN B-NP
in IN B-NP
the DT B-NP
pound NN I-NP
is VBZ B-VP
widely RB I-VP
expected VBN I-VP
So, each token is on a separate line alongside its tag. We are interested in the first two columns: every word has a corresponding part-of-speech tag.
Now, in general, when faced with a machine learning problem, a very good thing to do is establish a baseline. What is a good baseline that we know can be used for classification? Logistic regression. We can see how this would be applied very easily: we would one-hot encode the words and then perform a softmax over the tags. We will see that we can already do very well with just that (over 90% accuracy). This means that over 90% of the time, a word has the same tag; it is fairly rare for a word to be used as more than one part of speech. For example:
I drank milk this morning.
This company is milking its customers' money.
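As a small illustrative aside (toy sizes, not the real dataset): multiplying a one-hot word vector by the weight matrix is the same thing as selecting the row of the matrix at that word's index, which is the trick the logistic regression code below exploits via self.W[thX].

import numpy as np

V, K = 5, 3                  # toy vocabulary size and number of tags
W = np.random.randn(V, K)    # weight matrix from words to tags
word_idx = 2
one_hot = np.zeros(V)
one_hot[word_idx] = 1.0
# The one-hot matrix product and the row lookup give the same logits
assert np.allclose(one_hot @ W, W[word_idx])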
We will also calculate the F1 score, which is the harmonic mean of precision and recall:
$$\text{Precision} = \frac{\text{True positives}}{\text{True positives + False positives}}$$

$$\text{Recall} = \frac{\text{True positives}}{\text{True positives + False negatives}}$$

$$F1 = \frac{2(precision \cdot recall)}{precision + recall}$$

One question you may want to ask: since these words show up in a sequence, can we use the context to achieve better predictions? The answer is yes, we can! We could use a recurrent neural network, or a Hidden Markov Model. We will use a Hidden Markov Model, since that is the focus of this post.
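Before we do, here is a quick sanity check of the precision, recall, and F1 definitions above using sklearn's metrics (the labels are made up purely for illustration):

from sklearn.metrics import precision_score, recall_score, f1_score

y_true = [1, 0, 1, 1, 0, 1]
y_pred = [1, 0, 0, 1, 1, 1]   # TP = 3, FP = 1, FN = 1

print(precision_score(y_true, y_pred))  # 3 / 4 = 0.75
print(recall_score(y_true, y_pred))     # 3 / 4 = 0.75
print(f1_score(y_true, y_pred))         # 2(0.75 * 0.75) / 1.5 = 0.75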
We can think of the words as the observed sequence, and the tags as the sequence of hidden states. There is one small twist, however. Generally, with HMMs we use an algorithm called expectation-maximization to train the model. We will not need it in this scenario, because we actually know the hidden states during training; in other words, the hidden states are not some abstract hyperparameter, but an actual, observable quantity.
Recall that an HMM is defined by three things: the initial state distribution $\pi$, the state transition matrix $A$, and the observation (emission) matrix $B$.
All of these can be estimated by maximum likelihood directly, just by counting! For example, $\pi$ is simply the frequency with which each tag starts a sentence. We know that $A$ is Markov, so we can count up all of the tag-to-tag transitions and divide each row by its sum. Finally, the observation probabilities depend only on the current state, so $B$ is just the probability of a word given the tag, which can also be computed by counting.
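Here is a minimal sketch of that counting, assuming the words and tags have already been integer-encoded ($M$ tags, vocabulary size $V$); counting_mle is just an illustrative helper name, and the full version at the end of the post adds smoothing so that unseen transitions and words do not receive zero probability.

import numpy as np

def counting_mle(X, Y, M, V):
    """Maximum-likelihood estimates of pi, A, and B obtained purely by counting.
    X: list of word-index sequences; Y: list of tag-index sequences."""
    pi = np.zeros(M)
    A = np.zeros((M, M))
    B = np.zeros((M, V))
    for x, y in zip(X, Y):
        if len(y) == 0:
            continue
        pi[y[0]] += 1                    # which tag starts the sentence
        for t in range(len(y) - 1):
            A[y[t], y[t + 1]] += 1       # tag -> tag transition counts
        for xt, yt in zip(x, y):
            B[yt, xt] += 1               # word counts per tag
    pi /= pi.sum()
    A /= A.sum(axis=1, keepdims=True)    # normalize each row into a distribution
    B /= B.sum(axis=1, keepdims=True)
    return pi, A, B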
So, we are going to solve the parts of speech problem as follows:
import numpy as np
import theano
import theano.tensor as T
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.metrics import f1_score
from sklearn.tree import DecisionTreeClassifier
from hmm.utils import get_obj_s3
class LogisticRegression:
    def __init__(self):
        pass

    def fit(self, X, Y, V=None, K=None, lr=10e-1, mu=0.99, batch_sz=100, epochs=6):
        """Fit our logistic regression model.

        Variables:
            - V: Vocabulary size, aka the number of input features
            - K: The number of classes (POS tags)
            - N: The number of training examples
            - W: Weight matrix from input layer (V) to output layer (K)

        Remember, the W matrix in logistic regression can be thought
        of as a learned representation of the output classes, K. Each
        column (K total) holds a set of weights that map back to each
        individual node in the input layer. These columns can be thought
        of as representations in the vocabulary space of each part-of-speech
        tag. For an individual sample we can see how similar it is to each
        column of W to determine the most probable class.

        Steps:
            - Define all necessary variables
            - Determine p(y | x), where x is a word vector representing the input word
        """
        if V is None:
            V = len(set(X))
        if K is None:
            K = len(set(Y))
        N = len(X)

        # Create our weight matrix, V x K
        W = np.random.randn(V, K) / np.sqrt(V + K)
        b = np.zeros(K)
        self.W = theano.shared(W)
        self.b = theano.shared(b)
        self.params = [self.W, self.b]

        thX = T.ivector("X")
        thY = T.ivector("Y")

        # The general equation for logistic regression is:
        #   p(y | x) = softmax(x.dot(W) + b)
        # Because thX is just an array of word indices (each word being a
        # one-hot vector), x.dot(W) is equivalent to the row lookup self.W[thX]
        py_x = T.nnet.softmax(self.W[thX] + self.b)
        prediction = T.argmax(py_x, axis=1)

        cost = -T.mean(T.log(py_x[T.arange(thY.shape[0]), thY]))
        grads = T.grad(cost, self.params)
        dparams = [theano.shared(p.get_value() * 0) for p in self.params]
        self.cost_predict_op = theano.function(
            inputs=[thX, thY],
            outputs=[cost, prediction],
            allow_input_downcast=True,
        )

        # Gradient descent with momentum
        updates = [
            (p, p + mu*dp - lr*g) for p, dp, g in zip(self.params, dparams, grads)
        ] + [
            (dp, mu*dp - lr*g) for dp, g in zip(dparams, grads)
        ]
        train_op = theano.function(
            inputs=[thX, thY],
            outputs=[cost, prediction],
            updates=updates,
            allow_input_downcast=True
        )

        costs = []
        n_batches = N // batch_sz
        for i in range(epochs):
            X, Y = shuffle(X, Y)
            print("epoch:", i)
            for j in range(n_batches):
                Xbatch = X[j*batch_sz:(j*batch_sz + batch_sz)]
                Ybatch = Y[j*batch_sz:(j*batch_sz + batch_sz)]
                c, p = train_op(Xbatch, Ybatch)
                costs.append(c)
                if j % 200 == 0:
                    print(
                        "i:", i, "j:", j,
                        "n_batches:", n_batches,
                        "cost:", c,
                        "error:", np.mean(p != Ybatch)
                    )
        plt.figure(figsize=(8, 5))
        plt.plot(costs, color="blue")
        plt.xlabel("Iteration Number")
        plt.ylabel("Cost")
        plt.show()

    def score(self, X, Y):
        _, p = self.cost_predict_op(X, Y)
        return np.mean(p == Y)

    def f1_score(self, X, Y):
        _, p = self.cost_predict_op(X, Y)
        return f1_score(Y, p, average=None).mean()
def get_data(split_sequences=False):
    word2idx = {}
    tag2idx = {}
    word_idx = 0
    tag_idx = 0
    Xtrain = []
    Ytrain = []
    currentX = []
    currentY = []
    for line in get_obj_s3("pos_train.txt").read().decode("utf-8").split("\n"):
        line = line.rstrip()
        if line:
            r = line.split()
            word, tag, _ = r
            if word not in word2idx:
                word2idx[word] = word_idx
                word_idx += 1
            currentX.append(word2idx[word])
            if tag not in tag2idx:
                tag2idx[tag] = tag_idx
                tag_idx += 1
            currentY.append(tag2idx[tag])
        elif split_sequences:
            # A blank line marks the end of a sentence
            Xtrain.append(currentX)
            Ytrain.append(currentY)
            currentX = []
            currentY = []
    if not split_sequences:
        Xtrain = currentX
        Ytrain = currentY

    # Load the test data
    Xtest = []
    Ytest = []
    currentX = []
    currentY = []
    for line in get_obj_s3("pos_test.txt").read().decode("utf-8").split("\n"):
        line = line.rstrip()
        if line:
            r = line.split()
            word, tag, _ = r
            if word in word2idx:
                currentX.append(word2idx[word])
            else:
                currentX.append(word_idx)  # use the last index as "unknown word"
            currentY.append(tag2idx[tag])
        elif split_sequences:
            Xtest.append(currentX)
            Ytest.append(currentY)
            currentX = []
            currentY = []
    if not split_sequences:
        Xtest = currentX
        Ytest = currentY
    return Xtrain, Ytrain, Xtest, Ytest, word2idx
def main():
    Xtrain, Ytrain, Xtest, Ytest, word2idx = get_data()

    # Convert to numpy arrays
    Xtrain = np.array(Xtrain)
    Ytrain = np.array(Ytrain)

    N = len(Xtrain)
    V = len(word2idx) + 1  # + 1 for the "unknown word" index
    print("vocabulary size:", V)
    # Optionally convert Xtrain to an indicator (one-hot) matrix:
    # Xtrain_indicator = np.zeros((N, V))
    # Xtrain_indicator[np.arange(N), Xtrain] = 1

    # Decision tree baseline (without the indicator matrix)
    dt = DecisionTreeClassifier()
    dt.fit(Xtrain.reshape(N, 1), Ytrain)
    print("dt train score:", dt.score(Xtrain.reshape(N, 1), Ytrain))
    p = dt.predict(Xtrain.reshape(N, 1))
    print("dt train f1:", f1_score(Ytrain, p, average=None).mean())

    # With the indicator matrix -- too slow!!
    # dt.fit(Xtrain_indicator, Ytrain)
    # print("dt score:", dt.score(Xtrain_indicator, Ytrain))

    # Train and score logistic regression
    model = LogisticRegression()
    model.fit(Xtrain, Ytrain, V=V)
    print("training complete")
    print("lr train score:", model.score(Xtrain, Ytrain))
    print("lr train f1:", model.f1_score(Xtrain, Ytrain))

    Ntest = len(Xtest)
    Xtest = np.array(Xtest)
    Ytest = np.array(Ytest)

    # Decision tree test score
    print("dt test score:", dt.score(Xtest.reshape(Ntest, 1), Ytest))
    p = dt.predict(Xtest.reshape(Ntest, 1))
    print("dt test f1:", f1_score(Ytest, p, average=None).mean())

    # Logistic regression test score
    print("lr test score:", model.score(Xtest, Ytest))
    print("lr test f1:", model.f1_score(Xtest, Ytest))


if __name__ == "__main__":
    main()
The nice thing about hidden Markov models is that if the hidden states have an explicit meaning, we can measure them directly. For instance, if our hidden states represent the part-of-speech tag for a given word, we can observe that tag for all of our training examples. In that case, we can determine $A$ rather easily: count up all of the times we transition from one POS tag to another, and divide by the total number of times we are in the first POS tag to begin with. For example, if we want to know the probability of transitioning from a noun to a verb, we would simply compute:
$$A(noun, verb) = \frac{\text{# of times a noun transitions to a verb}}{\text{total # of times the hidden state is a noun}}$$

where we know that $A$'s probabilistic meaning is:
$$A(noun, verb) = p \big(z(t) = verb \mid z(t-1) = noun\big)$$

This simple counting can be done for all of the different hidden state (POS tag) transitions (note that this is simply maximum likelihood estimation). In order to determine the $B$ observation matrix, we can simply count the number of times we observe a certain word in our vocabulary, given that we are in a particular hidden state (POS tag). For instance, if we wanted to know the probability of observing the word milk given that our tag is a verb, we would compute:

$$B(milk, verb) = \frac{\text{# of times milk is observed while the hidden state is verb}}{\text{total # of times the hidden state is verb}}$$

Again, we know that $B$ can be defined as:
$$B(milk, verb) = p \big(x(t) = milk \mid z(t) = verb\big)$$

One of the main algorithms that we can perform on an HMM is the Viterbi algorithm, which allows us to look at the problem in the reverse direction. Normally, we think of hidden causes as producing the observations that we see. The Viterbi algorithm allows us to ask:
Given a sequence of observations, what is the most likely sequence of hidden states?
So, in the example above, our observations would be a string of words, and the Viterbi algorithm would allow us to determine the most likely sequence of parts of speech. What is important to be aware of here is the API of the Viterbi algorithm:
input -> sequence of observations
output -> corresponding sequence of hidden states
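To make that API concrete, here is a minimal log-space Viterbi sketch in numpy (viterbi is an illustrative stand-in for the kind of computation a method like hmm.get_state_sequence performs; it assumes pi, A, and B contain strictly positive probabilities, as the smoothed counts in the final code below produce):

import numpy as np

def viterbi(x, pi, A, B):
    """Most likely hidden state sequence for observations x, in log space.
    pi: (M,) initial distribution, A: (M, M) transitions, B: (M, V) emissions."""
    T, M = len(x), len(pi)
    delta = np.zeros((T, M))            # best log-probability of any path ending in state j at time t
    psi = np.zeros((T, M), dtype=int)   # back-pointers
    delta[0] = np.log(pi) + np.log(B[:, x[0]])
    for t in range(1, T):
        scores = delta[t - 1][:, None] + np.log(A)   # (M, M): previous state -> current state
        psi[t] = np.argmax(scores, axis=0)
        delta[t] = np.max(scores, axis=0) + np.log(B[:, x[t]])
    # Backtrack from the best final state
    states = np.zeros(T, dtype=int)
    states[T - 1] = np.argmax(delta[T - 1])
    for t in range(T - 2, -1, -1):
        states[t] = psi[t + 1][states[t + 1]]
    return states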
If we think about what we are actually doing when modeling parts-of-speech tags with an HMM, we come to the conclusion that we are building a probability model of grammar! This is pretty cool if you ask me. Our model will encapsulate principles such as: a verb often follows a noun, while a verb is unlikely to follow another verb:
$$p(verb \mid noun) \rightarrow high$$

For example:
"John ran"
And likewise:
$$p(verb \mid verb) \rightarrow low$$

"walk jump"
Here is how we will use an HMM to do parts-of-speech tagging:
from functools import reduce

import numpy as np
from sklearn.metrics import f1_score

from hmm.discrete_hmm_scaled import HMM
from hmm.baseline_logistic_regression import get_data


def accuracy(T, Y):
    # T: targets, Y: predictions; inputs are lists of lists
    n_correct = 0
    n_total = 0
    for t, y in zip(T, Y):
        n_correct += np.sum(t == y)
        n_total += len(y)
    return float(n_correct) / n_total


def total_f1_score(T, Y):
    # Inputs are lists of lists
    T = np.concatenate(T)
    Y = np.concatenate(Y)
    return f1_score(T, Y, average=None).mean()


def main(smoothing=1e-1):
    # X = words, Y = POS tags
    Xtrain, Ytrain, Xtest, Ytest, word2idx = get_data(split_sequences=True)
    V = len(word2idx) + 1

    # Find the hidden state transition matrix (A) and initial state distribution (pi)
    M = len(set(reduce(lambda x, y: x + y, Ytrain))) + 1
    A = np.ones((M, M)) * smoothing  # Additive smoothing so no transition has zero probability
    pi = np.zeros(M)
    for y in Ytrain:
        # Loop through all hidden state (POS tag) sequences
        if len(y) > 0:
            pi[y[0]] += 1
            for i in range(len(y) - 1):
                A[y[i], y[i+1]] += 1
    # Turn A and pi into probability matrices
    A /= A.sum(axis=1, keepdims=True)
    pi /= pi.sum()

    # Find the observation matrix (B)
    B = np.ones((M, V)) * smoothing
    for x, y in zip(Xtrain, Ytrain):
        for xi, yi in zip(x, y):
            B[yi, xi] += 1
    B /= B.sum(axis=1, keepdims=True)

    hmm = HMM(M)
    hmm.pi = pi
    hmm.A = A
    hmm.B = B

    # Get the most likely hidden state sequence for each sentence
    Ptrain = []
    for x in Xtrain:
        p = hmm.get_state_sequence(x)
        Ptrain.append(p)

    Ptest = []
    for x in Xtest:
        p = hmm.get_state_sequence(x)
        Ptest.append(p)

    # Print results
    print("train accuracy:", accuracy(Ytrain, Ptrain))
    print("test accuracy:", accuracy(Ytest, Ptest))
    print("train f1:", total_f1_score(Ytrain, Ptrain))
    print("test f1:", total_f1_score(Ytest, Ptest))


if __name__ == '__main__':
    main()