词典分词之mmseg算法原理及其实现

1、简介

MMSeg是由台湾学者蔡志浩(Chih-Hao Tsai)于1996年提出的基于字符串匹配(亦称基于词典)的中文分词算法。词典分词应用广、可控、速度快,但也存在无法解决歧义问题,比如,“武汉市长江大桥”是应分词“武汉/市长/江大桥”还是“武汉市/长江/大桥”。基于此,有人提出了正向最大匹配策略,但是可能会出现分词错误的情况,比如:若词典中有“武汉市长”,则原句被分词成“武汉市长/江大桥”。单纯的最大匹配还是无法完美地解决歧义,因而MMSeg在正向最大匹配的基础上设计了四个启发式规则,解决了词典分词的歧义问题。

使用过MMSEG分词算法的人反馈都不错,觉得这个算法简单高效,而且还非常准确。作者声称这个规则达到了99.69%的准确率并且93.21%的歧义能被这个规则消除。

2、MMseg分词算法原理

2.1、基本含义

MMSeg的字符串匹配算法分为两种

  • Simple,即简单的正向匹配,根据开头的字,列出所有可能的结果,取最大匹配结果;
  • Complex,匹配出所有的“三个词的词组”(即原文中的chunk,“词组”),即从某一既定的字为起始位置,得到所有可能的“以三个词为一组”的所有组合,设计了四个去歧义规则(Ambiguity Resolution Rules)指导分词。

接下我们主要讲Complex的原理。
首先来理解一下chunk,它是MMSeg分词算法中一个关键的概念。Chunk中包含依据上下文分出的一组词和相关的属性,包括长度(Length)、平均长度(Average Length)、标准差的平方(Variance)和自由语素度(Degree Of Morphemic Freedom)。下表中列出了各个属性的含义:

属性 含义
长度(Length) chuck中各个词的长度之和
平均长度(Average Length) chuck中各个词的长度求平均
标准差的平方(Variance) chuck中各个词的长度求标准差
自由语素度(Degree Of Morphemic Freedom) 各单字词词频的对数之和

可能看到上面的一些概念会觉得有些模糊,下面我们通过一个简单的列子加深理解。
其实chunk就是三个词为一组,在chunk确定之后,chunk的各种属性也就随之可以计算出来,比如”长春市长春药店”,我们可以得到很多chunk

长春_市_长春
长_春_市长
长春_市长_春药
长春市_长春_药店
….

词典分解出来的可能更多(假定这些词都在词典中)。上面列举的三个词为一组的就是一个chunk,一旦chunk确定之后chunk的各种属性也就可以随之计算出来。

2.2、四个规则

消除歧义的规则有四个,使用中依次用这四个规则进行过滤,直到只有一种结果或者第四个规则使用完毕,4条消歧规则包括:

  • 规则1:取最大匹配的chunk (Rule 1: Maximum matching)
  • 规则2:取平均词长最大的chunk (Rule 2: Largest average word length)
  • 规则3:取词长标准差最小的chunk (Rule 3: Smallest variance of word lengths)
  • 规则4:取单字词自由语素度之和最大的chunk (Rule 4: Largest sum of degree of morphemic freedom of one-character words)

