python实现HMM

一份完全按照李航《统计学习方法》介绍的HMM代码。

#coding=utf8
'''
Created on 2017-8-5
HMM implementation that follows, step by step, the formulas in
Li Hang's "Statistical Learning Methods" (chapter 10).  Much of the
code could be condensed, but it is kept verbose to mirror the book.
@author: adzhua
'''

import numpy as np


class HMM(object):
    '''Discrete hidden Markov model, lambda = (A, B, pi).'''

    def __init__(self, A, B, pi):
        '''
        A:  state transition probability matrix, shape (N, N)
        B:  observation (emission) probability matrix, shape (N, M)
        pi: initial state distribution vector, length N
        '''
        self.A = np.array(A, dtype=float)
        self.B = np.array(B, dtype=float)
        self.pi = np.array(pi, dtype=float)
        self.N = self.A.shape[0]    # number of hidden states
        self.M = self.B.shape[1]    # number of distinct observation symbols

    # Print the HMM parameters.
    def printHMM(self):
        print ("==================================================")
        print ("HMM content: N =",self.N,",M =",self.M)
        for i in range(self.N):
            if i==0:
                print ("hmm.A ",self.A[i,:]," hmm.B ",self.B[i,:])
            else:
                print ("      ",self.A[i,:],"       ",self.B[i,:])
        print ("hmm.pi",self.pi)
        print ("==================================================")

    # Forward algorithm (book algorithm 10.2).
    # NOTE: the original (misspelled) name 'forwar' is kept so existing
    # callers do not break.
    def forwar(self, T, O, alpha, prob):
        '''
        T:     length of the observation sequence
        O:     observation sequence (list of symbol indices)
        alpha: (T, N) work array, receives the forward probabilities
        prob:  one-element list; prob[0] receives P(O | lambda)
        '''
        # Initialization: alpha_1(i) = pi_i * b_i(o_1)
        for i in range(self.N):
            alpha[0, i] = self.pi[i] * self.B[i, O[0]]
        # Recursion: alpha_{t+1}(j) = [sum_i alpha_t(i) a_ij] * b_j(o_{t+1})
        for t in range(T-1):
            for j in range(self.N):
                total = 0.0
                for i in range(self.N):
                    total += alpha[t, i] * self.A[i, j]
                alpha[t+1, j] = total * self.B[j, O[t+1]]
        # Termination: P(O|lambda) = sum_i alpha_T(i)
        total = 0.0
        for i in range(self.N):
            total += alpha[T-1, i]
        # BUGFIX: was 'prob[0] *= total', which always yields 0 when
        # prob is initialized to [0.0]; assign instead (same contract
        # as back()).
        prob[0] = total

    # Scaled forward algorithm (avoids underflow on long sequences).
    def forwardWithScale(self, T, O, alpha, scale, prob):
        '''
        scale: (T,) work array of per-step normalization factors
        prob:  one-element list; prob[0] ACCUMULATES log P(O | lambda)
               (caller is expected to reset it before a fresh pass)
        '''
        scale[0] = 0.0
        # Initialization, then normalize alpha_1 to sum to 1.
        for i in range(self.N):
            alpha[0, i] = self.pi[i] * self.B[i, O[0]]
            scale[0] += alpha[0, i]
        for i in range(self.N):
            alpha[0, i] /= scale[0]
        # Recursion with per-step renormalization.
        for t in range(T-1):
            scale[t+1] = 0.0
            for j in range(self.N):
                total = 0.0
                for i in range(self.N):
                    total += alpha[t, i] * self.A[i, j]
                alpha[t+1, j] = total * self.B[j, O[t+1]]
                scale[t+1] += alpha[t+1, j]
            for j in range(self.N):
                alpha[t+1, j] /= scale[t+1]
        # Termination: log P(O|lambda) = sum_t log(scale_t)
        for t in range(T):
            prob[0] += np.log(scale[t])

    # Backward algorithm (book algorithm 10.3).
    def back(self, T, O, beta, prob):
        '''
        T:    length of the observation sequence, len(O)
        O:    observation sequence
        beta: (T, N) work array, receives the backward probabilities
        prob: one-element list; prob[0] receives P(O | lambda)
        '''
        # Initialization: beta_T(i) = 1
        for i in range(self.N):
            beta[T-1, i] = 1.0
        # Recursion, t = T-2, T-3, ..., 0
        for t in range(T-2, -1, -1):
            for i in range(self.N):
                total = 0.0
                for j in range(self.N):
                    total += self.A[i, j] * self.B[j, O[t+1]] * beta[t+1, j]
                beta[t, i] = total
        # Termination: P(O|lambda) = sum_i pi_i * b_i(o_1) * beta_1(i)
        total = 0.0
        for i in range(self.N):
            total += self.pi[i] * self.B[i, O[0]] * beta[0, i]
        prob[0] = total

    # Scaled backward algorithm; reuses the scale[] produced by
    # forwardWithScale so alpha and beta stay comparable.
    def backwardWithScale(self, T, O, beta, scale):
        '''
        T:     length of the observation sequence, len(O)
        O:     observation sequence
        beta:  (T, N) work array
        scale: normalization factors from forwardWithScale
        '''
        # Initialization
        for i in range(self.N):
            beta[T-1, i] = 1.0
        # Recursion
        for t in range(T-2, -1, -1):
            for i in range(self.N):
                total = 0.0
                for j in range(self.N):
                    total += self.A[i, j] * self.B[j, O[t+1]] * beta[t+1, j]
                beta[t, i] = total / scale[t+1]

    # Viterbi algorithm (book algorithm 10.5).
    def viterbi(self, O):
        '''
        O: observation sequence

        Returns (prob, I): prob is the probability of the best state
        path, I the 0-based state indices of that path.
        '''
        T = len(O)
        # Initialization.  phi holds integer backpointers; the state
        # path I is integer as well so it can be used as an index.
        delta = np.zeros((T, self.N), float)
        phi = np.zeros((T, self.N), int)
        I = np.zeros(T, int)
        for i in range(self.N):
            delta[0, i] = self.pi[i] * self.B[i, O[0]]
            phi[0, i] = 0
        # Recursion
        for t in range(1, T):
            for i in range(self.N):
                candidates = np.array(
                    [delta[t-1, j] * self.A[j, i] for j in range(self.N)])
                delta[t, i] = self.B[i, O[t]] * candidates.max()
                # BUGFIX: the original overwrote the whole phi array
                # with a scalar ('phi = ...argmax()'); store the
                # backpointer at (t, i) instead.
                phi[t, i] = candidates.argmax()
        # Termination
        prob = delta[T-1, :].max()
        I[T-1] = delta[T-1, :].argmax()
        # Path backtracking.
        # BUGFIX: the original indexed the 2-D backpointer table with a
        # single (float) index; the correct lookup is phi[t+1, I[t+1]].
        for t in range(T-2, -1, -1):
            I[t] = phi[t+1, I[t+1]]
        return prob, I

    # gamma[t, i] = P(state i at time t | O, lambda)  (book eq. 10.24);
    # the denominator needed when re-estimating A.
    def computeGamma(self, T, alpha, beta, gamma):
        '''Fill gamma (T, N) from the forward/backward arrays.'''
        for t in range(T):
            # Denominator is independent of i; compute it once per t.
            denom = 0.0
            for j in range(self.N):
                denom += alpha[t, j] * beta[t, j]
            for i in range(self.N):
                gamma[t, i] = (alpha[t, i] * beta[t, i]) / denom

    # Xi[t, i, j] = P(state i at t, state j at t+1 | O, lambda)
    # (book eq. 10.26); the numerator when re-estimating A.
    def computeXi(self, T, O, alpha, beta, Xi):
        for t in range(T-1):
            denom = 0.0
            for i in range(self.N):
                for j in range(self.N):
                    Xi[t, i, j] = (alpha[t, i] * self.A[i, j]
                                   * self.B[j, O[t+1]] * beta[t+1, j])
                    denom += Xi[t, i, j]
            for i in range(self.N):
                for j in range(self.N):
                    Xi[t, i, j] /= denom

    # Baum-Welch (EM) training: input L observation sequences O and an
    # initial model HMM = {A, B, pi, N, M}; updates the model in place.
    def BaumWelch(self, L, T, O, alpha, beta, gamma):
        '''
        L: number of observation sequences
        T: length of each observation sequence
        O: list of L observation sequences
        alpha, beta, gamma: (T, N) work arrays supplied by the caller
        '''
        DELTA = 0.01        # convergence threshold on the likelihood ratio
        n_round = 0         # iteration counter ('round' shadowed a builtin)
        flag = 1            # marks the first iteration (no previous prob yet)
        probf = [0.0]       # accumulated log-likelihood of the E-step
        delta = 0.0
        probprev = 0.0
        ratio = 0.0
        deltaprev = 10e-70  # tiny seed so the first ratio is huge
        xi = np.zeros((T, self.N, self.N))          # numerator stats for A
        pi = np.zeros(T, float)                     # initial-state stats
        denominatorA = np.zeros(self.N, float)      # denominator stats for A
        denominatorB = np.zeros(self.N, float)      # denominator stats for B
        numeratorA = np.zeros((self.N, self.N), float)
        numeratorB = np.zeros((self.N, self.M), float)  # stats for B
        scale = np.zeros(T, float)

        while True:
            probf[0] = 0
            # E-step: accumulate expected counts over all L sequences.
            for l in range(L):
                self.forwardWithScale(T, O[l], alpha, scale, probf)
                self.backwardWithScale(T, O[l], beta, scale)
                self.computeGamma(T, alpha, beta, gamma)    # (t, i)
                self.computeXi(T, O[l], alpha, beta, xi)    # (t, i, j)
                for i in range(self.N):
                    pi[i] += gamma[0, i]
                    for t in range(T-1):
                        denominatorA[i] += gamma[t, i]
                        denominatorB[i] += gamma[t, i]
                    denominatorB[i] += gamma[T-1, i]
                    for j in range(self.N):
                        for t in range(T-1):
                            numeratorA[i, j] += xi[t, i, j]
                    for k in range(self.M):     # M observation symbols
                        for t in range(T):
                            if O[l][t] == k:
                                numeratorB[i, k] += gamma[t, i]
            # M-step: re-estimate pi, A, B with additive smoothing
            # (0.001/N floor keeps every probability strictly positive).
            for i in range(self.N):
                self.pi[i] = 0.001 / self.N + 0.999 * pi[i] / L
                for j in range(self.N):
                    self.A[i, j] = (0.001 / self.N
                                    + 0.999 * numeratorA[i, j] / denominatorA[i])
                    numeratorA[i, j] = 0.0
                for k in range(self.M):
                    self.B[i, k] = (0.001 / self.N
                                    + 0.999 * numeratorB[i, k] / denominatorB[i])
                    numeratorB[i, k] = 0.0
                # Reset the accumulators for the next iteration.
                pi[i] = denominatorA[i] = denominatorB[i] = 0.0

            if flag == 1:
                # First pass: just record the likelihood and go again.
                flag = 0
                probprev = probf[0]
                ratio = 1
                continue
            delta = probf[0] - probprev
            ratio = delta / deltaprev
            probprev = probf[0]
            deltaprev = delta
            n_round += 1
            if ratio <= DELTA:
                print('num iteration: ', n_round)
                break


if __name__ == '__main__':
    print ("python my HMM")
    # Initial state distribution pi, transition matrix A, emission
    # matrix B, and the training observation sequences O.
    pi = [0.5, 0.5]
    A = [[0.8125, 0.1875], [0.2, 0.8]]
    B = [[0.875, 0.125], [0.25, 0.75]]
    O = [[1, 0, 0, 1, 1, 0, 0, 0, 0],
         [1, 1, 0, 1, 0, 0, 1, 1, 0],
         [0, 0, 1, 1, 0, 0, 1, 1, 1]]
    L = len(O)
    T = len(O[0])   # all sequences here share the same length
    hmm = HMM(A, B, pi)
    alpha = np.zeros((T, hmm.N), float)
    beta = np.zeros((T, hmm.N), float)
    gamma = np.zeros((T, hmm.N), float)
    # Train the model, then print the learned parameters.
    hmm.BaumWelch(L, T, O, alpha, beta, gamma)
    hmm.printHMM()



本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部