import numpy as np
import scipy.io
Load the assignment data.
vit = scipy.io.loadmat('./a4/viterbiData.mat')
p0 = vit['p0']
pT_long = vit['pT_long']
pT_short1 = vit['pT_short1']
pT_short2 = vit['pT_short2']
The data gives the transition matrices for an inhomogeneous Markov chain, so each time step $t$ has its own transition matrix. In particular, labelling the two states 0 and 1 (to match Python's zero-based indexing), we have four transition probabilities for each time step: $p(x_t = 0|x_{t-1} = 0)$, $p(x_t = 1|x_{t-1} = 0)$, $p(x_t = 0|x_{t-1} = 1)$, $p(x_t = 1|x_{t-1} = 1)$, so our transition probabilities are stored in a 3D tensor with shape $(2, 2, T)$, where $T$ is the total number of time steps. Here's the short example chain:
print "Transition probability tensor shape: ", pT_short1.shape
print pT_short1
print "p_0", p0
pT_long.shape
So, for example, at time $t=2$ we have the following transition probabilities:
pT_short1[:,:,2]
which says: $$p(x_3=0|x_2=0) = 0.10570943,$$ $$p(x_3=1|x_2=0) = 0.89429057,$$ $$p(x_3=0|x_2=1) = 0.14204112,$$ $$p(x_3=1|x_2=1) = 0.85795888.$$
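As a quick sanity check on that layout (a one-liner, not part of the assignment), the outgoing probabilities from each state should sum to 1 at every time step:
np.allclose(pT_short1.sum(axis=1), 1.0) # sums over the outgoing-state axis; should print True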
You can sample by drawing uniform random numbers and testing whether they're less than your parameter $p$. For example, to get 10 biased coin flips that come up heads 75% of the time:
1*(np.random.rand(10) < 0.75) # the 1* is a quick hack to convert from bool to int...
(np.random.rand(100000) < 0.75).mean() # mean of 100000 samples
Alternatively you can use numpy's binomial function:
np.random.binomial(1, 0.75, size=10)
It also takes vectors of probabilities as input so, for example, to flip a fair coin then a biased coin, you could do:
np.random.binomial(1, [0.5,0.75])
np.random.binomial(1, [0.5,0.75], size=(10, 2)) # 10 samples, fair coin in the first column, biased in the second
Let's start by sampling from $p_0$ to get our start state:
n = 10 # number of samples
# Python indexes from 0, so we'll label the two states 0 and 1.
x_0 = 1 - (np.random.rand(n) < p0[0,0]) # 1 - (...) maps the samples where rand < p(x_0 = 0) to 0 and the rest to 1
(x_0 == 0).mean() # the fraction of samples in state 0; this estimates p(x_0 = 0)
Now, for every one of those samples, we want to sample a value for $x_1$ from $p(x_1 | x_0)$. For pT_short1, we can do this as follows:
x_1 = np.zeros_like(x_0) # empty array to store our samples
for k, i in enumerate(x_0):
    print 'x_0 = %d, p(x_1 = 0 | x_0 = %d) = %f' % (i, i, pT_short1[i, 0, 0])
    x_1[k] = 1 - (np.random.rand() < pT_short1[i, 0, 0]) # sample x_1 conditioned on this sample's x_0
print x_1
We can generalize this to deal with the full set of transition probabilities:
def sample_ancestral(p0, pT, n=10000):
    '''
    p0: (states_0 vector) starting probabilities.
    pT: (states_in x states_out x T tensor) transition probabilities.
    n: (int) number of samples to draw.
    '''
    T = pT.shape[2] + 1
    X_samples = np.zeros((n, T), dtype='int')
    for i in xrange(n):
        if np.random.rand() < p0[0][0]:
            X_samples[i, 0] = 0
        else:
            X_samples[i, 0] = 1
        for j in xrange(1, T):
            if np.random.rand() < pT[X_samples[i, j-1], 0, j-1]:
                X_samples[i, j] = 0
            else:
                X_samples[i, j] = 1
    return X_samples
X_samples = sample_ancestral(p0, pT_short1, 100000)
(X_samples==0).mean(axis=0)
Python (and Matlab) for loops tend to be very slow... it's a good habit to learn to vectorize your code. Here's how we'd do it for the sampling code:
def efficient_sample_ancestral(p0, pT, n=10000):
    '''
    p0: starting probabilities.
    pT: transition probabilities.
    n: number of samples to draw.
    '''
    d = pT.shape[2] + 1
    X_samples = np.zeros((n, d), dtype='int')
    X_samples[:, 0] = 1 - np.random.binomial(1, p0[0,0], X_samples.shape[0])
    for j in xrange(1, d):
        pxt_xt1 = pT[X_samples[:, j-1], 0, j-1] # here I'm indexing with the X_samples vector to get a vector of probabilities
        X_samples[:, j] = 1 - np.random.binomial(1, pxt_xt1)
    return X_samples
X_samples = efficient_sample_ancestral(p0, pT_short1, 100000)
(X_samples==0).mean(axis=0)
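Since the whole point of the vectorized version is to avoid the Python loop over samples, it's worth timing the two (these are IPython %timeit magics; the exact numbers depend on your machine, but the vectorized version should be much faster):
%timeit sample_ancestral(p0, pT_short1, 10000)
%timeit efficient_sample_ancestral(p0, pT_short1, 10000)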
In the above example, we estimated the marginals by sampling, but we can compute them exactly using the Chapman-Kolmogorov equations:
$$
p(x_t) = \sum_{x_{t-1}} p(x_t | x_{t-1})p( x_{t-1})
$$
Now, we have $p(x_0 = 0)$ from p0.
print 'p(x_0 = 0) = %f' % p0[0,0]
p_0 = p0[0,0]
So, to get $p(x_1 = 0)$ we compute $p(x_1 = 0) = p(x_1 = 0 | x_0=0)p(x_0=0) + p(x_1 = 0 | x_0=1)p(x_0=1)$:
p_x_1 = pT_short1[0, 0, 0] * p_0 + pT_short1[1, 0, 0] * (1 - p_0)
p_x_1
In general for pT_short1:
d = pT_short1.shape[2] + 1
p = np.zeros(d)
p[0] = p0[0,0]
for i in xrange(1, d):
    p[i] = pT_short1[0, 0, i-1] * p[i-1] + pT_short1[1, 0, i-1] * (1 - p[i-1])
print p
(efficient_sample_ancestral(p0, pT_short1, 100000)==0).mean(axis=0)
The above code was correct for our purposes, but only kept track of $p(x_t = 0)$. We can get $p(x_t = s)$ for all states in a similar fashion as follows:
def compute_marginals(p0, pT):
    '''
    p0: starting probabilities.
    pT: transition probabilities.
    '''
    d = pT.shape[2] + 1
    M = np.zeros((2, d))
    M[:, 0] = p0[0]
    for i in xrange(1, d):
        M[:, i] = np.dot(M[:, i-1], pT[:, :, i-1]) # Chapman-Kolmogorov: p(x_i) = sum_{x_{i-1}} p(x_{i-1}) p(x_i | x_{i-1})
    return M
M = compute_marginals(p0, pT_short1)
M[0,:]
M[1,:]
M[0,:] + M[1,:]
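The exact marginals should agree with the Monte Carlo estimates from earlier, up to sampling error (a quick consistency check, not part of the assignment; with 100000 samples the largest discrepancy should be roughly of order 1e-3):
np.abs(M[0,:] - (efficient_sample_ancestral(p0, pT_short1, 100000) == 0).mean(axis=0)).max()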
Now, we'd like to find the most likely sequence... first let's code up a quick function for telling us the probability of a particular sequence. In your assignment, this is done for you in decodeProb.m, but let's rewrite it here...
def joint_prob(y, p0, pT):
    '''
    y: the sequence to evaluate
    p0: starting probabilities.
    pT: transition probabilities.
    '''
    d = pT.shape[2] + 1
    prob = p0[0, y[0]]
    for i in xrange(1, d):
        prob *= pT[y[i-1], y[i], i-1]
    return prob
The brute force way of finding the most likely sequence is to do the following (exactDecode.m does the same thing with a more elegant way of representing the nested for loops).
y = [1, 0, 0, 1]
joint_prob(y, p0, pT_short1)
max_prob = 0.
for y1 in [0, 1]:
    for y2 in [0, 1]:
        for y3 in [0, 1]:
            for y4 in [0, 1]:
                y = [y1, y2, y3, y4]
                p = joint_prob(y, p0, pT_short1)
                if p > max_prob:
                    y_best = y
                    max_prob = p
print 'Most likely sequence: ', y_best
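If you don't like writing the nested loops out by hand, itertools.product from the standard library enumerates all $2^4$ sequences for you (a Python sketch in the same spirit as exactDecode.m, not a translation of it):
import itertools
all_seqs = [list(y) for y in itertools.product([0, 1], repeat=4)] # all 16 length-4 binary sequences
y_best = max(all_seqs, key=lambda y: joint_prob(y, p0, pT_short1)) # pick the sequence with the largest joint probability
print 'Most likely sequence: ', y_best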
The first thing you might try is to maximize the marginal probability of each $x_t$ separately.
M = compute_marginals(p0,pT_short1)
np.argmax(M,axis=0)
In this case the sequence that maximised the joint probability happened to be the one that maximised the marginal probabilities, but in general that won't be true... to find the sequence that maximises the joint probability, we need to use Viterbi decoding below...
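Here's a small hand-built two-step chain (the numbers are made up purely for illustration) where the two answers differ:
p0_demo = np.array([[0.6, 0.4]]) # p(x_0 = 0) = 0.6, p(x_0 = 1) = 0.4
pT_demo = np.zeros((2, 2, 1))
pT_demo[0, :, 0] = [0.6, 0.4] # p(x_1 | x_0 = 0)
pT_demo[1, :, 0] = [0.0, 1.0] # p(x_1 | x_0 = 1)
print np.argmax(compute_marginals(p0_demo, pT_demo), axis=0) # marginal decoding picks [0, 1]
for y in [[0, 0], [0, 1], [1, 0], [1, 1]]:
    print y, joint_prob(y, p0_demo, pT_demo)
The marginals pick [0, 1], whose joint probability is only 0.24, while the most likely sequence is [1, 1] with probability 0.4.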
Let's do this more efficiently...
Recall from the lectures that we're going to memoize the solution. Because each time step has the same number of states, we can use a matrix of size (num states, num time steps) for this.
d = pT_short1.shape[2] + 1
M = np.zeros((2, d))
T = np.zeros((2, d), dtype='int') # where we'll keep our pointers indicating which state led us where...
$M_0(x_0) = p(x_0)$, so the first state is trivial:
M[:,0] = p0[0]
# we don't update T for time 0 because we just started our chain at 0, so we don't have a pointer to a preceding state
Now, to get $M_1(x_1)$ we have the following: $M_1(x_1) = \max_{x_0} p(x_1 | x_0) M_0(x_0)$. This essentially says, "given that I end up at $x_1$, what is the probability of the most likely step that I took to get there?" We can do this because, by the Markov property, given $x_0$ nothing that happened before $x_0$ affects the probability of being in a particular state at time 1 (obviously that's trivially true here because $x_0$ is our start state, but it holds more generally). So, because we're trying to find the most likely sequence, it suffices to consider all the ways we could have reached a particular $x_1$ value and pick the most likely of them. For example:
# M[0,1] gives us the probability of the most likely sequence that ends in state 0 at time 1.
ways_to_get_to_state_0 = []
ways_to_get_to_state_0.append(M[0,0] * pT_short1[0, 0, 0]) # start at state 0 at time 0 and go to state 0 at time 1
ways_to_get_to_state_0.append(M[1,0] * pT_short1[1, 0, 0]) # start at state 1 at time 0 and go to state 0 at time 1
M[0,1] = np.max(ways_to_get_to_state_0)
T[0,1] = np.argmax(ways_to_get_to_state_0) # keep track of which of the two states the most likely path came from...
ways_to_get_to_state_1 = []
ways_to_get_to_state_1.append(M[0,0] * pT_short1[0, 1, 0]) # start at state 0 at time 0 and go to state 1 at time 1
ways_to_get_to_state_1.append(M[1,0] * pT_short1[1, 1, 0]) # start at state 1 at time 0 and go to state 1 at time 1
M[1,1] = np.max(ways_to_get_to_state_1)
T[1,1] = np.argmax(ways_to_get_to_state_1) # keep track of which of the two states the most likely path came from...
In general, this is what you'd do:
def viterbi_decode(p0, pT):
    '''
    p0: starting probabilities.
    pT: transition probabilities.
    '''
    d = pT.shape[2] + 1
    M = np.zeros((2, d))
    T = np.zeros((2, d), dtype='int')
    M[:, 0] = p0[0]
    for i in xrange(1, d):
        for s in [0, 1]:
            m = [M[0, i-1] * pT[0, s, i-1], M[1, i-1] * pT[1, s, i-1]]
            M[s, i] = np.max(m)
            T[s, i] = np.argmax(m)
    y_best = np.zeros(d, dtype='int')
    y_best[-1] = np.argmax(M[:, -1])
    for i in xrange(d-1, 0, -1): # backtrack through the pointers all the way to time 0
        y_best[i-1] = T[y_best[i], i]
    return y_best, M, T
y_best, M, T = viterbi_decode(p0, pT_short1)
y_best
print M[y_best[-1],-1] == joint_prob(y_best, p0, pT_short1) # check that we haven't made a mistake
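One practical caveat, not needed for these short chains but worth keeping in mind for something the length of pT_long: products of many probabilities underflow to zero in floating point. The standard remedy is to run the same recursion on log-probabilities, replacing products with sums (a sketch, not part of the assignment code):
def viterbi_decode_log(p0, pT):
    '''
    Same recursion as viterbi_decode, but in log space so long chains don't underflow.
    p0: starting probabilities.
    pT: transition probabilities.
    '''
    d = pT.shape[2] + 1
    logM = np.zeros((2, d))
    T = np.zeros((2, d), dtype='int')
    logM[:, 0] = np.log(p0[0]) # note: log(0) = -inf, which max/argmax handle correctly (with a warning)
    for i in xrange(1, d):
        for s in [0, 1]:
            m = [logM[0, i-1] + np.log(pT[0, s, i-1]), logM[1, i-1] + np.log(pT[1, s, i-1])]
            logM[s, i] = np.max(m)
            T[s, i] = np.argmax(m)
    y_best = np.zeros(d, dtype='int')
    y_best[-1] = np.argmax(logM[:, -1])
    for i in xrange(d-1, 0, -1):
        y_best[i-1] = T[y_best[i], i]
    return y_best
print np.all(viterbi_decode_log(p0, pT_short1) == y_best) # should agree with the product-space version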