接来下举例进行分析:

  • 取最大匹配的chunk(Maximum matching (Chen & Liu 1992))
    有两种情况,分别对应于使用“simple”和“complex”的匹配方法。对“simple”匹配方法,选择长度最大的词,用在上文的例子中即选择“长春市”;
    对“complex”匹配方法,选择“词组长度最大的”那个词组,然后选择这个词组的第一个词,作为切分出的第一个词,所以在上文提到的例子中我们选择的chunk为
    长春市_长春_药店”,切分出的词为“长春市

    从以上的情况来看貌似simple和complex没啥区别,如果是对“北京大学城”进行分词的话,“simple”第一个词结果为“北京大学”,“complex”第一个词结果为”北京”,正确的分词结果为“北京 大学城”。

    对于有超过一个词组满足这个条件的则应用下一个规则

  • 取平均词长最大的chunk(Largest average word length (Chen & Liu, 1992))
    经过规则1过滤后,如果剩余的词组超过1个,那就选择平均词语长度最大的那个(平均词长=词组总字数/词语数量)。比如:“李志博士兵”,经过以上规则过滤后,可能还剩下如下的chunk:

    李志博_士兵(5/2=2.5)
    李志_博士_兵(5/3=1.67)

    依据此规则就可以确定“李志博_士兵”这个chunk,但是对于上述的“北京大学城”此条规则并未起作用。

  • 取词长标准差最小的chunk(Smallest variance of word lengths (Chen & Liu, 1992))
    由于词语长度的变化率可以由标准差反映,所以此处直接套用标准差公式即可。比如针对”研究生命起源“这个case,经过上面的规则过滤之后有:

    研究_生命_起源 (标准差=sqrt(((2-2)^2+(2-2)^2+(2-2^2))/3)=0)
    研究生_命_起源 (标准差=sqrt(((2-3)^2+(2-1)^2+(2-2)^2)/3)=0.8165)

    于是通过这一规则选择“研究_生命_起源”这个词组
  • 取单字词自由语素度之和最大的chunk(Largest sum of degree of morphemic freedom of one-character words)
    这里的自由语素度可以用一个数学公式表达:log(frequency),即词频的自然对数(这里log表示数学中的ln)。这个规则的意思是“计算词组中的所有单字词词频的自然对数,然后将得到的值相加,取总和最大的词组”。比如:
    “设施和服务”,经过上面的规则过滤之后,这个会有如下几种组合

    设施_和服_务
    设施_和_服务

    利用最后一个规则 第一条中的“务”和第二条中的“和”,从直观看,显然是“和”的词频在日常场景下要高,这依赖一个词频字典和的词频决定了的分词。
    假设“务”作为单字词时候的频率是30,“和”作为单字词时候的频率是100,对30和100取自然对数,然后取最大值者,所以取“和”字所在的词组,即“设施_和_服务”。

    也许会问为什么要对“词频”取自然对数呢?可以这样理解,词组中单字词词频总和可能一样,但是实际的效果并不同,比如
    A_BBB_C (单字词词频,A:3, C:7)
    DD_E_F (单字词词频,E:5,F:5)
    表示两个词组,A、C、E、F表示不同的单字词,如果不取自然对数,单纯就词频来计算,那么这两个词组是一样的(3+7=5+5),但实际上不同的词频范围所表示的效果也不同,所以这里取自然对数,以表区分(ln(3)+ln(7) < ln(5)+ln(5), 3.0445<3.2189)。

从上面的叙述来看MMseg算法是将一个句子尽可能长和尽可能均匀的切分,这与中文的语法习惯比较相符。同时,分词效果与词典的关系较大,尤其是词典中单字词的频率。“simple”只是用了上述的第一个规则,其实等同于向前最大匹配算法,“complex”以上的四个规则都是用了。实际使用中,一般都是使用complex的匹配方法+四个规则过滤。在使用MMseg词典分词算法的时候尽量使用领域词典对该领域进行分词,如:计算机类词库、医疗词库、金融词库、旅游类词库等,这样效果会好很多。
总的来说MMseg是一个简单、快速、有效的分词方法。

3、算法拓展

上文基本上将MMseg算法的原理讲解了一遍,但是其实上述的规则并未消除掉所有的歧义,比如:“购买文学艺术作品”按照上述的“complex”模式进行分词可能经过四个规则过滤后还会剩余一下2个chunk:

购买_文学_艺术作品
购买_文学艺术_作品

如果词典中有“文学艺术作品”,那么对于这个语句使用MMseg分词是没问题的。
那么如果词典中没有呢,至少我加载jieba分词的词典中是没有的,是使用该词典进行MMseg分词,经过四个规则过滤之后剩下以上两个chunk。那么如何对以上的两个chunk进行优选呢。受 规则4 的启发,我们是否在以上的规则基础之上加入 第五条规则
(5) 计算chunk自由语素度,取最大的那个chunk,即
$$max(\sum_{i=0}^{N} log(freq_{w_i})),N=2$$
所以,针对上述例子中,如果使用jieba分词词典,各个词在词典中的信息为:购买(4275)、文学(6890)、艺术作品(3)、文学艺术(3)、作品(9248)
则有:

$$\begin{split}
f(购买||文学||艺术作品){} & =log(freq_{购买})+log(freq_{文学})+log(freq_{艺术作品}) \\
{} & =log(4275)+log(6890)+log(3) \\
{} & =18.397
\end{split}$$

$$\begin{split}
f(购买||文学艺术||作品){} & =log(freq_{购买})+log(freq_{文学艺术})+log(freq_{作品}) \\
{} & =log(4275)+log(3)+log(9248) \\
{} & =18.691
\end{split}$$

