The corpus is the Brown corpus, and the part-of-speech tag set is the universal tagset.
The dataset is a plain text file. Sentences are separated by blank lines; the first line of each sentence is the sentence ID, and each subsequent line contains a word and its tag separated by a TAB character. The corpus format looks like this:
b100-38532
Perhaps ADV
it PRON
was VERB
right ADJ
; .
; .
b100-35577
...
The tag set contains twelve tags in total:
.
ADJ
ADP
ADV
CONJ
DET
NOUN
NUM
PRON
PRT
VERB
X
In short, the task is to label each word of a sentence with its part of speech (noun, verb, adjective, adverb, and so on).
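For instance, taking the first few (word, tag) pairs of sentence b100-38532 shown above, a tagger has to predict the tag sequence from the word sequence (the variable names below are just for illustration):
example_words = ("Perhaps", "it", "was", "right", ";")
example_tags = ("ADV", "PRON", "VERB", "ADJ", ".")  # what a tagger should predict for example_words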
from graphviz import Digraph
dot=Digraph(name='pos',comment='POS tagging',engine='dot')
dot.attr(rankdir='LR', size='8,5')
# tag (hidden state) sequence
dot.node('q0','...')
dot.node('q1','NOUN')
dot.node('q2','VERB')
dot.node('q3','VERB')
dot.node('q4','NOUN')
dot.node('qt','...')
# word (observation) sequence, drawn with dashed borders
dot.attr('node', style='dashed')
dot.node('v1','Will')
dot.node('v2','will')
dot.node('v3','see')
dot.node('v4','Sally')
# add edges
dot.edge('q0','q1')
dot.edge('q1','q2')
dot.edge('q2','q3')
dot.edge('q3','q4')
dot.edge('q4','qt')
dot.edge('q1','v1')
dot.edge('q2','v2')
dot.edge('q3','v3')
dot.edge('q4','v4')
dot
import os
import random

import matplotlib.image as mplimg
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np

from collections import Counter, defaultdict, namedtuple, OrderedDict
from io import BytesIO
from itertools import chain
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution
from IPython.core.display import HTML
Sentence = namedtuple("Sentence", "words tags")
def read_data(filename):
"""Read tagged sentence data"""
with open(filename, 'r') as f:
sentence_lines = [l.split("\n") for l in f.read().split("\n\n")]
return OrderedDict(((s[0], Sentence(*zip(*[l.strip().split("\t")
for l in s[1:]]))) for s in sentence_lines if s[0]))
def read_tags(filename):
"""Read a list of word tag classes"""
with open(filename, 'r') as f:
tags = f.read().split("\n")
return frozenset(tags)
def model2png(model, filename="", overwrite=False, show_ends=False):
"""Convert a Pomegranate model into a PNG image
The conversion pipeline extracts the underlying NetworkX graph object,
converts it to a PyDot graph, then writes the PNG data to a bytes array,
which can be saved as a file to disk or imported with matplotlib for display.
Model -> NetworkX.Graph -> PyDot.Graph -> bytes -> PNG
Parameters
----------
model : Pomegranate.Model
The model object to convert. The model must have an attribute .graph
referencing a NetworkX.Graph instance.
filename : string (optional)
The PNG file will be saved to disk with this filename if one is provided.
By default, the image file will NOT be created if a file with this name
already exists unless overwrite=True.
overwrite : bool (optional)
overwrite=True allows the new PNG to overwrite the specified file if it
already exists
show_ends : bool (optional)
show_ends=True will generate the PNG including the two end states from
the Pomegranate model (which are not usually an explicit part of the graph)
"""
nodes = model.graph.nodes()
if not show_ends:
nodes = [n for n in nodes if n not in (model.start, model.end)]
g = nx.relabel_nodes(model.graph.subgraph(nodes), {n: n.name for n in model.graph.nodes()})
pydot_graph = nx.drawing.nx_pydot.to_pydot(g)
pydot_graph.set_rankdir("LR")
png_data = pydot_graph.create_png(prog='dot')
img_data = BytesIO()
img_data.write(png_data)
img_data.seek(0)
if filename:
if os.path.exists(filename) and not overwrite:
raise IOError("File already exists. Use overwrite=True to replace existing files on disk.")
with open(filename, 'wb') as f:
f.write(img_data.read())
img_data.seek(0)
return mplimg.imread(img_data)
def show_model(model, figsize=(5, 5), **kwargs):
"""Display a Pomegranate model as an image using matplotlib
Parameters
----------
model : Pomegranate.Model
The model object to convert. The model must have an attribute .graph
referencing a NetworkX.Graph instance.
figsize : tuple(int, int) (optional)
A tuple specifying the dimensions of a matplotlib Figure that will
display the converted graph
**kwargs : dict
The kwargs dict is passed to the model2png program, see that function
for details
"""
plt.figure(figsize=figsize)
plt.imshow(model2png(model, **kwargs))
plt.axis('off')
class Subset(namedtuple("BaseSet", "sentences keys vocab X tagset Y N stream")):
def __new__(cls, sentences, keys):
word_sequences = tuple([sentences[k].words for k in keys])
tag_sequences = tuple([sentences[k].tags for k in keys])
wordset = frozenset(chain(*word_sequences))
tagset = frozenset(chain(*tag_sequences))
N = sum(1 for _ in chain(*(sentences[k].words for k in keys)))
stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
return super().__new__(cls, {k: sentences[k] for k in keys}, keys, wordset, word_sequences,
tagset, tag_sequences, N, stream.__iter__)
def __len__(self):
return len(self.sentences)
def __iter__(self):
return iter(self.sentences.items())
class Dataset(namedtuple("_Dataset", "sentences keys vocab X tagset Y training_set testing_set N stream")):
def __new__(cls, tagfile, datafile, train_test_split=0.8, seed=112890):
tagset = read_tags(tagfile)
sentences = read_data(datafile)
keys = tuple(sentences.keys())
wordset = frozenset(chain(*[s.words for s in sentences.values()]))
word_sequences = tuple([sentences[k].words for k in keys])
tag_sequences = tuple([sentences[k].tags for k in keys])
N = sum(1 for _ in chain(*(s.words for s in sentences.values())))
# split data into train/test sets
_keys = list(keys)
if seed is not None: random.seed(seed)
random.shuffle(_keys)
split = int(train_test_split * len(_keys))
training_data = Subset(sentences, _keys[:split])
testing_data = Subset(sentences, _keys[split:])
stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
return super().__new__(cls, dict(sentences), keys, wordset, word_sequences, tagset,
tag_sequences, training_data, testing_data, N, stream.__iter__)
def __len__(self):
return len(self.sentences)
def __iter__(self):
return iter(self.sentences.items())
data = Dataset("tags-universal.txt", "brown-universal.txt", train_test_split=0.8)
print("There are {} sentences in the corpus.".format(len(data)))
print("There are {} sentences in the training set.".format(len(data.training_set)))
print("There are {} sentences in the testing set.".format(len(data.testing_set)))
assert len(data) == len(data.training_set) + len(data.testing_set), \
"The number of sentences in the training set + testing set should sum to the number of sentences in the corpus"
key = 'b100-38532'
print("Sentence: {}".format(key))
print("words:\n\t{!s}".format(data.sentences[key].words))
print("tags:\n\t{!s}".format(data.sentences[key].tags))
print("There are a total of {} samples of {} unique words in the corpus."
.format(data.N, len(data.vocab)))
print("There are {} samples of {} unique words in the training set."
.format(data.training_set.N, len(data.training_set.vocab)))
print("There are {} samples of {} unique words in the testing set."
.format(data.testing_set.N, len(data.testing_set.vocab)))
print("There are {} words in the test set that are missing in the training set."
.format(len(data.testing_set.vocab - data.training_set.vocab)))
assert data.N == data.training_set.N + data.testing_set.N, \
"The number of training + test samples should sum to the total number of samples"
for i in range(2):
print("Sentence {}:".format(i + 1), data.X[i])
print()
print("Labels {}:".format(i + 1), data.Y[i])
print()
print("\nStream (word, tag) pairs:\n")
for i, pair in enumerate(data.stream()):
print("\t", pair)
if i > 5: break
This is perhaps the simplest possible tagging model: each word is simply assigned the tag it is most frequently labeled with in the corpus. This model can serve as a baseline against which the other models are compared.
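As a minimal sketch of the idea (the counts below are made up, except that the 1244 count for "time"/NOUN matches the example used with pair_counts later), the baseline just keeps a lookup table from each word to its most frequent tag:
from collections import Counter

# hypothetical per-word tag counts, for illustration only
toy_counts = {"time": Counter({"NOUN": 1244, "VERB": 22}),
              "will": Counter({"VERB": 500, "NOUN": 60})}
# keep the most frequent tag for every word
toy_mfc = {word: counts.most_common(1)[0][0] for word, counts in toy_counts.items()}
assert toy_mfc == {"time": "NOUN", "will": "VERB"}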
from collections import defaultdict
def pair_counts(sequences_A, sequences_B):
    """
    sequences_A: a list of word sequences (one per sentence)
    sequences_B: the corresponding list of tag sequences
    Returns: a nested dict keyed by tag, then by word, giving the number of times each
    (tag, word) pair occurs; e.g. if the word "time" is tagged as NOUN 1244 times in
    the corpus, then pair_counts["NOUN"]["time"] == 1244
    """
    # count how often each word occurs with each tag, arranged as a dict of dicts keyed by tag
    pair_counts = defaultdict(dict)
    for words, tags in zip(sequences_A, sequences_B):
        for word, tag in zip(words, tags):
            pair_counts[tag][word] = pair_counts[tag].get(word, 0) + 1
    return pair_counts
emission_counts = pair_counts(data.X,data.Y)
assert len(emission_counts) == 12, \
"Uh oh. There should be 12 tags in your dictionary."
assert max(emission_counts["NOUN"], key=emission_counts["NOUN"].get) == 'time', \
"Hmmm...'time' is expected to be the most common NOUN."
HTML('<div class="alert alert-block alert-success">Your emission counts look good!</div>')
from collections import namedtuple
FakeState = namedtuple("FakeState", "name")
class MFCTagger:
missing = FakeState(name="<MISSING>")
def __init__(self, table):
self.table = defaultdict(lambda: MFCTagger.missing)
self.table.update({word: FakeState(name=tag) for word, tag in table.items()})
def viterbi(self, seq):
"""This method simplifies predictions by matching the Pomegranate viterbi() interface"""
return 0., list(enumerate(["<start>"] + [self.table[w] for w in seq] + ["<end>"]))
def tagwordnum_to_mfc(tagwordnum):
    """
    tagwordnum: the result of pair_counts, a dict such that tagwordnum["NOUN"]["time"] == 1244
    Returns: a dict mapping each word to the tag it is most frequently labeled with,
    e.g. result["time"] == "NOUN"
    """
    # invert dict[tag][word] = count into dict[word][tag] = count
    word_tag_pairs = defaultdict(dict)
    for tag, word_nums in tagwordnum.items():
        for word, num in word_nums.items():
            word_tag_pairs[word][tag] = num
    # for each word, keep the tag with the highest count
    result = dict()
    for word, tag_nums in word_tag_pairs.items():
        result[word] = max(tag_nums, key=tag_nums.get)
    return result
word_counts = pair_counts(data.training_set.X,data.training_set.Y)
mfc_table = tagwordnum_to_mfc(word_counts)
# the Most Frequent Class (MFC) model
mfc_model = MFCTagger(mfc_table) # Create a Most Frequent Class tagger instance
assert len(mfc_table) == len(data.training_set.vocab), ""
assert all(k in data.training_set.vocab for k in mfc_table.keys()), ""
assert sum(int(k not in mfc_table) for k in data.testing_set.vocab) == 5521, ""
HTML('<div class="alert alert-block alert-success">Your MFC tagger has all the correct words!</div>')
def replace_unknown(sequence):
"""Return a copy of the input sequence where each unknown word is replaced
by the literal string value 'nan'. Pomegranate will ignore these values
during computation.
"""
return [w if w in data.training_set.vocab else 'nan' for w in sequence]
def simplify_decoding(X, model):
"""X should be a 1-D sequence of observations for the model to predict"""
_, state_path = model.viterbi(replace_unknown(X))
return [state[1].name for state in state_path[1:-1]]
def accuracy(X, Y, model):
"""Calculate the prediction accuracy by using the model to decode each sequence
in the input X and comparing the prediction with the true labels in Y.
The X should be an array whose first dimension is the number of sentences to test,
and each element of the array should be an iterable of the words in the sequence.
The arrays X and Y should have the exact same shape.
X = [("See", "Spot", "run"), ("Run", "Spot", "run", "fast"), ...]
Y = [(), (), ...]
"""
correct = total_predictions = 0
for observations, actual_tags in zip(X, Y):
# The model.viterbi call in simplify_decoding will return None if the HMM
# raises an error (for example, if a test sentence contains a word that
# is out of vocabulary for the training set). Any exception counts the
# full sentence as an error (which makes this a conservative estimate).
try:
most_likely_tags = simplify_decoding(observations, model)
correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
except:
pass
total_predictions += len(observations)
return correct / total_predictions
mfc_training_acc = accuracy(data.training_set.X, data.training_set.Y, mfc_model)
print("training accuracy mfc_model: {:.2f}%".format(100 * mfc_training_acc))
mfc_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, mfc_model)
print("testing accuracy mfc_model: {:.2f}%".format(100 * mfc_testing_acc))
assert mfc_training_acc >= 0.955, "Uh oh. Your MFC accuracy on the training set doesn't look right."
assert mfc_testing_acc >= 0.925, "Uh oh. Your MFC accuracy on the testing set doesn't look right."
HTML('<div class="alert alert-block alert-success">Your MFC tagger accuracy looks correct!</div>')
A hidden Markov model is a probabilistic model for sequential data. It describes a process in which a hidden Markov chain generates an unobservable random sequence of states, and each state in turn generates an observation, producing a random sequence of observations. The sequence of states generated by the hidden Markov chain is called the state sequence; the sequence of observations, one generated per state, is called the observation sequence. Each position in the sequence can be viewed as a time step.
A hidden Markov model is determined by the initial state probability vector ($\pi$), the state transition probability matrix ($A$), and the observation probability matrix ($B$). $\pi$ and $A$ determine the state sequence, and $B$ determines the observations generated from the states. Writing the model as $\lambda$: $$\lambda=(A,B,\pi)$$
Assume that $Q$ is the set of all possible hidden states, with $N$ states in total: $$Q=\{q_1,q_2,...,q_N\}$$ $V$ is the set of all possible observations, with $M$ possible observation symbols: $$V=\{v_1,v_2,...,v_M\}$$ $I$ is a state sequence of length $T$ and $O$ is the corresponding observation sequence: $$I=\{i_1,i_2,...,i_T\},\quad O=\{o_1,o_2,...,o_T\}$$
The parameters $A$, $B$, and $\pi$ of the hidden Markov model are then defined as follows: $$A=[a_{ij}]_{N \times N}$$ where $a_{ij}=P(i_{t+1}=q_j \mid i_t=q_i)$, $i=1,2,...,N$; $j=1,2,...,N$, is the probability of moving to state $q_j$ at time $t+1$ given that the chain is in state $q_i$ at time $t$; $$B=[b_j(k)]_{N \times M}$$ where $b_j(k)=P(o_t=v_k \mid i_t=q_j)$, $k=1,2,...,M$; $j=1,2,...,N$, is the probability of generating observation $v_k$ given that the chain is in state $q_j$ at time $t$; $$\pi=(\pi_i)$$ where $\pi_i=P(i_1=q_i)$, $i=1,2,...,N$, is the probability of being in state $q_i$ at time $t=1$.
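As a concrete illustration, here is a toy instantiation of $\lambda=(A,B,\pi)$ with two hidden states and three observation symbols; the numbers are made up for illustration and are not estimated from the Brown corpus:
import numpy as np

# toy example: Q = {NOUN, VERB}, V = {"will", "see", "Sally"} (values are illustrative only)
pi = np.array([0.6, 0.4])             # pi_i = P(i_1 = q_i); sums to 1
A = np.array([[0.3, 0.7],             # a_ij = P(i_{t+1} = q_j | i_t = q_i); each row sums to 1
              [0.8, 0.2]])
B = np.array([[0.5, 0.1, 0.4],        # b_j(k) = P(o_t = v_k | i_t = q_j); each row sums to 1
              [0.2, 0.7, 0.1]])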
For more details, see the Stanford course materials.
For part-of-speech tagging this is the prediction (decoding) problem: given the observed sequence of words, find the corresponding sequence of tags. Below we solve the POS tagging problem with Pomegranate, an open-source implementation of hidden Markov models.
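Formally, decoding means finding the most probable tag sequence given the observed words, which is exactly what the Viterbi algorithm used below computes: $$\hat{I}=\arg\max_{I} P(I \mid O,\lambda)$$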
Count how many times each part-of-speech tag occurs.
def unigram_counts(sequences):
"""
sequences:词性标注的序列
返回值:每个词性发生的频率,例如unigram_counts[NOUN] == 275558
"""
result=defaultdict(int)
for sequence in sequences:
for tag in sequence:
result[tag] += 1
return result
tag_unigrams = unigram_counts(data.training_set.Y)
assert set(tag_unigrams.keys()) == data.training_set.tagset, \
"Uh oh. It looks like your tag counts doesn't include all the tags!"
assert min(tag_unigrams, key=tag_unigrams.get) == 'X', \
"Hmmm...'X' is expected to be the least common class"
assert max(tag_unigrams, key=tag_unigrams.get) == 'NOUN', \
"Hmmm...'NOUN' is expected to be the most common class"
HTML('<div class="alert alert-block alert-success">Your tag unigrams look good!</div>')
Count how many times each pair of adjacent tags (tag bigram) occurs.
def bigram_counts(sequences):
"""
sequences:词性标注的序列
返回值:返回两个词性同时发生的频率,例如bigram_counts[(NOUN, VERB)] == 61582
"""
result=defaultdict(int)
for sequence in sequences:
for i in range(1,len(sequence)):
result[(sequence[i-1],sequence[i])] += 1
return result
tag_bigrams = bigram_counts(data.training_set.Y)
assert len(tag_bigrams) == 144, \
"Uh oh. There should be 144 pairs of bigrams (12 tags x 12 tags)"
assert min(tag_bigrams, key=tag_bigrams.get) in [('X', 'NUM'), ('PRON', 'X')], \
"Hmmm...The least common bigram should be one of ('X', 'NUM') or ('PRON', 'X')."
assert max(tag_bigrams, key=tag_bigrams.get) in [('DET', 'NOUN')], \
"Hmmm...('DET', 'NOUN') is expected to be the most common bigram."
HTML('<div class="alert alert-block alert-success">Your tag bigrams look good!</div>')
Count how many times each tag occurs at the beginning of a sentence.
def starting_counts(sequences):
"""
sequences:词性标注的序列
返回值:返回每个句子开头词性的频率,例如 starting_counts[NOUN] == 8093
"""
# TODO: Finish this function!
result=defaultdict(int)
for sequence in sequences:
result[sequence[0]] += 1
return result
tag_starts = starting_counts(data.training_set.Y)
assert len(tag_starts) == 12, "Uh oh. There should be 12 tags in your dictionary."
assert min(tag_starts, key=tag_starts.get) == 'X', "Hmmm...'X' is expected to be the least common starting tag."
assert max(tag_starts, key=tag_starts.get) == 'DET', "Hmmm...'DET' is expected to be the most common starting tag."
HTML('<div class="alert alert-block alert-success">Your starting tag counts look good!</div>')
Count how many times each tag occurs at the end of a sentence.
def ending_counts(sequences):
"""
sequences:词性序列
返回值:返回每个句子的末尾词性发生的频率,例如ending_counts[DET] == 18
"""
result=defaultdict(int)
for seq in sequences:
result[seq[-1]] += 1
return result
tag_ends = ending_counts(data.training_set.Y)
assert len(tag_ends) == 12, "Uh oh. There should be 12 tags in your dictionary."
assert min(tag_ends, key=tag_ends.get) in ['X', 'CONJ'], "Hmmm...'X' or 'CONJ' should be the least common ending tag."
assert max(tag_ends, key=tag_ends.get) == '.', "Hmmm...'.' is expected to be the most common ending tag."
HTML('<div class="alert alert-block alert-success">Your ending tag counts look good!</div>')
Now build the hidden Markov model.
basic_model = HiddenMarkovModel(name="base-hmm-tagger")
a. Build the emission probabilities of the HMM, i.e. the distribution $B$ in $\lambda=(A,B,\pi)$; an emission probability is the probability that a hidden state generates a particular observable symbol (here, that a tag generates a particular word).
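Concretely, the function below estimates each emission probability by maximum likelihood from the training counts: $$\hat{b}_j(k)=\frac{C(q_j,v_k)}{C(q_j)}$$ where $C(q_j,v_k)$ is the number of times the word $v_k$ occurs tagged as $q_j$ and $C(q_j)$ is the total count of tag $q_j$.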
def calEmissionProbabilities(emission_counts,tag_unigrams):
"""
emission_counts:词性和单词对的频率
tag_unigrams:每个词性发生的频率
返回值:返回每个词性转化为对应单词的概率,例如dict[NOUM][time]=0.005,表示名词有0.005的概率转化为time
"""
result=defaultdict(dict)
for tag,num in tag_unigrams.items():
for word,wnum in emission_counts[tag].items():
result[tag][word]=float(wnum)/num
return result
# Add one emission distribution per hidden state to the model, and keep the State objects
# for later reference when adding transitions. The emissions are estimated from the
# training set only (emission_counts above covered the full corpus, which would leak test data).
training_emission_counts = pair_counts(data.training_set.X, data.training_set.Y)
emission_Probabilities = calEmissionProbabilities(training_emission_counts, tag_unigrams)
states={}
for tag,distribution in emission_Probabilities.items():
s = State(DiscreteDistribution(distribution), name=tag)
states[tag]=s
basic_model.add_states(s)
b. Build the state transition probabilities of the HMM, i.e. the distribution $A$ in $\lambda=(A,B,\pi)$.
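These are likewise maximum likelihood estimates from the tag bigram and unigram counts: $$\hat{a}_{ij}=\frac{C(q_i,q_j)}{C(q_i)}$$ where $C(q_i,q_j)$ is the number of times tag $q_j$ immediately follows tag $q_i$ in the training set.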
for (tag1,tag2),num in tag_bigrams.items():
basic_model.add_transition(states[tag1],states[tag2],float(num)/tag_unigrams[tag1])
c. Build the initial state probabilities of the HMM, i.e. the distribution $\pi$ in $\lambda=(A,B,\pi)$; the code below also adds the analogous end-of-sentence transitions from each state to the model's end node.
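The initial probabilities are estimated from the sentence-start counts, and the transitions into the end state analogously from the sentence-end counts: $$\hat{\pi}_i=\frac{C_{\text{start}}(q_i)}{\sum_{j}C_{\text{start}}(q_j)}$$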
starts_count=sum(list(tag_starts.values()))
ends_count=sum(list(tag_ends.values()))
for tag,num in tag_starts.items():
basic_model.add_transition( basic_model.start, states[tag], float(num)/starts_count )
for tag,num in tag_ends.items():
basic_model.add_transition( states[tag], basic_model.end, float(num)/ends_count )
basic_model.bake()
assert all(tag in set(s.name for s in basic_model.states) for tag in data.training_set.tagset), \
"Every state in your network should use the name of the associated tag, which must be one of the training set tags."
assert basic_model.edge_count() == 168, \
("Your network should have an edge from the start node to each state, one edge between every " +
"pair of tags (states), and an edge from each state to the end node.")
HTML('<div class="alert alert-block alert-success">Your HMM network topology looks good!</div>')
d. Visualize the model.
show_model(basic_model, figsize=(20, 20), filename="example.png", overwrite=True, show_ends=False)
hmm_training_acc = accuracy(data.training_set.X, data.training_set.Y, basic_model)
print("training accuracy basic hmm model: {:.2f}%".format(100 * hmm_training_acc))
hmm_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, basic_model)
print("testing accuracy basic hmm model: {:.2f}%".format(100 * hmm_testing_acc))
assert hmm_training_acc > 0.97, "Uh oh. Your HMM accuracy on the training set doesn't look right."
assert hmm_testing_acc > 0.955, "Uh oh. Your HMM accuracy on the testing set doesn't look right."
HTML('<div class="alert alert-block alert-success">Your HMM tagger accuracy looks correct! Congratulations, you\'ve finished the project.</div>')
for key in data.testing_set.keys[:2]:
print("Sentence Key: {}\n".format(key))
print("Predicted labels:\n-----------------")
print(simplify_decoding(data.sentences[key].words, basic_model))
print()
print("Actual labels:\n--------------")
print(data.sentences[key].tags)
print("\n")