import numpy
from transformers import TokenClassificationPipeline
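
# Token-classification pipeline that decodes the label sequence with a
# Viterbi-style dynamic program over B-/I- transition constraints
# (dubbed "Bellman-Ford" here), instead of an independent per-token argmax.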
class BellmanFordTokenClassificationPipeline(TokenClassificationPipeline):
  def __init__(self,**kwargs):
    super().__init__(**kwargs)
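    # transition[i,j]=0 if label j may follow label i, -inf otherwise;
    # ilabel is 0 on "I-" labels and -inf elsewhere (an additive logit mask)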
    x=self.model.config.label2id
    y=[k for k in x if k.find("|")<0 and not k.startswith("I-")]
    self.transition=numpy.full((len(x),len(x)),-numpy.inf)
    self.ilabel=numpy.full(len(x),-numpy.inf)
    for k,v in x.items():
      if k.find("|")<0:
| for j in ["I-"+k[2:]] if k.startswith("B-") else [k]+y if k.startswith("I-") else y: | |
| self.transition[v,x[j]]=0 | |
| if k.startswith("I-"): | |
| self.ilabel[v]=0 | |
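  # accept any model architecture (skip the pipeline's type check)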
  def check_model_type(self,supported_models):
    pass
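  # unwrap list-wrapped outputs, then decode with the dynamic program below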
  def postprocess(self,model_outputs,**kwargs):
    if "logits" not in model_outputs:
      return self.postprocess(model_outputs[0],**kwargs)
    return self.bellman_ford_token_classification(model_outputs,**kwargs)
  def bellman_ford_token_classification(self,model_outputs,**kwargs):
    m=model_outputs["logits"][0].numpy()
    x=model_outputs["offset_mapping"][0].tolist()
    for i,(s,e) in enumerate(x):
      if i>0 and s<e and x[i-1][1]>s:
        m[i]+=self.ilabel
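    # softmax probabilities for reporting, taken before m is overwritten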
    e=numpy.exp(m-numpy.max(m,axis=-1,keepdims=True))
    z=e/e.sum(axis=-1,keepdims=True)
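    # backward pass: m[i-1] accumulates the best score of any valid suffix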
    for i in range(m.shape[0]-1,0,-1):
      m[i-1]+=numpy.max(m[i]+self.transition,axis=1)
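    # forward pass: pick labels left to right; adding self.transition[0]
    # at the start masks "I-" labels (assuming label id 0 is not "I-")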
    k=[numpy.argmax(m[0]+self.transition[0])]
    for i in range(1,m.shape[0]):
      k.append(numpy.argmax(m[i]+self.transition[k[-1]]))
    w=[{"entity":self.model.config.id2label[j],"start":s,"end":e,"score":z[i,j]} for i,((s,e),j) in enumerate(zip(x,k)) if s<e]
| if "aggregation_strategy" in kwargs and kwargs["aggregation_strategy"]!="none": | |
| for i,t in reversed(list(enumerate(w))): | |
| p=t.pop("entity") | |
| if p.startswith("I-"): | |
| w[i-1]["score"]=min(w[i-1]["score"],t["score"]) | |
| w[i-1]["end"]=w.pop(i)["end"] | |
| elif i>0 and w[i-1]["end"]>t["start"]: | |
| w[i-1]["score"]=min(w[i-1]["score"],t["score"]) | |
| w[i-1]["end"]=w.pop(i)["end"] | |
| elif p.startswith("B-"): | |
| t["entity_group"]=p[2:] | |
| else: | |
| t["entity_group"]=p | |
    for t in w:
      t["text"]=model_outputs["sentence"][t["start"]:t["end"]]
    return w
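
# Dependency-parsing pipeline: words are first chunked with the pipeline
# above, then each word's summed subword embeddings are fed back through
# the model to score head-dependent pairs, and a tree is extracted with
# the Chu-Liu/Edmonds maximum-spanning-arborescence algorithm.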
class UniversalDependenciesPipeline(BellmanFordTokenClassificationPipeline):
  def __init__(self,**kwargs):
    kwargs["aggregation_strategy"]="simple"
    super().__init__(**kwargs)
    x=self.model.config.label2id
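    # additive masks that keep only "|root", "|l-…" (left-arc) and
    # "|r-…" (right-arc) labels, respectively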
    self.root=numpy.full((len(x),),-numpy.inf)
    self.left_arc=numpy.full((len(x),),-numpy.inf)
    self.right_arc=numpy.full((len(x),),-numpy.inf)
    for k,v in x.items():
      if k.endswith("|root"):
        self.root[v]=0
      elif k.find("|l-")>0:
        self.left_arc[v]=0
      elif k.find("|r-")>0:
        self.right_arc[v]=0
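    # optional table for expanding multiword tokens, keyed by UPOS tag
    # and then by lower-cased surface form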
    self.multiword={}
    if self.model.config.task_specific_params and "upos_multiword" in self.model.config.task_specific_params:
      self.multiword=self.model.config.task_specific_params["upos_multiword"]
  def postprocess(self,model_outputs,**kwargs):
    import torch
    kwargs["aggregation_strategy"]="simple"
    if "logits" not in model_outputs:
      return self.postprocess(model_outputs[0],**kwargs)
    w=self.bellman_ford_token_classification(model_outputs,**kwargs)
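    # trim whitespace from each word (adjusting offsets), drop words that
    # become empty, and expand multiword tokens where a split is known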
| off=[(t["start"],t["end"]) for t in w] | |
| for i,(s,e) in reversed(list(enumerate(off))): | |
| if s<e: | |
| d=w[i]["text"] | |
| j=len(d)-len(d.lstrip()) | |
| if j>0: | |
| d=d.lstrip() | |
| off[i]=(off[i][0]+j,off[i][1]) | |
| j=len(d)-len(d.rstrip()) | |
| if j>0: | |
| d=d.rstrip() | |
| off[i]=(off[i][0],off[i][1]-j) | |
| if d.strip()=="": | |
| off.pop(i) | |
| w.pop(i) | |
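        # nonempty word: expand it if the multiword table lists a split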
        else:
          p=w[i]["entity_group"]
          if p in self.multiword:
            d=d.lower()
            if d in self.multiword[p]:
              j=self.multiword[p][d]
              if "".join(j)==d:
                for k in reversed(j[1:]):
                  e=off[i][1]
                  w.insert(i+1,{"start":e-len(k),"end":e,"text":k,"entity_group":""})
                  off.insert(i+1,(e-len(k),e))
                  w[i]["end"]=e-len(k)
                  off[i]=(off[i][0],e-len(k))
                w[i]["text"]=" "+j[0]
                w[i]["entity_group"]=""
              else:
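                # the pieces are not substrings (e.g. contractions): they share
                # the original span and are flagged with "+" for CoNLL-U ranges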
                s,e=off[i]
                for k in reversed(j[1:]):
                  w.insert(i+1,{"start":s,"end":e,"text":" "+k,"entity_group":"+"})
                  off.insert(i+1,(s,e))
                w[i]["text"]=" "+j[0]
                w[i]["entity_group"]=f"+{len(j)}"
    v=self.tokenizer([t["text"] for t in w],add_special_tokens=False)
    x=[not t["entity_group"].endswith(".") for t in w]
    if len(x)<127:
      x=[True]*len(x)
    else:
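      # count positions already claimed, then re-enable pruned words in
      # ascending score order while the total stays under 8192 positions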
      k=sum([len(x)-i+1 if b else 0 for i,b in enumerate(x)])+1
      for i in numpy.argsort(numpy.array([t["score"] for t in w])):
        if not x[i] and k+len(x)-i<8192:
          x[i]=True
          k+=len(x)-i+1
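    # ids lays out, for each head candidate i, the word sequence i..n-1
    # followed by -1, a sentinel indexing the appended [SEP] embedding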
    ids=[-1]
    for i in range(len(x)):
      if x[i]:
        ids.append(i)
        for j in range(i+1,len(x)):
          ids.append(j)
        ids.append(-1)
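    # build one vector per word by summing its subword embeddings (empty
    # tokenizations fall back to the unknown token), append the [SEP]
    # embedding last so that index -1 reaches it, and run the model once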
    with torch.no_grad():
      e=self.model.get_input_embeddings().weight
      m=[]
      for j in v["input_ids"]:
        if j==[]:
          j=[self.tokenizer.unk_token_id]
        m.append(e[j,:].sum(axis=0))
      m.append(e[self.tokenizer.sep_token_id,:])
      m=torch.stack(m).to(self.device)
      e=self.model(inputs_embeds=torch.unsqueeze(m[ids,:],0))
      m=e.logits[0].cpu().numpy()
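    # scatter the flat logits into e[head,dependent,label], applying the
    # root/left-arc/right-arc masks; the trailing k+=1 skips each [SEP]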
    e=numpy.full((len(x),len(x),m.shape[-1]),m.min())
    k=1
    for i in range(len(x)):
      if x[i]:
        e[i,i]=m[k]+self.root
        k+=1
        for j in range(1,len(x)-i):
          e[i+j,i]=m[k]+self.left_arc
          e[i,i+j]=m[k]+self.right_arc
          k+=1
        k+=1
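    # best label per (head,dependent) pair, then the best dependency tree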
    m,p=numpy.max(e,axis=2),numpy.argmax(e,axis=2)
    h=self.chu_liu_edmonds(m)
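    # a word with h[i]==i is a root; if several compete, keep the
    # best-scoring one, penalize the rest, and re-extract the tree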
    z=[i for i,j in enumerate(h) if i==j]
    if len(z)>1:
      k,h=z[numpy.argmax(m[z,z])],numpy.min(m)-numpy.max(m)
      m[:,z]+=[[0 if j in z and (i!=j or i==k) else h for i in z] for j in range(m.shape[0])]
      h=self.chu_liu_edmonds(m)
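    # emit CoNLL-U: a range line for each multiword token ("+n"), then one
    # line per word with UPOS, features, head and deprel columns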
    q=[self.model.config.id2label[p[j,i]].split("|") for i,j in enumerate(h)]
    t=model_outputs["sentence"].replace("\n"," ")
    u="# text = "+t+"\n"
    for i,(s,e) in enumerate(off):
      m=w[i]["entity_group"]
      if m.startswith("+"):
        if m!="+":
          u+="\t".join([f"{i+1}-{i+int(m)}",t[s:e],"_","_","_","_","_","_","_","_" if i+int(m)<len(off) and e<off[i+int(m)][0] else "SpaceAfter=No"])+"\n"
        u+="\t".join([str(i+1),w[i]["text"].strip(),"_",q[i][0],"_","_" if len(q[i])<3 else "|".join(q[i][1:-1]),str(0 if h[i]==i else h[i]+1),"root" if q[i][-1]=="root" else q[i][-1][2:],"_","_"])+"\n"
      else:
        u+="\t".join([str(i+1),t[s:e],"_",q[i][0],"_","_" if len(q[i])<3 else "|".join(q[i][1:-1]),str(0 if h[i]==i else h[i]+1),"root" if q[i][-1]=="root" else q[i][-1][2:],"_","_" if i+1<len(off) and e<off[i+1][0] else "SpaceAfter=No"])+"\n"
    return u+"\n"
  def chu_liu_edmonds(self,matrix):
    h=numpy.argmax(matrix,axis=0)
    x=[-1 if i==j else j for i,j in enumerate(h)]
    for b in [lambda x,i,j:-1 if i not in x else x[i],lambda x,i,j:-1 if j<0 else x[j]]:
      y=[]
      while x!=y:
        y=list(x)
        for i,j in enumerate(x):
          x[i]=b(x,i,j)
    if max(x)<0:
      return h
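    # contract the detected cycle y into a single pseudo-node, recurse on
    # the reduced matrix, then expand the cycle and break it where the
    # external edge enters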
    y,x=[i for i,j in enumerate(x) if j==max(x)],[i for i,j in enumerate(x) if j<max(x)]
    z=matrix-numpy.max(matrix,axis=0)
    m=numpy.block([[z[x,:][:,x],numpy.max(z[x,:][:,y],axis=1).reshape(len(x),1)],[numpy.max(z[y,:][:,x],axis=0),numpy.max(z[y,y])]])
    k=[j if i==len(x) else x[j] if j<len(x) else y[numpy.argmax(z[y,x[i]])] for i,j in enumerate(self.chu_liu_edmonds(m))]
    h=[j if i in y else k[x.index(i)] for i,j in enumerate(h)]
    i=y[numpy.argmax(z[x[k[-1]],y] if k[-1]<len(x) else z[y,y])]
    h[i]=x[k[-1]] if k[-1]<len(x) else i
    return h
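
# Minimal usage sketch: assumes a fine-tuned token-classification model whose
# label2id follows the "UPOS|deprel" scheme expected above ("path/to/ud-model"
# is a placeholder, not a real checkpoint):
#
#   from transformers import AutoTokenizer,AutoModelForTokenClassification
#   tkz=AutoTokenizer.from_pretrained("path/to/ud-model")
#   mdl=AutoModelForTokenClassification.from_pretrained("path/to/ud-model")
#   nlp=UniversalDependenciesPipeline(model=mdl,tokenizer=tkz)
#   print(nlp("It is sunny today"))  # prints a CoNLL-U parse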