则按照规则5,则最后取“购买_文学艺术_作品”这个chunk,就解决了经过4个规则过滤之后的还剩多个chunk的情况。
当然,以上加的这条规则为个人的一个思路,因为在实际中确确实实遇到了这个问题。其实加规则的思路还有很多,比如:

  • 直接计算chunk的trigram的概率,因为chunk刚好是三个词组合,缺点是计算量大。
  • 依据chunk的第一个词的log(freq),取该值的最大那个chunk,因为chunk过滤完成之后也只是决定了当前的分词结果为哪个词。
    其实以上思路的都是从算法规则上添加条件过滤,解决问题的方式还可以从词典入手,比如针对以上的例子加入“文学艺术作品”这个词汇。

拓展中所涉及的内容都是个人的一个理解和遇到问题的一个思路,将其列举了出来,并不为MMseg论文中的内容。如果有不妥之处希望大家批评指正。

4、算法demo实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
#!/usr/bin/python
# -*- coding:utf-8 -*-
from __future__ import unicode_literals
import json
import math
import os, sys


import re




class Mmseg(object):

def __init__(self):
self.MIN_FLOAT = -3.14e100
self.dic_file = "user_dic.json"
self.user_dic = json.load(open(self.dic_file, "r"))
self.maxlenth = 100 # 最大匹配长度
self.re_eng = re.compile('[a-zA-Z0-9]', re.U) # 英文数字的正则
self.re_han_default = re.compile("([\u4E00-\u9FD5a-zA-Z0-9+#&\._]+)", re.U) # 汉字的正则
self.re_skip_default = re.compile("(\r\n|\s)", re.U) # 符号的正则
self.re_han_cut_all = re.compile("([\u4E00-\u9FD5]+)", re.U)
self.re_skip_cut_all = re.compile("[^a-zA-Z0-9+#\n]", re.U)
self.show_chunks_flag=False

def get_DAG(self,sentence):
"""
构建有向无环图,使用词典中的词进行最大匹配
:param sentence:
:return:
"""
dag = {}
for index, word in enumerate(sentence):
for j in range(index, len(sentence)):
cur_word = u"".join(sentence[index:j + 1])
if cur_word in self.user_dic:
if index in dag:
dag[index].append(j)
else:
dag[index] = [j]
else:
#是否break决定着最大或者最小切分
# break
if (j - index) >= self.maxlenth: break
return dag


def maxLengthFilter(self,chunks):
"""
规则1:取最大长度匹配的chunk,即chunk中各词的长度之和取最大的chunk
:param chunks:
:return:
"""
chunks_dict = {}
for chunk in chunks:
words_length = sum([(end - begin + 1) for begin, end in chunk]) # 词的长度
if words_length in chunks_dict:
chunks_dict[words_length].append(chunk)
else:
chunks_dict[words_length] = [chunk]
chunks_dict_sorted = sorted(chunks_dict.items(), key=lambda x: x[0], reverse=True)
filted_chunks = chunks_dict_sorted[0][1] # 取最长的chunks
return filted_chunks


def averageMaxLengthFilter(self,chunks):
"""
规则2:取平均长度最大的,即chunk中(词的长度/chunk的切分个数)
:param chunks:
:return:
"""
chunks_dict = {}
for chunk in chunks:
chunk_size = float(len(chunk)) # chunk的切分个数
words_length = float(sum([(end - begin + 1) for begin, end in chunk]))
average_length = float(words_length / chunk_size)
if average_length in chunks_dict:
chunks_dict[average_length].append(chunk)
else:
chunks_dict[average_length] = [chunk]
chunks_dict_sorted = sorted(chunks_dict.items(), key=lambda x: x[0], reverse=True)
filted_chunks = chunks_dict_sorted[0][1] # 取最长的chunks
return filted_chunks


def standardDeviationFilter(self,chunks):
"""
规则3:取标准差平方最小的
标准差平方:[(x1-x)^2+(x2-x)^2 + ...+(xn-x)^2]/n ,其中x为平均值
:param chunks:
:return:
"""
chunks_dict = {}
for chunk in chunks:
chunk_size = float(len(chunk)) # chunk的切分个数
average_words_size = float(sum([(end - begin + 1) for begin, end in chunk]) / chunk_size)
chunk_sdq = float(0) # chunk的标准差平方
for (begin, end) in chunk:
cur_size = float(end - begin + 1)
chunk_sdq += float((cur_size - average_words_size) ** 2)
chunk_sdq = float(chunk_sdq / chunk_size)
if chunk_sdq in chunks_dict:
chunks_dict[chunk_sdq].append(chunk)
else:
chunks_dict[chunk_sdq] = [chunk]
chunks_dict_sorted = sorted(chunks_dict.items(), key=lambda x: x[0], reverse=False)
filted_chunks = chunks_dict_sorted[0][1] # 取最长的chunks
return filted_chunks


