# encoding: utf-8
'''
Created on 2017-08-07
A maximum-entropy model implemented following Li Hang's <<统计学习方法>> (Statistical Learning Methods).
'''
from collections import defaultdict
import math
class MaxEnt(object):
    def __init__(self):
        self.feats = defaultdict(int)  # (label, feature) -> count; later reused as (label, feature) -> index
        self.trainset = []             # training records
        self.labels = set()            # labels seen in the training data
    def load_data(self, file):
        for line in open(file):
            fields = line.strip().split()
            # Each line has 3 columns: the first is the label, the second and third are features.
            if len(fields) < 2:
                continue
            label = fields[0]
            self.labels.add(label)
            for f in set(fields[1:]):
                # Each (label, feature) pair acts as a feature function; count its occurrences.
                self.feats[(label, f)] += 1
            self.trainset.append(fields)
    def _initparams(self):
        self.size = len(self.trainset)
        self.M = max([len(record) - 1 for record in self.trainset])  # the constant M on p.91
        # Empirical expectation of each feature (bottom of p.82).
        self.ep_ = [0.0] * len(self.feats)
        for i, f in enumerate(self.feats):
            self.ep_[i] = float(self.feats[f]) / float(self.size)
            # Map each feature function to an integer id.
            self.feats[f] = i
        # Initialize the weights to be learned.
        self.w = [0.0] * len(self.feats)
        self.lastw = self.w[:]
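    # For reference, the empirical expectation computed above is the quantity at the
    # bottom of p.82 in the book:
    #     E_~P(f_i) = sum over (x, y) of ~P(x, y) * f_i(x, y) = count(f_i) / N
    # where N is the number of training records; self.ep_[i] stores this value for feature i.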
    def probwgt(self, features, label):
        '''
        Helper: compute the numerator of formula 6.22 on p.85.
        '''
        wgt = 0.0
        for f in features:
            if (label, f) in self.feats:
                wgt += self.w[self.feats[(label, f)]]
        return math.exp(wgt)
    def calprob(self, features):
        '''
        Compute the conditional probability P(y|x) of formula 6.22 on p.85.
        '''
        wgts = [(self.probwgt(features, label), label) for label in self.labels]
        Z = sum([w for w, label in wgts])
        prob = [(w / Z, label) for w, label in wgts]
        return prob
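    # For reference, probwgt and calprob together implement formula 6.22 on p.85:
    #     P_w(y|x) = exp(sum_i w_i * f_i(x, y)) / Z_w(x)
    #     Z_w(x)   = sum over y of exp(sum_i w_i * f_i(x, y))
    # probwgt returns the numerator for one label; calprob normalizes over all labels.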
    def Ep(self):
        '''
        Compute the model expectation of each feature (top of p.83).
        '''
        eps = [0.0] * len(self.feats)
        for record in self.trainset:
            features = record[1:]
            # Compute P(y|x) for the current record.
            probs = self.calprob(features)
            for f in features:
                for prob, label in probs:
                    if (label, f) in self.feats:  # only consider features seen in the training data
                        idx = self.feats[(label, f)]
                        # Accumulate sum(P~(x) * P(y|x) * f(x, y)) with P~(x) = 1 / N.
                        eps[idx] += prob * (1.0 / self.size)
        return eps
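    # For reference, Ep approximates the model expectation at the top of p.83:
    #     E_P(f_i) = sum over (x, y) of ~P(x) * P_w(y|x) * f_i(x, y),  with ~P(x) = 1 / N
    # i.e. the expectation of each feature under the current model, averaged over training inputs.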
    def _convergence(self, lastw, w):
        for w1, w2 in zip(lastw, w):
            if abs(w1 - w2) >= 0.01:
                return False
        return True
    def train(self, max_iter=1000):
        self._initparams()
        for it in range(max_iter):
            print('iter %d ...' % (it + 1))
            self.ep = self.Ep()
            self.lastw = self.w[:]
            for i, w in enumerate(self.w):
                delta = 1.0 / self.M * math.log(self.ep_[i] / self.ep[i])  # formula 6.34 on p.91
                self.w[i] += delta
            # Check the convergence criterion.
            if self._convergence(self.lastw, self.w):
                break
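    # For reference, the update above is the GIS-style step of formula 6.34 on p.91:
    #     delta_i = (1 / M) * log(E_~P(f_i) / E_P(f_i)),   w_i <- w_i + delta_i
    # It assumes every sample activates roughly M features; here M is simply taken as the
    # maximum number of features in any training record, which is an approximation.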
    def predict(self, input):
        features = input.strip().split()
        prob = self.calprob(features)
        prob.sort(reverse=True)
        return prob
if __name__ == "__main__":
    maxent = MaxEnt()
    maxent.load_data("input.data")
    maxent.train(100)
    prob = maxent.predict("Sunny Sad")
    print(prob)
A maximum-entropy model implementation found on GitHub; the original link can no longer be located.
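For reference, load_data expects each line of input.data to contain a label followed by one or more feature strings, separated by whitespace. A hypothetical example (the dataset behind this snippet is unknown; these labels and features are only illustrative) might look like:

Outdoor Sunny Happy
Outdoor Sunny Happy Dry
Indoor Rainy Sad Humid
Indoor Cloudy Sad Humid

With such data, maxent.predict("Sunny Sad") returns a list of (probability, label) pairs sorted in descending order of probability.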