Problem description
Below is the main code of my transition-based parser:
from __future__ import print_function
from __future__ import division
import os
import sys

curdir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(curdir)

if sys.version_info[0] < 3:
    reload(sys)
    sys.setdefaultencoding("utf-8")
    # raise "Must be using Python 3"

from os import path
from collections import defaultdict
from tqdm import tqdm
import io
import random
import time
import pickle

SHIFT = 0; RIGHT = 1; LEFT = 2
MOVES = (SHIFT, RIGHT, LEFT)
START = ['-START-', '-START2-']
END = ['-END-', '-END2-']
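
# The three moves of the transition system, as implemented in transition() below:
#   SHIFT pushes the first word of the buffer onto the stack;
#   RIGHT attaches the top of the stack to the word beneath it, then pops it;
#   LEFT attaches the top of the stack to the first word of the buffer, then pops it.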

class DefaultList(list):
    """A list that returns a default value if the index is out of bounds."""
    def __init__(self, default=None):
        self.default = default
        list.__init__(self)

    def __getitem__(self, index):
        try:
            return list.__getitem__(self, index)
        except IndexError:
            return self.default

class Parse(object):
    def __init__(self, n):
        self.n = n
        self.heads = [None] * (n-1)
        self.labels = [None] * (n-1)
        self.lefts = []
        self.rights = []
        for i in range(n+1):
            self.lefts.append(DefaultList(0))
            self.rights.append(DefaultList(0))

    def add(self, head, child, label=None):
        print("Parse add:", label)
        self.heads[child] = head
        self.labels[child] = label
        if child < head:
            self.lefts[head].append(child)
        else:
            self.rights[head].append(child)

class Parser(object):
    def __init__(self, load=True):
        model_dir = os.path.dirname(__file__)
        self.model = Perceptron(MOVES)
        if load:
            self.model.load(path.join(model_dir, 'parser.pickle'))
        self.tagger = PerceptronTagger(load=load)
        self.confusion_matrix = defaultdict(lambda: defaultdict(int))

    def save(self):
        self.model.save(path.join(os.path.dirname(__file__), 'parser.pickle'))
        self.tagger.save()

    def parse(self, words):
        n = len(words)
        i = 2; stack = [1]; parse = Parse(n)
        tags = self.tagger.tag(words)
        while stack or (i+1) < n:
            features = extract_features(words, tags, i, n, stack, parse)
            scores = self.model.score(features)
            valid_moves = get_valid_moves(i, n, len(stack))
            guess = max(valid_moves, key=lambda move: scores[move])
            i = transition(guess, i, stack, parse)
        return tags, parse.heads

    def train_one(self, itn, words, gold_tags, gold_heads):
        n = len(words)
        i = 2; stack = [1]; parse = Parse(n)
        tags = self.tagger.tag(words)
        while stack or (i + 1) < n:
            features = extract_features(words, tags, i, n, stack, parse)
            scores = self.model.score(features)
            valid_moves = get_valid_moves(i, n, len(stack))
            gold_moves = get_gold_moves(i, n, stack, parse.heads, gold_heads)
            guess = max(valid_moves, key=lambda move: scores[move])
            assert gold_moves
            best = max(gold_moves, key=lambda move: scores[move])
            self.model.update(best, guess, features)
            i = transition(guess, i, stack, parse)
            self.confusion_matrix[best][guess] += 1
        return len([i for i in range(n-1) if parse.heads[i] == gold_heads[i]])

def transition(move, i, stack, parse):
    if move == SHIFT:
        stack.append(i)
        return i + 1
    elif move == RIGHT:
        parse.add(stack[-2], stack.pop())
        return i
    elif move == LEFT:
        parse.add(i, stack.pop())
        return i
    assert move in MOVES

def get_valid_moves(i, n, stack_depth):
    moves = []
    if (i+1) < n:
        moves.append(SHIFT)
    if stack_depth >= 2:
        moves.append(RIGHT)
    if stack_depth >= 1:
        moves.append(LEFT)
    return moves

def get_gold_moves(n0, n, stack, heads, gold):
    def deps_between(target, others, gold):
        for word in others:
            if gold[word] == target or gold[target] == word:
                return True
        return False

    valid = get_valid_moves(n0, n, len(stack))
    if not stack or (SHIFT in valid and gold[n0] == stack[-1]):
        return [SHIFT]
    if gold[stack[-1]] == n0:
        return [LEFT]
    costly = set([m for m in MOVES if m not in valid])
    # If the word behind s0 is its gold head, Left is incorrect
    if len(stack) >= 2 and gold[stack[-1]] == stack[-2]:
        costly.add(LEFT)
    # If there are any dependencies between n0 and the stack,
    # pushing n0 will lose them.
    if SHIFT not in costly and deps_between(n0, stack, gold):
        costly.add(SHIFT)
    # If there are any dependencies between s0 and the buffer, popping
    # s0 will lose them.
    if deps_between(stack[-1], range(n0+1, n-1), gold):
        costly.add(LEFT)
        costly.add(RIGHT)
    return [m for m in MOVES if m not in costly]

def extract_features(words, tags, n0, n, stack, parse):
    def get_stack_context(depth, stack, data):
        if depth >= 3:
            return data[stack[-1]], data[stack[-2]], data[stack[-3]]
        elif depth >= 2:
            return data[stack[-1]], data[stack[-2]], ''
        elif depth == 1:
            return data[stack[-1]], '', ''
        else:
            return '', '', ''

    def get_buffer_context(i, n, data):
        print("get_buffer_context", data)
        if i + 1 >= n:
            return data[i], '', ''
        elif i + 2 >= n:
            return data[i], data[i + 1], ''
        else:
            return data[i], data[i + 1], data[i + 2]

    def get_parse_context(word, deps, data):
        if word == -1:
            return 0, '', ''
        deps = deps[word]
        valency = len(deps)
        if not valency:
            return 0, '', ''
        elif valency == 1:
            return 1, data[deps[-1]], ''
        else:
            return valency, data[deps[-1]], data[deps[-2]]

    features = {}
    # Set up the context pieces: the word (W) and tag (T) of
    # s0, s1, s2: the top three words on the stack
    # n0, n1, n2: the first three words of the buffer
    # n0b1, n0b2: the two leftmost children of the first word of the buffer
    # s0b1, s0b2: the two leftmost children of the top word of the stack
    # s0f1, s0f2: the two rightmost children of the top word of the stack
    depth = len(stack)
    s0 = stack[-1] if depth else -1

    Ws0, Ws1, Ws2 = get_stack_context(depth, stack, words)
    Ts0, Ts1, Ts2 = get_stack_context(depth, stack, tags)

    Wn0, Wn1, Wn2 = get_buffer_context(n0, n, words)
    Tn0, Tn1, Tn2 = get_buffer_context(n0, n, tags)

    Vn0b, Wn0b1, Wn0b2 = get_parse_context(n0, parse.lefts, words)
    Vn0b, Tn0b1, Tn0b2 = get_parse_context(n0, parse.lefts, tags)

    Vn0f, Wn0f1, Wn0f2 = get_parse_context(n0, parse.rights, words)
    _, Tn0f1, Tn0f2 = get_parse_context(n0, parse.rights, tags)

    Vs0b, Ws0b1, Ws0b2 = get_parse_context(s0, parse.lefts, words)
    _, Ts0b1, Ts0b2 = get_parse_context(s0, parse.lefts, tags)

    Vs0f, Ws0f1, Ws0f2 = get_parse_context(s0, parse.rights, words)
    _, Ts0f1, Ts0f2 = get_parse_context(s0, parse.rights, tags)

    # Cap numeric features at 5?
    # String-distance
    Ds0n0 = min((n0 - s0, 5)) if s0 != 0 else 0

    features['bias'] = 1
    # Add word and tag unigrams
    for w in (Wn0, Wn1, Wn2, Ws0, Ws1, Ws2, Wn0b1, Wn0b2, Ws0b1, Ws0b2, Ws0f1, Ws0f2):
        if w:
            features['w=%s' % w] = 1
    for t in (Tn0, Tn1, Tn2, Ts0, Ts1, Ts2, Tn0b1, Tn0b2, Ts0b1, Ts0b2, Ts0f1, Ts0f2):
        if t:
            features['t=%s' % t] = 1

    # Add word/tag pairs
    for i, (w, t) in enumerate(((Wn0, Tn0), (Wn1, Tn1), (Wn2, Tn2), (Ws0, Ts0))):
        if w or t:
            features['%d w=%s, t=%s' % (i, w, t)] = 1

    # Add some bigrams
    features['s0w=%s, n0w=%s' % (Ws0, Wn0)] = 1
    features['wn0tn0-ws0 %s/%s %s' % (Wn0, Tn0, Ws0)] = 1
    features['wn0tn0-ts0 %s/%s %s' % (Wn0, Tn0, Ts0)] = 1
    features['ws0ts0-wn0 %s/%s %s' % (Ws0, Ts0, Wn0)] = 1
    features['ws0-ts0 tn0 %s/%s %s' % (Ws0, Ts0, Tn0)] = 1
    features['wt-wt %s/%s %s/%s' % (Ws0, Ts0, Wn0, Tn0)] = 1
    features['tt s0=%s n0=%s' % (Ts0, Tn0)] = 1
    features['tt n0=%s n1=%s' % (Tn0, Tn1)] = 1

    # Add some tag trigrams
    trigrams = ((Tn0, Tn1, Tn2), (Ts0, Tn0, Tn1), (Ts0, Ts1, Tn0),
                (Ts0, Ts0b1, Tn0), (Ts0, Ts0f1, Tn0), (Tn0, Tn0b1, Tn0b2),
                (Ts0, Ts0b1, Ts0b2), (Ts0, Ts0f1, Ts0f2), (Ts0, Ts1, Ts1))
    for i, (t1, t2, t3) in enumerate(trigrams):
        if t1 or t2 or t3:
            features['ttt-%d %s %s %s' % (i, t1, t2, t3)] = 1

    # Add some valency and distance features
    vw = ((Ws0, Vs0f), (Ws0, Vs0b), (Wn0, Vn0b))
    vt = ((Ts0, Vs0f), (Ts0, Vs0b), (Tn0, Vn0b))
    d = ((Ws0, Ds0n0), (Wn0, Ds0n0), (Ts0, Ds0n0), (Tn0, Ds0n0),
         ('t' + Tn0 + Ts0, Ds0n0), ('w' + Wn0 + Ws0, Ds0n0))
    for i, (w_t, v_d) in enumerate(vw + vt + d):
        if w_t or v_d:
            features['val/d-%d %s %d' % (i, w_t, v_d)] = 1
    return features

class Perceptron(object):
    def __init__(self, classes=None):
        # Each feature gets its own weight vector, so weights is a dict-of-dicts
        self.classes = classes
        self.weights = {}
        # The accumulated values, for the averaging. These will be keyed by
        # feature/clas tuples
        self._totals = defaultdict(int)
        # The last time the feature was changed, for the averaging. Also
        # keyed by feature/clas tuples
        # (tstamps is short for timestamps)
        self._tstamps = defaultdict(int)
        # Number of instances seen
        self.i = 0

    def predict(self, features):
        '''Dot-product the features and current weights and return the best class.'''
        scores = self.score(features)
        # Do a secondary alphabetic sort, for stability
        return max(self.classes, key=lambda clas: (scores[clas], clas))

    def score(self, features):
        all_weights = self.weights
        scores = dict((clas, 0) for clas in self.classes)
        for feat, value in features.items():
            if value == 0:
                continue
            if feat not in all_weights:
                continue
            weights = all_weights[feat]
            for clas, weight in weights.items():
                scores[clas] += value * weight
        return scores

    def update(self, truth, guess, features):
        def upd_feat(c, f, w, v):
            param = (f, c)
            self._totals[param] += (self.i - self._tstamps[param]) * w
            self._tstamps[param] = self.i
            self.weights[f][c] = w + v

        self.i += 1
        if truth == guess:
            return None
        for f in features:
            weights = self.weights.setdefault(f, {})
            upd_feat(truth, f, weights.get(truth, 0.0), 1.0)
            upd_feat(guess, f, weights.get(guess, 0.0), -1.0)

    def average_weights(self):
        for feat, weights in self.weights.items():
            new_feat_weights = {}
            for clas, weight in weights.items():
                param = (feat, clas)
                total = self._totals[param]
                total += (self.i - self._tstamps[param]) * weight
                averaged = round(total / float(self.i), 3)
                if averaged:
                    new_feat_weights[clas] = averaged
            self.weights[feat] = new_feat_weights

    def save(self, path):
        print("Saving model to %s" % path)
        # Pickle requires binary mode on Python 3
        pickle.dump(self.weights, open(path, 'wb'))

    def load(self, path):
        self.weights = pickle.load(open(path, 'rb'))

class PerceptronTagger(object):
    '''Greedy Averaged Perceptron tagger'''
    model_loc = os.path.join(os.path.dirname(__file__), 'tagger.pickle')

    def __init__(self, classes=None, load=True):
        self.tagdict = {}
        if classes:
            self.classes = classes
        else:
            self.classes = set()
        self.model = Perceptron(self.classes)
        if load:
            self.load(PerceptronTagger.model_loc)

    def tag(self, words, tokenize=True):
        prev, prev2 = START
        tags = DefaultList('')
        context = START + [self._normalize(w) for w in words] + END
        for i, word in enumerate(words):
            tag = self.tagdict.get(word)
            if not tag:
                features = self._get_features(i, word, context, prev, prev2)
                tag = self.model.predict(features)
            tags.append(tag)
            prev2 = prev; prev = tag
        return tags
    def start_training(self, sentences):
        self._make_tagdict(sentences)
        self.model = Perceptron(self.classes)

    def train(self, sentences, save_loc=None, nr_iter=5):
        '''Train a model from sentences, and save it at save_loc. nr_iter
        controls the number of Perceptron training iterations.'''
        self.start_training(sentences)
        for iter_ in range(nr_iter):
            for words, tags in sentences:
                self.train_one(words, tags)
            random.shuffle(sentences)
        # Average and save. The original called self.end_training(save_loc),
        # which is not defined in this file.
        self.model.average_weights()
        self.save()

    def save(self):
        # Pickle as a binary file
        pickle.dump((self.model.weights, self.tagdict, self.classes),
                    open(PerceptronTagger.model_loc, 'wb'), -1)
    def train_one(self, words, tags):
        prev, prev2 = START
        context = START + [self._normalize(w) for w in words] + END
        for i, word in enumerate(words):
            guess = self.tagdict.get(word)
            if not guess:
                feats = self._get_features(i, word, context, prev, prev2)
                guess = self.model.predict(feats)
                self.model.update(tags[i], guess, feats)
            prev2 = prev; prev = guess

    def load(self, loc):
        w_td_c = pickle.load(open(loc, 'rb'))
        self.model.weights, self.tagdict, self.classes = w_td_c
        self.model.classes = self.classes

    def _normalize(self, word):
        if '-' in word and word[0] != '-':
            return '!HYPHEN'
        elif word.isdigit() and len(word) == 4:
            return '!YEAR'
        elif word[0].isdigit():
            return '!DIGITS'
        else:
            return word.lower()
    def _get_features(self, i, word, context, prev, prev2):
        '''Map tokens into a feature representation, implemented as a
        {hashable: float} dict. If the features change, a new model must be
        trained.'''
        def add(name, *args):
            features[' '.join((name,) + tuple(args))] += 1

        i += len(START)
        features = defaultdict(int)
        # It's useful to have a constant feature, which acts sort of like a prior
        add('bias')
        add('i suffix', word[-3:])
        add('i pref1', word[0])
        add('i-1 tag', prev)
        add('i-2 tag', prev2)
        add('i tag+i-2 tag', prev, prev2)
        add('i word', context[i])
        add('i-1 tag+i word', prev, context[i])
        add('i-1 word', context[i-1])
        add('i-1 suffix', context[i-1][-3:])
        add('i-2 word', context[i-2])
        add('i+1 word', context[i+1])
        add('i+1 suffix', context[i+1][-3:])
        add('i+2 word', context[i+2])
        return features
    def _make_tagdict(self, sentences):
        '''Make a tag dictionary for single-tag words.'''
        counts = defaultdict(lambda: defaultdict(int))
        for sent in sentences:
            for word, tag in zip(sent[0], sent[1]):
                counts[word][tag] += 1
                self.classes.add(tag)
        freq_thresh = 20
        ambiguity_thresh = 0.97
        for word, tag_freqs in counts.items():
            tag, mode = max(tag_freqs.items(), key=lambda item: item[1])
            n = sum(tag_freqs.values())
            # Don't add rare words to the tag dictionary
            # Only add quite unambiguous words
            if n >= freq_thresh and (float(mode) / n) >= ambiguity_thresh:
                self.tagdict[word] = tag


def _pc(n, d):
    return (float(n) / d) * 100

def train(parser, sentences, nr_iter):
    parser.tagger.start_training(sentences)
    for itn in tqdm(range(nr_iter)):
        corr = 0; total = 0
        random.shuffle(sentences)
        for words, gold_tags, gold_parse, gold_label in sentences:
            corr += parser.train_one(itn, words, gold_tags, gold_parse)
            if itn < 5:
                parser.tagger.train_one(words, gold_tags)
            total += len(words)
        print('Iter: %s, accuracy: %.3f' % (itn, (float(corr) / float(total))))
        if itn == 4:
            parser.tagger.model.average_weights()
    print('Averaging weights')
    parser.model.average_weights()

def read_pos(loc):
    for line in open(loc):
        if not line.strip():
            continue
        words = DefaultList('')
        tags = DefaultList('')
        for token in line.split():
            if not token:
                continue
            word, tag = token.rsplit('/', 1)
            #words.append(normalize(word))
            words.append(word)
            tags.append(tag)
        pad_tokens(words); pad_tokens(tags)
        yield words, tags

def read_conll(loc): # pragma: no cover
    with io.open(loc, encoding='utf8') as file_:
        sent_strs = file_.read().strip().split('\n\n')
    for sent_str in sent_strs:
        lines = [line.split() for line in sent_str.split('\n')
                 if not line.startswith('#')]
        words = DefaultList('')
        tags = DefaultList('')
        heads = [None]
        labels = [None]
        for i, pieces in enumerate(lines):
            if len(pieces) == 4:
                word, pos, head, label = pieces
            else:
                # CoNLL-U columns: ID FORM LEMMA UPOS XPOS FEATS HEAD DEPREL DEPS MISC
                idx, word, lemma, pos1, pos, morph, head, label, _, _2 = pieces
                # Skip multi-word token ranges such as "1-2"
                if '-' in idx:
                    continue
            words.append(word)
            tags.append(pos)
            # Map the head index onto the padded word list: token i sits at
            # position i (position 0 is the '<start>' pad), and the root
            # (head 0) attaches to the 'ROOT' token appended at the end.
            heads.append(int(head) if head != '0' else len(lines) + 1)
            labels.append(label)
        pad_tokens(words); pad_tokens(tags)
        yield words, tags, heads, labels


def pad_tokens(tokens):
    tokens.insert(0, '<start>')
    tokens.append('ROOT')

def main(model_dir, train_loc, heldout_gold):
    if not os.path.exists(model_dir):
        os.mkdir(model_dir)
    parser = Parser(load=False)
    sentences = list(read_conll(train_loc))
    train(parser, sentences, nr_iter=15)
    parser.save()
    c = 0
    t = 0
    gold_sents = list(read_conll(heldout_gold))
    t1 = time.time()
    for words, gold_tags, gold_heads, gold_labels in gold_sents:
        _, heads = parser.parse(words)
        for i, w in list(enumerate(words))[1:-1]:
            if gold_labels[i] in ('P', 'punct'):
                continue
            if heads[i] == gold_heads[i]:
                c += 1
            t += 1
    t2 = time.time()
    print('Parsing took %0.3f ms' % ((t2-t1)*1000.0))
    print(c, t, float(c)/t)

import unittest

# run testcase: python /Users/hain/ai/text-dependency-parser/app/app.py Test.testExample
class Test(unittest.TestCase):
    '''
    '''
    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_UD_English_EWT(self):
        print("test_UD_English_EWT")
        model_dir = path.join(curdir, path.pardir, "tmp", "model")
        train_loc = path.join(curdir, "data", "UD_English-EWT", "en_ewt-ud-dev.conllu")
        heldout_gold = path.join(curdir, "en_ewt-ud-test.conllu")
        main(model_dir, train_loc, heldout_gold)


def test():
    unittest.main()


if __name__ == '__main__':
    test()
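
For reference, read_conll above expects CoNLL-U input: one token per line with ten tab-separated columns (ID, FORM, LEMMA, UPOS, XPOS, FEATS, HEAD, DEPREL, DEPS, MISC), blank lines between sentences, and comment lines starting with #. That is what the ten-name unpacking in read_conll assumes. An illustrative token line (tabs shown as spaces):

1   From   from   ADP   IN   _   3   case   3:case   _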
I am trying to run it on a Universal Dependencies treebank (https://github.com/UniversalDependencies/UD_English-EWT/blob/master/en_ewt-ud-dev.conllu),
but I keep getting this error:
E
======================================================================
ERROR: test_UD_English_EWT (__main__.Test)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "C:\Users\LENOVO\Desktop\WorkfromHome\CNN\tmp\model\data\UD_English-EWT\p1.py", line 572, in test_UD_English_EWT
    main(model_dir, train_loc, heldout_gold)
  File "C:\Users\LENOVO\Desktop\WorkfromHome\CNN\tmp\model\data\UD_English-EWT\p1.py", line 524, in main
    os.mkdir(model_dir)
FileNotFoundError: [WinError 3] The system cannot find the path specified: 'C:\Users\LENOVO\Desktop\WorkfromHome\CNN\tmp\model\data\UD_English-EWT\..\tmp\model'

Ran 1 test in 0.063s

FAILED (errors=1)
Thanks in advance!
Solution
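The traceback points at os.mkdir(model_dir) in main(). os.mkdir only creates the final path component and raises FileNotFoundError ([WinError 3]) when any intermediate directory is missing. Here model_dir is computed as curdir\..\tmp\model, and the intermediate ..\tmp directory does not exist next to the script, so the call fails. A minimal sketch of the likely fix (ensure_model_dir is a hypothetical helper name; its two lines can simply replace the os.path.exists/os.mkdir pair at the top of main()):

import os

def ensure_model_dir(model_dir):
    # os.makedirs creates every missing intermediate directory, and
    # exist_ok=True makes it a no-op when the directory already exists
    # (available since Python 3.2).
    os.makedirs(model_dir, exist_ok=True)
    # Optional: collapse the '..' components for readable log output.
    return os.path.normpath(model_dir)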