def logFreqFilterSingle(self,chunks,sentence):
"""
规则4:取单字自由度之和最大的chunk
自由度:log(frequency),即词频取自然对数
:param chunks:
:return:
"""
chunks_dict = {}
for chunk in chunks:
log_freq = 0
for (begin,end) in chunk:
if begin==end:
log_freq += math.log(float(self.user_dic.get(sentence[begin:end+1],1)))
if log_freq in chunks_dict:
chunks_dict[log_freq].append(chunk)
else:
chunks_dict[log_freq] = [chunk]
chunks_dict_sorted = sorted(chunks_dict.items(), key=lambda x: x[0], reverse=True)
filted_chunks = chunks_dict_sorted[0][1] # 取最长的chunks
return filted_chunks


def logFreqFilterMutiWord(self, chunks, sentence):
"""
规则5:取非单字的词语自由度之和最大的chunk
自由度:log(frequency),即词频取自然对数
:param chunks:
:return:
"""
chunks_dict = {}
for chunk in chunks:
log_freq = 0
for (begin, end) in chunk:
if begin != end:
log_freq += math.log(float(self.user_dic.get(sentence[begin:end + 1], 1)))
if log_freq in chunks_dict:
chunks_dict[log_freq].append(chunk)
else:
chunks_dict[log_freq] = [chunk]
chunks_dict_sorted = sorted(chunks_dict.items(), key=lambda x: x[0], reverse=True)
filted_chunks = chunks_dict_sorted[0][1] # 取最长的chunks
return filted_chunks


def chunksFilter(self,chunks,sentence):
"""
过滤规则
:param chunks:
:param sentence:
:return:
"""
if len(chunks) == 1: return chunks
# 1、取words长度之后最大的chunks
new_chunks = self.maxLengthFilter(chunks)
if len(new_chunks) == 1: return new_chunks

# 2、取平均word长度最大的chunks
new_chunks = self.averageMaxLengthFilter(new_chunks)
if len(new_chunks) == 1: return new_chunks

# 3、取平均方差最小的chunks
new_chunks = self.standardDeviationFilter(new_chunks)
if len(new_chunks) == 1: return new_chunks

# 4、取单字自由度组合最大的chunk
new_chunks = self.logFreqFilterSingle(new_chunks,sentence)
if len(new_chunks) == 1:return new_chunks

# 5、取非单字的所有词的组合自由语素之和最大的
new_chunks = self.logFreqFilterMutiWord(new_chunks, sentence)

if len(new_chunks) != 1:
print(sentence,"=====")
raise Exception("chunk划分最终的结果不唯一")
return new_chunks


def get_chunks(self,DAG, begin_index, text_length,sentence):
"""
给出所有的词和开始的位置,拿到所有的chunk
:param DAG:一句话的在字典中的最大匹配的位置
:param begin_index:从sentence中的某个起始位置开始构建chunk
:param text_length:文本的最大长度
:return:返回该位置生成的三个chunk组成chunks的所有组合
"""
chunks = []
chunk = []
if begin_index > text_length:
raise Exception("begin index out of sentcen length!!!")
for i in DAG.get(begin_index, [begin_index]): # 有的话就是词词典中的匹配,否则就是单字本身
if (i + 1) > text_length - 1: # 到了sentence的结尾,后面无可用字或词
chunks.append([(begin_index, i)])
break
for j in DAG.get(i + 1, [i + 1]):
if (j + 1) > text_length - 1: # 到了sentence的结尾,后面无可用字或词
chunks.append([(begin_index, i), (i + 1, j)])
break
for k in DAG.get(j + 1, [j + 1]):
chunk.append((begin_index, i))
chunk.append((i + 1, j))
chunk.append((j + 1, k))
chunks.append(chunk)
chunk = []
chunks = self.chunksFilter(chunks,sentence)
if self.show_chunks_flag:
self.show_chunks(sentence,chunks)
return chunks


def cut_DAG(self,sentence):
"""
对于划分后的句子,进行切分然后分词
:param sentence:
:return:
"""
self.sentence = sentence
text_length = len(sentence)
DAG = self.get_DAG(sentence)
index = 0
buff = ""
while True:
if index>text_length-1:break
chunks = self.get_chunks(DAG,index,text_length,sentence)
begin,end = chunks[0][0]
word = "".join(sentence[begin:end+1])
if self.re_eng.match(word) and len(word)==1: #对英文和数字进行处理
buff += word
else:
if buff:
yield buff
buff = ""

