A comparison of algorithmic models for part-of-speech (POS) tagging

The corpus is the Brown corpus and the tag set is the universal tagset.

The dataset is a plain text file. Sentences are separated by a blank line; the first line of each sentence is its ID, and each following line contains a word and its tag separated by a TAB character. The corpus looks like this:

b100-38532
Perhaps ADV
it  PRON
was VERB
right   ADJ
;   .
;   .

b100-35577
...
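
A minimal sketch of how one such record could be parsed, given the format just described (the full reader used here, read_data(), appears in the utility functions below):

block = "b100-38532\nPerhaps\tADV\nit\tPRON\nwas\tVERB"
lines = block.split("\n")
sentence_id = lines[0]
words, tags = zip(*(line.split("\t") for line in lines[1:]))
print(sentence_id, words, tags)
# b100-38532 ('Perhaps', 'it', 'was') ('ADV', 'PRON', 'VERB')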

The tag set contains twelve tags in total:

.
ADJ
ADP
ADV
CONJ
DET
NOUN
NUM
PRON
PRT
VERB
X

Illustration of POS tagging

Simply put, POS tagging assigns to each word in a sentence its part of speech (noun, verb, adjective, adverb, and so on).

In [3]:
from graphviz import Digraph
dot = Digraph(name='pos', comment='POS tagging', engine='dot')
dot.attr(rankdir='LR', size='8,5')
# the tag (hidden state) sequence
dot.node('q0','...')
dot.node('q1','NN')
dot.node('q2','VB')
dot.node('q3','VB')
dot.node('q4','NN')
dot.node('qt','...')
# the word (observation) sequence, drawn with dashed borders
dot.attr('node', style='dashed')
dot.node('v1','Will')
dot.node('v2','will')
dot.node('v3','see')
dot.node('v4','Sally')

# add the edges
dot.edge('q0','q1')
dot.edge('q1','q2')
dot.edge('q2','q3')
dot.edge('q3','q4')
dot.edge('q4','qt')

dot.edge('q1','v1')
dot.edge('q2','v2')
dot.edge('q3','v3')
dot.edge('q4','v4')

dot
Out[3]:
[Graphviz rendering: a chain of hidden tag states (... → NN → VB → VB → NN → ...), each emitting one of the observed words Will, will, see, Sally]

Utility functions for corpus processing

In [4]:
import os
import random

import matplotlib.pyplot as plt
import matplotlib.image as mplimg
import networkx as nx
import numpy as np

from itertools import chain
from collections import Counter, defaultdict, namedtuple, OrderedDict
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution
from io import BytesIO
from IPython.core.display import HTML

Sentence = namedtuple("Sentence", "words tags")

def read_data(filename):
    """Read tagged sentence data"""
    with open(filename, 'r') as f:
        sentence_lines = [l.split("\n") for l in f.read().split("\n\n")]
    return OrderedDict(((s[0], Sentence(*zip(*[l.strip().split("\t")
                        for l in s[1:]]))) for s in sentence_lines if s[0]))


def read_tags(filename):
    """Read a list of word tag classes"""
    with open(filename, 'r') as f:
        tags = f.read().split("\n")
    return frozenset(tags)


def model2png(model, filename="", overwrite=False, show_ends=False):
    """Convert a Pomegranate model into a PNG image

    The conversion pipeline extracts the underlying NetworkX graph object,
    converts it to a PyDot graph, then writes the PNG data to a bytes array,
    which can be saved as a file to disk or imported with matplotlib for display.

        Model -> NetworkX.Graph -> PyDot.Graph -> bytes -> PNG

    Parameters
    ----------
    model : Pomegranate.Model
        The model object to convert. The model must have an attribute .graph
        referencing a NetworkX.Graph instance.

    filename : string (optional)
        The PNG file will be saved to disk with this filename if one is provided.
        By default, the image file will NOT be created if a file with this name
        already exists unless overwrite=True.

    overwrite : bool (optional)
        overwrite=True allows the new PNG to overwrite the specified file if it
        already exists

    show_ends : bool (optional)
        show_ends=True will generate the PNG including the two end states from
        the Pomegranate model (which are not usually an explicit part of the graph)
    """
    nodes = model.graph.nodes()
    if not show_ends:
        nodes = [n for n in nodes if n not in (model.start, model.end)]
    g = nx.relabel_nodes(model.graph.subgraph(nodes), {n: n.name for n in model.graph.nodes()})
    pydot_graph = nx.drawing.nx_pydot.to_pydot(g)
    pydot_graph.set_rankdir("LR")
    png_data = pydot_graph.create_png(prog='dot')
    img_data = BytesIO()
    img_data.write(png_data)
    img_data.seek(0)
    if filename:
        if os.path.exists(filename) and not overwrite:
            raise IOError("File already exists. Use overwrite=True to replace existing files on disk.")
        with open(filename, 'wb') as f:
            f.write(img_data.read())
        img_data.seek(0)
    return mplimg.imread(img_data)


def show_model(model, figsize=(5, 5), **kwargs):
    """Display a Pomegranate model as an image using matplotlib

    Parameters
    ----------
    model : Pomegranate.Model
        The model object to convert. The model must have an attribute .graph
        referencing a NetworkX.Graph instance.

    figsize : tuple(int, int) (optional)
        A tuple specifying the dimensions of a matplotlib Figure that will
        display the converted graph

    **kwargs : dict
        The kwargs dict is passed to the model2png program, see that function
        for details
    """
    plt.figure(figsize=figsize)
    plt.imshow(model2png(model, **kwargs))
    plt.axis('off')


class Subset(namedtuple("BaseSet", "sentences keys vocab X tagset Y N stream")):
    def __new__(cls, sentences, keys):
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        wordset = frozenset(chain(*word_sequences))
        tagset = frozenset(chain(*tag_sequences))
        N = sum(1 for _ in chain(*(sentences[k].words for k in keys)))
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, {k: sentences[k] for k in keys}, keys, wordset, word_sequences,
                               tagset, tag_sequences, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())


class Dataset(namedtuple("_Dataset", "sentences keys vocab X tagset Y training_set testing_set N stream")):
    def __new__(cls, tagfile, datafile, train_test_split=0.8, seed=112890):
        tagset = read_tags(tagfile)
        sentences = read_data(datafile)
        keys = tuple(sentences.keys())
        wordset = frozenset(chain(*[s.words for s in sentences.values()]))
        word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        N = sum(1 for _ in chain(*(s.words for s in sentences.values())))
        
        # split data into train/test sets
        _keys = list(keys)
        if seed is not None: random.seed(seed)
        random.shuffle(_keys)
        split = int(train_test_split * len(_keys))
        training_data = Subset(sentences, _keys[:split])
        testing_data = Subset(sentences, _keys[split:])
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, dict(sentences), keys, wordset, word_sequences, tagset,
                               tag_sequences, training_data, testing_data, N, stream.__iter__)

    def __len__(self):
        return len(self.sentences)

    def __iter__(self):
        return iter(self.sentences.items())
In [5]:
data = Dataset("tags-universal.txt", "brown-universal.txt", train_test_split=0.8)

print("There are {} sentences in the corpus.".format(len(data)))
print("There are {} sentences in the training set.".format(len(data.training_set)))
print("There are {} sentences in the testing set.".format(len(data.testing_set)))

assert len(data) == len(data.training_set) + len(data.testing_set), \
       "The number of sentences in the training set + testing set should sum to the number of sentences in the corpus"
There are 57340 sentences in the corpus.
There are 45872 sentences in the training set.
There are 11468 sentences in the testing set.
In [6]:
key = 'b100-38532'
print("Sentence: {}".format(key))
print("words:\n\t{!s}".format(data.sentences[key].words))
print("tags:\n\t{!s}".format(data.sentences[key].tags))
Sentence: b100-38532
words:
	('Perhaps', 'it', 'was', 'right', ';', ';')
tags:
	('ADV', 'PRON', 'VERB', 'ADJ', '.', '.')
In [7]:
print("There are a total of {} samples of {} unique words in the corpus."
      .format(data.N, len(data.vocab)))
print("There are {} samples of {} unique words in the training set."
      .format(data.training_set.N, len(data.training_set.vocab)))
print("There are {} samples of {} unique words in the testing set."
      .format(data.testing_set.N, len(data.testing_set.vocab)))
print("There are {} words in the test set that are missing in the training set."
      .format(len(data.testing_set.vocab - data.training_set.vocab)))

assert data.N == data.training_set.N + data.testing_set.N, \
       "The number of training + test samples should sum to the total number of samples"
There are a total of 1161192 samples of 56057 unique words in the corpus.
There are 928458 samples of 50536 unique words in the training set.
There are 232734 samples of 25112 unique words in the testing set.
There are 5521 words in the test set that are missing in the training set.
In [8]:
for i in range(2):    
    print("Sentence {}:".format(i + 1), data.X[i])
    print()
    print("Labels {}:".format(i + 1), data.Y[i])
    print()
Sentence 1: ('Mr.', 'Podger', 'had', 'thanked', 'him', 'gravely', ',', 'and', 'now', 'he', 'made', 'use', 'of', 'the', 'advice', '.')

Labels 1: ('NOUN', 'NOUN', 'VERB', 'VERB', 'PRON', 'ADV', '.', 'CONJ', 'ADV', 'PRON', 'VERB', 'NOUN', 'ADP', 'DET', 'NOUN', '.')

Sentence 2: ('But', 'there', 'seemed', 'to', 'be', 'some', 'difference', 'of', 'opinion', 'as', 'to', 'how', 'far', 'the', 'board', 'should', 'go', ',', 'and', 'whose', 'advice', 'it', 'should', 'follow', '.')

Labels 2: ('CONJ', 'PRT', 'VERB', 'PRT', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'ADP', 'ADV', 'ADV', 'DET', 'NOUN', 'VERB', 'VERB', '.', 'CONJ', 'DET', 'NOUN', 'PRON', 'VERB', 'VERB', '.')

In [9]:
print("\nStream (word, tag) pairs:\n")
for i, pair in enumerate(data.stream()):
    print("\t", pair)
    if i > 5: break
Stream (word, tag) pairs:

	 ('Mr.', 'NOUN')
	 ('Podger', 'NOUN')
	 ('had', 'VERB')
	 ('thanked', 'VERB')
	 ('him', 'PRON')
	 ('gravely', 'ADV')
	 (',', '.')

1) Most Frequent Class (MFC) tagger

This is perhaps the simplest tagging model: each word is tagged with whichever tag it was most frequently given in the training data. It serves as a baseline against which the other models can be compared.
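
A toy sketch of the idea (the counts for "flies" here are invented purely for illustration; the real table is built from the corpus counts below):

from collections import Counter

toy_counts = {"time": Counter({"NOUN": 1244, "VERB": 12}),
              "flies": Counter({"VERB": 30, "NOUN": 21})}
# pick the most frequent tag for each word
toy_mfc_table = {word: counts.most_common(1)[0][0] for word, counts in toy_counts.items()}
print(toy_mfc_table)   # {'time': 'NOUN', 'flies': 'VERB'}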

In [10]:
from collections import defaultdict
def pair_counts(sequences_A, sequences_B):
    """
    sequences_A: list of word sequences (sentences)
    sequences_B: list of the corresponding tag sequences
    Returns: a nested dict of counts such that if the word "time" is tagged NOUN
             1244 times in the corpus, then pair_counts["NOUN"]["time"] == 1244
    """
    # count each (tag, word) pair directly into a dict of dicts
    counts = defaultdict(lambda: defaultdict(int))
    for words, tags in zip(sequences_A, sequences_B):
        for word, tag in zip(words, tags):
            counts[tag][word] += 1
    return counts



emission_counts = pair_counts(data.X,data.Y)

assert len(emission_counts) == 12, \
       "Uh oh. There should be 12 tags in your dictionary."
assert max(emission_counts["NOUN"], key=emission_counts["NOUN"].get) == 'time', \
       "Hmmm...'time' is expected to be the most common NOUN."
HTML('<div class="alert alert-block alert-success">Your emission counts look good!</div>')
Out[10]:
Your emission counts look good!
In [11]:
from collections import namedtuple

FakeState = namedtuple("FakeState", "name")

class MFCTagger:

    missing = FakeState(name="<MISSING>")
    
    def __init__(self, table):
        self.table = defaultdict(lambda: MFCTagger.missing)
        self.table.update({word: FakeState(name=tag) for word, tag in table.items()})
        
    def viterbi(self, seq):
        """This method simplifies predictions by matching the Pomegranate viterbi() interface"""
        return 0., list(enumerate(["<start>"] + [self.table[w] for w in seq] + ["<end>"]))


def tagwordnum_to_mfc(tagwordnum):
    """
    tagwordnum: the result of pair_counts, a dict of the form pair_counts["NOUN"]["time"] == 1244
    Returns: a dict mapping each word to the tag it is most frequently labeled with,
             e.g. result["time"] == "NOUN"
    """
    # invert dict[tag][word] = count into dict[word][tag] = count
    word_tag_pairs = defaultdict(dict)
    for tag, word_nums in tagwordnum.items():
        for word, num in word_nums.items():
            word_tag_pairs[word][tag] = num
    # for each word, keep the tag with the highest count
    result = dict()
    for word, tag_nums in word_tag_pairs.items():
        max_num = 0
        for tag, num in tag_nums.items():
            if num > max_num:
                max_num = num
                result[word] = tag
    return result
                     
word_counts = pair_counts(data.training_set.X,data.training_set.Y)
mfc_table = tagwordnum_to_mfc(word_counts)


# MFC model
mfc_model = MFCTagger(mfc_table) # Create a Most Frequent Class tagger instance

assert len(mfc_table) == len(data.training_set.vocab), ""
assert all(k in data.training_set.vocab for k in mfc_table.keys()), ""
assert sum(int(k not in mfc_table) for k in data.testing_set.vocab) == 5521, ""
HTML('<div class="alert alert-block alert-success">Your MFC tagger has all the correct words!</div>')
Out[11]:
Your MFC tagger has all the correct words!
In [14]:
def replace_unknown(sequence):
    """Return a copy of the input sequence where each unknown word is replaced
    by the literal string value 'nan'. Pomegranate will ignore these values
    during computation.
    """
    return [w if w in data.training_set.vocab else 'nan' for w in sequence]

def simplify_decoding(X, model):
    """X should be a 1-D sequence of observations for the model to predict"""
    _, state_path = model.viterbi(replace_unknown(X))
    return [state[1].name for state in state_path[1:-1]]

def accuracy(X, Y, model):
    """Calculate the prediction accuracy by using the model to decode each sequence
    in the input X and comparing the prediction with the true labels in Y.
    
    The X should be an array whose first dimension is the number of sentences to test,
    and each element of the array should be an iterable of the words in the sequence.
    The arrays X and Y should have the exact same shape.
    
    X = [("See", "Spot", "run"), ("Run", "Spot", "run", "fast"), ...]
    Y = [(), (), ...]
    """
    correct = total_predictions = 0
    for observations, actual_tags in zip(X, Y):
        
        # simplify_decoding (via model.viterbi) can raise an exception for
        # sequences the model cannot decode; any such failure counts the whole
        # sentence as wrong, which makes this a conservative estimate.
        try:
            most_likely_tags = simplify_decoding(observations, model)
            correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
        except Exception:
            pass
        total_predictions += len(observations)
    return correct / total_predictions

The MFC tagger reaches 95.72% accuracy on the training set and 93.01% on the test set.

In [15]:
mfc_training_acc = accuracy(data.training_set.X, data.training_set.Y, mfc_model)
print("training accuracy mfc_model: {:.2f}%".format(100 * mfc_training_acc))

mfc_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, mfc_model)
print("testing accuracy mfc_model: {:.2f}%".format(100 * mfc_testing_acc))

assert mfc_training_acc >= 0.955, "Uh oh. Your MFC accuracy on the training set doesn't look right."
assert mfc_testing_acc >= 0.925, "Uh oh. Your MFC accuracy on the testing set doesn't look right."
HTML('<div class="alert alert-block alert-success">Your MFC tagger accuracy looks correct!</div>')
training accuracy mfc_model: 95.72%
testing accuracy mfc_model: 93.01%
Out[15]:
Your MFC tagger accuracy looks correct!

2) Hidden Markov Model (HMM)

Definition of the HMM

A hidden Markov model is a probabilistic model of sequences. A hidden Markov chain generates an unobservable random sequence of states, and each state in turn generates an observation, producing the observed random sequence. The sequence of states produced by the hidden chain is called the state sequence; the sequence of observations, one per state, is called the observation sequence. Each position in the sequence can be regarded as a time step.

A hidden Markov model is determined by the initial state probability vector ($\pi$), the state transition probability matrix ($A$), and the emission (observation) probability matrix ($B$). $\pi$ and $A$ determine the state sequence, and $B$ determines the observations generated from the states. Writing the model as $\lambda$: $$\lambda=(A,B,\pi)$$

Assumptions

$Q$ is the set of all possible hidden states, where $N$ is the number of states: $$Q=\{q_1,q_2,...,q_N\}$$ $V$ is the set of all possible observations, where $M$ is the number of possible observations: $$V=\{v_1,v_2,...,v_M\}$$ $I$ is a state sequence of length $T$ and $O$ is the corresponding observation sequence: $$I=\{i_1,i_2,...,i_T\},\quad O=\{o_1,o_2,...,o_T\}$$

The HMM parameters $A$, $B$, and $\pi$ are then defined as follows: $$A=[a_{ij}]_{N\times N}$$ where $a_{ij}=P(i_{t+1}=q_j \mid i_t=q_i)$, $i=1,2,...,N$; $j=1,2,...,N$, is the probability of moving to state $q_j$ at time $t+1$ given that the chain is in state $q_i$ at time $t$; $$B=[b_j(k)]_{N\times M}$$ where $b_j(k)=P(o_t=v_k \mid i_t=q_j)$, $k=1,2,...,M$; $j=1,2,...,N$, is the probability of emitting observation $v_k$ from state $q_j$ at time $t$; $$\pi=(\pi_i)$$ where $\pi_i=P(i_1=q_i)$, $i=1,2,...,N$, is the probability of being in state $q_i$ at time $t=1$.
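
With these parameters, the joint probability that the model generates a state sequence $I$ together with the observation sequence $O$ factorizes as $$P(O,I\mid\lambda)=\pi_{i_1}b_{i_1}(o_1)\prod_{t=2}^{T}a_{i_{t-1}i_t}\,b_{i_t}(o_t)$$ Since $P(I\mid O,\lambda)\propto P(O,I\mid\lambda)$ for a fixed observation sequence $O$, this is the quantity that the decoding algorithm introduced below maximizes over all state sequences $I$.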

The three fundamental problems of HMMs

  • (1) Evaluation: given the model $\lambda=(A,B,\pi)$ and an observation sequence $O=(o_1,o_2,...,o_T)$, compute the probability $P(O|\lambda)$ of the observation sequence under the model; solved by the forward and backward algorithms.
  • (2) Learning: given an observation sequence $O=(o_1,o_2,...,o_T)$, estimate the parameters of $\lambda=(A,B,\pi)$ so that $P(O|\lambda)$ is maximized, i.e. maximum likelihood estimation; solved by supervised counting (when the state sequences are labeled, as here) or by the EM (Baum-Welch) algorithm.
  • (3) Decoding (prediction): given the model $\lambda=(A,B,\pi)$ and an observation sequence $O=(o_1,o_2,...,o_T)$, find the state sequence $I=(i_1,i_2,...,i_T)$ that maximizes the conditional probability $P(I|O)$; solved by the Viterbi algorithm.

For details, see the Stanford course materials.

For POS tagging this is exactly the decoding problem: given the observed sequence of words, find the most likely sequence of tags. Below we use Pomegranate, an open-source HMM implementation, to solve it.
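
For reference, the Viterbi algorithm solves the decoding problem with a dynamic program. Let $\delta_t(j)$ be the highest probability of any single state path that ends in state $q_j$ at time $t$ and accounts for the first $t$ observations; then $$\delta_1(j)=\pi_j\,b_j(o_1),\qquad \delta_t(j)=\max_{1\le i\le N}\big[\delta_{t-1}(i)\,a_{ij}\big]\,b_j(o_t),\quad t=2,...,T$$ The most likely tag sequence is recovered by backtracking the arg max at each step; this recursion is the computation behind the viterbi() method used later (with explicit start and end states added to the model).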

Count how often each tag occurs (tag unigram counts)

In [16]:
def unigram_counts(sequences):
    """
    sequences: the tag sequences
    Returns: the number of occurrences of each tag, e.g. unigram_counts["NOUN"] == 275558
    """
    result = defaultdict(int)
    for sequence in sequences:
        for tag in sequence:
            result[tag] += 1

    return result

tag_unigrams = unigram_counts(data.training_set.Y)

assert set(tag_unigrams.keys()) == data.training_set.tagset, \
       "Uh oh. It looks like your tag counts doesn't include all the tags!"
assert min(tag_unigrams, key=tag_unigrams.get) == 'X', \
       "Hmmm...'X' is expected to be the least common class"
assert max(tag_unigrams, key=tag_unigrams.get) == 'NOUN', \
       "Hmmm...'NOUN' is expected to be the most common class"
HTML('<div class="alert alert-block alert-success">Your tag unigrams look good!</div>')
Out[16]:
Your tag unigrams look good!

Count how often each ordered pair of adjacent tags occurs (tag bigram counts)

In [20]:
def bigram_counts(sequences):
    """
    sequences: the tag sequences
    Returns: the number of co-occurrences of each ordered pair of adjacent tags,
             e.g. bigram_counts[("NOUN", "VERB")] == 61582
    """
    result = defaultdict(int)
    for sequence in sequences:
        for i in range(1, len(sequence)):
            result[(sequence[i-1], sequence[i])] += 1

    return result

tag_bigrams = bigram_counts(data.training_set.Y)

assert len(tag_bigrams) == 144, \
       "Uh oh. There should be 144 pairs of bigrams (12 tags x 12 tags)"
assert min(tag_bigrams, key=tag_bigrams.get) in [('X', 'NUM'), ('PRON', 'X')], \
       "Hmmm...The least common bigram should be one of ('X', 'NUM') or ('PRON', 'X')."
assert max(tag_bigrams, key=tag_bigrams.get) in [('DET', 'NOUN')], \
       "Hmmm...('DET', 'NOUN') is expected to be the most common bigram."
HTML('<div class="alert alert-block alert-success">Your tag bigrams look good!</div>')
Out[20]:
Your tag bigrams look good!

Count how often each tag occurs at the start of a sentence

In [21]:
def starting_counts(sequences):
    """
    sequences: the tag sequences
    Returns: the number of sentences beginning with each tag, e.g. starting_counts["NOUN"] == 8093
    """
    result = defaultdict(int)
    for sequence in sequences:
        result[sequence[0]] += 1

    return result

tag_starts = starting_counts(data.training_set.Y)

assert len(tag_starts) == 12, "Uh oh. There should be 12 tags in your dictionary."
assert min(tag_starts, key=tag_starts.get) == 'X', "Hmmm...'X' is expected to be the least common starting tag."
assert max(tag_starts, key=tag_starts.get) == 'DET', "Hmmm...'DET' is expected to be the most common starting tag."
HTML('<div class="alert alert-block alert-success">Your starting tag counts look good!</div>')
Out[21]:
Your starting tag counts look good!

Count how often each tag occurs at the end of a sentence

In [22]:
def ending_counts(sequences):
    """
    sequences: the tag sequences
    Returns: the number of sentences ending with each tag, e.g. ending_counts["DET"] == 18
    """
    result = defaultdict(int)
    for seq in sequences:
        result[seq[-1]] += 1
    return result

tag_ends = ending_counts(data.training_set.Y)

assert len(tag_ends) == 12, "Uh oh. There should be 12 tags in your dictionary."
assert min(tag_ends, key=tag_ends.get) in ['X', 'CONJ'], "Hmmm...'X' or 'CONJ' should be the least common ending tag."
assert max(tag_ends, key=tag_ends.get) == '.', "Hmmm...'.' is expected to be the most common ending tag."
HTML('<div class="alert alert-block alert-success">Your ending tag counts look good!</div>')
Out[22]:
Your ending tag counts look good!

Now build the hidden Markov model

In [23]:
basic_model = HiddenMarkovModel(name="base-hmm-tagger")

a. Build the emission probabilities of the HMM, i.e. the distribution $B$ in $\lambda=(A,B,\pi)$; the emission probability is the probability that a hidden state emits a given observable symbol.
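
Since the data is fully labeled, $B$ can be estimated by maximum likelihood from the tag–word pair counts and the tag counts computed earlier, which is exactly what calEmissionProbabilities below does: $$\hat b_j(k)=\frac{C(\text{tag}_j,\ \text{word}_k)}{C(\text{tag}_j)}$$ where the numerator comes from pair_counts and the denominator from unigram_counts.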

In [24]:
def calEmissionProbabilities(emission_counts, tag_unigrams):
    """
    emission_counts: counts of (tag, word) pairs, as returned by pair_counts
    tag_unigrams: counts of each tag
    Returns: for each tag, the probability of emitting each word,
             e.g. dict["NOUN"]["time"] == 0.005 means a noun emits "time" with probability 0.005
    """
    result = defaultdict(dict)
    for tag, num in tag_unigrams.items():
        for word, wnum in emission_counts[tag].items():
            result[tag][word] = float(wnum) / num
    return result

# add one state per tag with its emission distribution to the model, and keep the
# states in a dict so they can be referenced when adding transitions below
emission_Probabilities = calEmissionProbabilities(emission_counts, tag_unigrams)
states = {}
for tag, distribution in emission_Probabilities.items():
    s = State(DiscreteDistribution(distribution), name=tag)
    states[tag] = s
    basic_model.add_states(s)

b. Build the state transition probabilities, i.e. the matrix $A$ in $\lambda=(A,B,\pi)$.
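
The transition probabilities are estimated the same way, from the tag bigram and unigram counts; this is what the loop below adds to the model: $$\hat a_{ij}=\frac{C(\text{tag}_i,\ \text{tag}_j)}{C(\text{tag}_i)}$$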

In [25]:
for (tag1,tag2),num in tag_bigrams.items():
    basic_model.add_transition(states[tag1],states[tag2],float(num)/tag_unigrams[tag1])

c. Build the initial state probabilities, i.e. $\pi$ in $\lambda=(A,B,\pi)$, add the end-of-sentence transitions, and bake the model.
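
The start probabilities are estimated from the sentence-level counts as $$\hat\pi_i=\frac{C(\text{sentences starting with tag}_i)}{\#\text{sentences}}$$ and, analogously, the transition from each tag state to the model's end state is weighted by $\frac{C(\text{sentences ending with tag}_i)}{\#\text{sentences}}$ (both denominators equal the number of training sentences), which is what the code below computes.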

In [26]:
starts_count=sum(list(tag_starts.values()))
ends_count=sum(list(tag_ends.values()))
for tag,num in tag_starts.items():
    basic_model.add_transition( basic_model.start, states[tag], float(num)/starts_count )
    
for tag,num in tag_ends.items():
    basic_model.add_transition( states[tag], basic_model.end, float(num)/ends_count )
    
basic_model.bake()

assert all(tag in set(s.name for s in basic_model.states) for tag in data.training_set.tagset), \
       "Every state in your network should use the name of the associated tag, which must be one of the training set tags."
assert basic_model.edge_count() == 168, \
       ("Your network should have an edge from the start node to each state, one edge between every " +
        "pair of tags (states), and an edge from each state to the end node.")
HTML('<div class="alert alert-block alert-success">Your HMM network topology looks good!</div>')
Out[26]:
Your HMM network topology looks good!

d. Visualize the model

In [31]:
show_model(basic_model, figsize=(20, 20), filename="example.png", overwrite=True, show_ends=False)

e. Use the model to decode the hidden tag sequences. The HMM reaches 97.54% accuracy on the training set and 96.18% on the test set, about 3 percentage points higher than the MFC tagger's 93.01% on the test set.

In [32]:
hmm_training_acc = accuracy(data.training_set.X, data.training_set.Y, basic_model)
print("training accuracy basic hmm model: {:.2f}%".format(100 * hmm_training_acc))

hmm_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, basic_model)
print("testing accuracy basic hmm model: {:.2f}%".format(100 * hmm_testing_acc))

assert hmm_training_acc > 0.97, "Uh oh. Your HMM accuracy on the training set doesn't look right."
assert hmm_testing_acc > 0.955, "Uh oh. Your HMM accuracy on the testing set doesn't look right."
HTML('<div class="alert alert-block alert-success">Your HMM tagger accuracy looks correct! Congratulations, you\'ve finished the project.</div>')
training accuracy basic hmm model: 97.54%
testing accuracy basic hmm model: 96.18%
Out[32]:
Your HMM tagger accuracy looks correct! Congratulations, you've finished the project.
In [34]:
for key in data.testing_set.keys[:2]:
    print("Sentence Key: {}\n".format(key))
    print("Predicted labels:\n-----------------")
    print(simplify_decoding(data.sentences[key].words, basic_model))
    print()
    print("Actual labels:\n--------------")
    print(data.sentences[key].tags)
    print("\n")
Sentence Key: b100-28144

Predicted labels:
-----------------
['CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.']

Actual labels:
--------------
('CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.')


Sentence Key: b100-23146

Predicted labels:
-----------------
['PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.']

Actual labels:
--------------
('PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.')


References

Li Hang, 《统计学习方法》 (Statistical Learning Methods)

Udacity's Artificial Intelligence course