yield word
index = end+1
if buff: #末尾为数字或英文
yield buff
buff = ""

def cut_re(self,sentence,cut_all = False):
"""
文本的分词切分
:param sentence:输入进来的文本
:param cut_all: 全切分(暂不支持)
:return:返回切分的词
"""
if cut_all:
re_han = self.re_han_cut_all
re_skip = self.re_skip_cut_all
else:
re_han = self.re_han_default
re_skip = self.re_skip_default
blocks = self.re_han_default.split(sentence)
for blk in blocks: #按照符号划分然后在针对划分后的分词
if not blk:
continue
if re_han.match(blk):
for word in self.cut_DAG(blk):
yield word
else:
tmp = re_skip.split(blk)
for x in tmp:
if re_skip.match(x):
yield x
elif not cut_all:
for xx in x:
yield xx
else:
yield x

def cut(self,sentence):

sentence = self.strdecode(sentence)
for word in self.cut_DAG(sentence):
yield word

def show_chunks(self,sentence, chunks,word=None):
"""
打印使用,调试手段
:param sentence:
:param chunks:
:return:
"""

for chunk in chunks:
chunk_words = []
for (begin, end) in chunk:
word = "".join(sentence[begin:end + 1])
chunk_words.append(word)
print("\t".join(chunk_words))
if word!=None:print(word)
print("=======================")

def strdecode(self,sentence,text_type=str):
if not isinstance(sentence, text_type):
try:
sentence = sentence.decode('utf-8')
except UnicodeDecodeError:
sentence = sentence.decode('gbk', 'ignore')
return sentence



def test_one():
mmseg = Mmseg()
mmseg.show_chunks_flag = False
mmseg.user_dic[u"交易性金融资产"] = 1
mmseg.user_dic[u"融通基金a"] = 1
mmseg.user_dic[u"出了"] = 100
print(mmseg.user_dic[u"中出"])
query = "持股基金包含融通基金a,银行存款,交易性金融资产"
# print(json.dumps(re.split("[, , 。 ;]",query),ensure_ascii=False))
# print(json.dumps(re.findall("[, , 。 ;]", query), ensure_ascii=False))
print("/".join(list(mmseg.cut(query))))
query = "购买文学艺术作品"
print("/".join(list(mmseg.cut(query))))
query = "吉林省长春市长春药店,我来到了北京大学城,看到了北京大学生的技术和服务很厉害。工信处女干事每月经过楼下都要亲口交代,2015年5月机器需要维护。我们中出了一个汉奸。"
print("/".join(list(mmseg.cut(query))))
mmseg.user_dic[u"李志博"] = 100
mmseg.user_dic[u"李志"] = 100
query = "董事长包含李志博士"
print("/".join(list(mmseg.cut(query))))
mmseg.user_dic[u"市销率(ps)"] = 100
print("/".join(list(mmseg.cut("市销率(ps)"))))
print("/".join(list(mmseg.cut("吉林省长春市长春药店"))))
if __name__ == "__main__":
test_one()

上述代码中将jieba分词的词典中的word和freq转换为{word:freq}的json文件,则运行程序的结果为:

3
持股/基金/包含/融通基金a/,/银行存款/,/交易性金融资产
购买/文学艺术/作品
吉林省/长春市/长春/药店/,/我/来到/了/北京/大学城/,/看到/了/北京/大学生/的/技术/和/服务/很/厉害/。/工信处/女干事/每月/经过/楼下/都/要/亲口/交代/,/2015/年/5/月/机器/需要/维护/。/我们/中出/了/一个/汉奸/。
董事长/包含/李志/博士
市销率(ps)
吉林省/长春市/长春/药店

算法原理很简单,以上代码为python的一个简单实现。如果想做成java版本话,这个可以作为一个简单的参照!

5、总结

  • 优点:
    MMseg是一个简单快速有效的词典分词算法,无需训练语料,算法效果来说很不错。
    有些句子在使用jieba的词典的情况下比jieba的分词结果都要好。比如:“我来到了北京大学城”,使用jieba分词和mmseg分词的结果为:

    我/来到/了/北京/大学城(MMseg结果)
    我/来到/了/北京大学/城(jieba结果)

  • 缺点:
    有着词典分词的缺点就是对于未登录词无法解决。
    其实也存在也使用词典分词也无法解决的反例,比如:把手抬起来,正确的分词是把_手_抬起_来,但经过本算法的规则过滤为把手_抬起_来

对于未登录词可以使用模型(HMM、CRF、BI-LSTM)来解决。