# Hugging Face Spaces app: English -> Vietnamese machine translation demo
# (compares a custom attention seq2seq model against VietAI/envit5-translation).
import gradio as gr | |
from transformers import pipeline | |
import re | |
import pickle | |
import torch | |
import torch.nn as nn | |
from torchtext.transforms import PadTransform | |
from torch.nn import functional as F | |
from tqdm import tqdm | |
from underthesea import text_normalize | |
# Inference is CPU-only in this Space.
device = "cpu"

# Fixed (padded) sequence length used by both vocabularies and the decoder loop.
MAX_LENGTH = 20
class Vocabulary:
    """Bidirectional token<->id mapping used to convert text to index
    tensors and back.

    Reserves four special tokens: <sos>=0, <eos>=1, <unk>=2, <pad>=3.
    """

    def __init__(self, lang="vi"):
        """@param lang (str): language of this vocabulary ("vi" or "en")"""
        self.lang = lang
        self.word2id = dict()
        self.word2id["<sos>"] = 0  # Start of Sentence Token
        self.word2id["<eos>"] = 1  # End of Sentence Token
        self.word2id["<unk>"] = 2  # Unknown Token
        self.word2id["<pad>"] = 3  # Pad Token
        self.sos_id = self.word2id["<sos>"]
        self.eos_id = self.word2id["<eos>"]
        self.unk_id = self.word2id["<unk>"]
        self.pad_id = self.word2id["<pad>"]
        self.id2word = {v: k for k, v in self.word2id.items()}
        # NOTE(review): PadTransform pads up to MAX_LENGTH but does not
        # truncate longer sequences -- confirm inputs stay <= MAX_LENGTH tokens.
        self.pad_transform = PadTransform(max_length=MAX_LENGTH, pad_value=self.pad_id)

    def __getitem__(self, word):
        """Return ID of word if it exists, else the ID of the unknown token.
        @param word (str)
        @return (int)
        """
        return self.word2id.get(word, self.unk_id)

    def __contains__(self, word):
        """Return True if word is in the Vocabulary, else False.
        @param word (str)
        """
        return word in self.word2id

    def __len__(self):
        """Return number of tokens (including sos, eos, unk and pad) in the Vocabulary."""
        return len(self.word2id)

    def lookup_tokens(self, word_indexes: list):
        """Return the list of words looked up by ID.
        @param word_indexes (list(int))
        @return words (list(str))
        """
        return [self.id2word[word_index] for word_index in word_indexes]

    def add(self, word):
        """Add word to vocabulary.
        @param word (str)
        @return index (int): index of the word just added (or its existing index)
        """
        if word not in self:
            word_index = self.word2id[word] = len(self.word2id)
            self.id2word[word_index] = word
            return word_index
        else:
            return self[word]

    def preprocessing_sent(self, sent, lang="en"):
        """Normalize one sentence (English or Vietnamese).
        @param sent (str)
        @param lang (str)
        @return (str): normalized sentence, guaranteed to end with '.', '!' or '?'
        """
        # Lowercase sentence and remove space at beginning and ending
        sent = sent.lower().strip()
        # Decode HTML entities. Bug fix: the previous patterns were the
        # already-decoded characters themselves ("[" is an invalid regex and
        # the bare triple-quote was a Python syntax error) -- the entity
        # names below are reconstructed; TODO confirm against the training data.
        sent = re.sub("&apos;", "'", sent)
        sent = re.sub("&quot;", '"', sent)
        sent = re.sub("&#91;", "[", sent)
        sent = re.sub("&#93;", "]", sent)
        # Normalize the distance between tokens (word and punctuation).
        # Raw strings: "\w" in a plain literal is an invalid escape in 3.12+.
        sent = re.sub(r"(?<=\w)\.", " .", sent)
        sent = re.sub(r"(?<=\w),", " ,", sent)
        sent = re.sub(r"(?<=\w)\?", " ?", sent)
        sent = re.sub(r"(?<=\w)!", " !", sent)
        # Remove unnecessary space
        sent = re.sub(" +", " ", sent)
        if lang in ("en", "eng", "english"):
            # Expand common English contractions (duplicate "who's" rule removed).
            sent = re.sub("what's", "what is", sent)
            sent = re.sub("who's", "who is", sent)
            sent = re.sub("which's", "which is", sent)
            sent = re.sub("here's", "here is", sent)
            sent = re.sub("there's", "there is", sent)
            sent = re.sub("it's", "it is", sent)
            sent = re.sub("i'm", "i am", sent)
            sent = re.sub("'re ", " are ", sent)
            sent = re.sub("'ve ", " have ", sent)
            sent = re.sub("'ll ", " will ", sent)
            sent = re.sub("'d ", " would ", sent)
            sent = re.sub("aren't", "are not", sent)
            sent = re.sub("isn't", "is not", sent)
            sent = re.sub("don't", "do not", sent)
            sent = re.sub("doesn't", "does not", sent)
            sent = re.sub("wasn't", "was not", sent)
            sent = re.sub("weren't", "were not", sent)
            sent = re.sub("won't", "will not", sent)
            sent = re.sub("can't", "can not", sent)
            sent = re.sub("let's", "let us", sent)
        else:
            # Package underthesea.text_normalize supports Vietnamese normalization.
            sent = text_normalize(sent)
        # Ensure every sentence ends with terminal punctuation.
        if not sent.endswith(('.', '!', '?')):
            sent = sent + ' .'
        return sent.strip()

    def tokenize_corpus(self, corpus, disable=False):
        """Split the documents of the corpus into words, wrapped in <sos>/<eos>.
        @param corpus (list(str)): list of documents
        @param disable (bool): suppress the progress message/bar when True
        @return tokenized_corpus (list(list(str))): list of token lists
        """
        if not disable:
            print("Tokenize the corpus...")
        tokenized_corpus = list()
        for document in tqdm(corpus, disable=disable):
            tokenized_document = ["<sos>"] + self.preprocessing_sent(document, self.lang).split(" ") + ["<eos>"]
            tokenized_corpus.append(tokenized_document)
        return tokenized_corpus

    def corpus_to_tensor(self, corpus, is_tokenized=False, disable=False):
        """Convert a corpus to a list of padded index tensors.
        @param corpus (list(str) if is_tokenized==False else list(list(str)))
        @param is_tokenized (bool)
        @param disable (bool): suppress the progress bar when True
        @return indicies_corpus (list(tensor))
        """
        if is_tokenized:
            tokenized_corpus = corpus
        else:
            tokenized_corpus = self.tokenize_corpus(corpus, disable=disable)
        indicies_corpus = list()
        for document in tqdm(tokenized_corpus, disable=disable):
            indicies_document = torch.tensor(
                [self[word] for word in document], dtype=torch.int64
            )
            # Pad every document to MAX_LENGTH with pad_id.
            indicies_corpus.append(self.pad_transform(indicies_document))
        return indicies_corpus

    def tensor_to_corpus(self, tensor, disable=False):
        """Convert a list of index tensors to a list of tokenized documents.
        @param tensor (list(tensor))
        @param disable (bool): suppress the progress bar when True
        @return corpus (list(list(str)))
        """
        corpus = list()
        for indicies in tqdm(tensor, disable=disable):
            document = [self.id2word[index.item()] for index in indicies]
            corpus.append(document)
        return corpus
# Load the serialized source/target Vocabulary objects built at training time.
# NOTE(review): pickle.load assumes these checkpoint files are trusted artifacts.
with open("vocab_source_final.pkl", "rb") as file:
    VOCAB_SOURCE = pickle.load(file)
with open("vocab_target_final.pkl", "rb") as file:
    VOCAB_TARGET = pickle.load(file)

# Placeholder embedding matrices (one 100-d row per vocabulary token);
# overwritten by the checkpoints loaded further below.
input_embedding = torch.zeros((len(VOCAB_SOURCE), 100))
output_embedding = torch.zeros((len(VOCAB_TARGET), 100))
def create_input_emb_layer(pretrained=False):
    """Build the frozen embedding layer for the source vocabulary.

    @param pretrained (bool): use the module-level `input_embedding` weights
        when True, otherwise an all-zero matrix of the same shape
    @return (nn.Embedding, int): the frozen layer and its embedding dimension
    """
    weights = input_embedding if pretrained else torch.zeros((len(VOCAB_SOURCE), 100))
    n_rows, emb_dim = weights.size()
    layer = nn.Embedding(n_rows, emb_dim)
    layer.weight.data = weights
    # Keep the (pre-trained) weights fixed during training.
    layer.weight.requires_grad = False
    return layer, emb_dim
def create_output_emb_layer(pretrained=False):
    """Build the frozen embedding layer for the target vocabulary.

    @param pretrained (bool): use the module-level `output_embedding` weights
        when True, otherwise an all-zero matrix of the same shape
    @return (nn.Embedding, int): the frozen layer and its embedding dimension
    """
    weights = output_embedding if pretrained else torch.zeros((len(VOCAB_TARGET), 100))
    n_rows, emb_dim = weights.size()
    layer = nn.Embedding(n_rows, emb_dim)
    layer.weight.data = weights
    # Keep the (pre-trained) weights fixed during training.
    layer.weight.requires_grad = False
    return layer, emb_dim
class EncoderAtt(nn.Module):
    """GRU encoder over source-token indices.

    @param input_dim (int): size of vocab_source (kept for interface
        compatibility; the embedding size actually comes from the
        pretrained embedding layer)
    @param hidden_dim (int): GRU hidden size
    @param dropout (float): dropout ratio applied to the embeddings
    """

    def __init__(self, input_dim, hidden_dim, dropout=0.1):
        super(EncoderAtt, self).__init__()
        self.hidden_dim = hidden_dim
        # Frozen pretrained embedding for the source vocabulary.
        self.embedding, self.embedding_dim = create_input_emb_layer(True)
        self.gru = nn.GRU(self.embedding_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Encode a batch of source sequences.

        @param src (LongTensor): (batch, seq) token ids
        @return (output, hidden): the GRU outputs and final hidden state
        """
        dropped = self.dropout(self.embedding(src))
        return self.gru(dropped)
class BahdanauAttention(nn.Module):
    """Additive (Bahdanau) attention over encoder states.

    @param hidden_size (int): dimensionality of query and key vectors
    """

    def __init__(self, hidden_size):
        super(BahdanauAttention, self).__init__()
        self.Wa = nn.Linear(hidden_size, hidden_size)
        self.Ua = nn.Linear(hidden_size, hidden_size)
        self.Va = nn.Linear(hidden_size, 1)

    def forward(self, query, keys):
        """Return (context, weights) for one decoding step.

        query: (batch, 1, hidden); keys: (batch, seq, hidden).
        context: (batch, 1, hidden); weights: (batch, 1, seq).
        """
        # Additive score for every key position (broadcasts over seq).
        energy = torch.tanh(self.Wa(query) + self.Ua(keys))
        scores = self.Va(energy).squeeze(2).unsqueeze(1)
        weights = F.softmax(scores, dim=-1)
        # Context is the attention-weighted sum of the keys.
        context = torch.bmm(weights, keys)
        return context, weights
class DecoderAtt(nn.Module):
    """GRU decoder with Bahdanau attention over the encoder outputs.

    @param hidden_size (int): GRU hidden size
    @param output_size (int): size of vocab_target
    @param dropout (float): dropout ratio applied to the embeddings
    """

    def __init__(self, hidden_size, output_size, dropout=0.1):
        super(DecoderAtt, self).__init__()
        # Frozen pretrained embedding for the target vocabulary, projected
        # from its native dimension up to hidden_size.
        self.embedding, self.embedding_dim = create_output_emb_layer(True)
        self.fc = nn.Linear(self.embedding_dim, hidden_size)
        self.attention = BahdanauAttention(hidden_size)
        # GRU input is [embedded token ; attention context].
        self.gru = nn.GRU(2 * hidden_size, hidden_size, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
        """Decode MAX_LENGTH steps; teacher-force when target_tensor is given.

        @return (log_probs, hidden, attentions): per-step log-softmax scores,
            the final hidden state, and the stacked attention weights
        """
        n_batch = encoder_outputs.size(0)
        # Token id 0 (<sos>) starts every decoded sequence.
        step_input = torch.zeros(n_batch, 1, dtype=torch.long, device=device)
        hidden = encoder_hidden
        step_scores, attn_maps = [], []
        for step in range(MAX_LENGTH):
            scores, hidden, step_attn = self.forward_step(
                step_input, hidden, encoder_outputs
            )
            step_scores.append(scores)
            attn_maps.append(step_attn)
            if target_tensor is None:
                # Greedy decoding: feed back the model's own best guess,
                # detached so gradients do not flow through the history.
                step_input = scores.topk(1)[1].squeeze(-1).detach()
            else:
                # Teacher forcing: feed the ground-truth token as next input.
                step_input = target_tensor[:, step].unsqueeze(1)
        logits = torch.cat(step_scores, dim=1)
        return F.log_softmax(logits, dim=-1), hidden, torch.cat(attn_maps, dim=1)

    def forward_step(self, input, hidden, encoder_outputs):
        """Run one decoding step; returns (scores, hidden, attn_weights)."""
        emb = self.dropout(self.fc(self.embedding(input)))
        # Attention query is the current hidden state, batch-first.
        ctx, attn = self.attention(hidden.permute(1, 0, 2), encoder_outputs)
        gru_out, hidden = self.gru(torch.cat((emb, ctx), dim=2), hidden)
        return self.out(gru_out), hidden, attn
# Reference model: the public VietAI translation pipeline.
envit5_translater = pipeline("translation", model="VietAI/envit5-translation")

INPUT_DIM = len(VOCAB_SOURCE)
OUTPUT_DIM = len(VOCAB_TARGET)
HID_DIM = 512

# Restore our trained attention seq2seq model (epoch-35 checkpoints) on CPU.
ENCODER = EncoderAtt(INPUT_DIM, HID_DIM)
ENCODER.load_state_dict(torch.load("encoderatt_epoch_35.pt", map_location=torch.device('cpu')))
DECODER = DecoderAtt(HID_DIM, OUTPUT_DIM)
DECODER.load_state_dict(torch.load("decoderatt_epoch_35.pt", map_location=torch.device('cpu')))
def evaluate_final_model(sentence, encoder, decoder, vocab_source, vocab_target, disable = False):
    """Greedily translate one sentence with the trained encoder/decoder.

    @param sentence (str)
    @param encoder (EncoderAtt)
    @param decoder (DecoderAtt)
    @param vocab_source (Vocabulary)
    @param vocab_target (Vocabulary)
    @param disable (bool): suppress progress output during tensor conversion
    @return (decoded_words, decoder_attn): tokens up to and including the
        first <eos> (or all MAX_LENGTH tokens), and the attention weights
    """
    encoder.eval()
    decoder.eval()
    with torch.no_grad():
        # Shape (1, MAX_LENGTH): a single padded source sequence.
        src = vocab_source.corpus_to_tensor([sentence], disable=disable)[0].view(1, -1).to(device)
        enc_out, enc_hidden = encoder(src)
        dec_out, _, attn = decoder(enc_out, enc_hidden)
        # Greedy pick: highest-scoring token id at every step.
        ids = dec_out.topk(1)[1].squeeze()
        words = []
        for tok in ids:
            if tok.item() == vocab_target.eos_id:
                words.append('<eos>')
                break
            words.append(vocab_target.id2word[tok.item()])
    return words, attn
def translate_sentence(sentence):
    """Translate a sentence with our model and strip special tokens.

    @param sentence (str): English source sentence
    @return (str): capitalized Vietnamese translation
    """
    output_words, _ = evaluate_final_model(sentence, ENCODER, DECODER, VOCAB_SOURCE, VOCAB_TARGET, disable=True)
    # Bug fix: list.remove() deleted only the FIRST occurrence of each special
    # token, but greedy/padded output can contain many <pad>/<unk> tokens.
    # Filter out every special token instead.
    specials = {"<pad>", "<unk>", "<sos>", "<eos>"}
    words = [w for w in output_words if w not in specials]
    return ' '.join(words).capitalize()
def envit5_translation(text):
    """Translate text with the VietAI envit5 pipeline.

    @param text (str)
    @return (str): the pipeline's translation text
    """
    result = envit5_translater(
        text,
        max_length=512,
        early_stopping=True,
    )
    # Drop the first three characters of the output -- presumably a language
    # tag such as "vi:". NOTE(review): if the tag is followed by a space, a
    # leading space remains; confirm against actual pipeline output.
    return result[0]["translation_text"][3:]
def translation(text):
    """Translate with both systems.

    @param text (str): English input
    @return (tuple(str, str)): (our model's output, VietAI's output)
    """
    ours = translate_sentence(text)
    # The envit5 pipeline is fed a terminated sentence; our model's own
    # preprocessing already handles termination internally.
    if not text.endswith(('.', '!', '?')):
        text = text + '.'
    theirs = envit5_translation(text)
    return (ours, theirs)
if __name__ == "__main__":
    # Example inputs shown under the Gradio textbox.
    examples = [["Hello guys", "Input"],
                ["Xin chào các bạn", "Output"]]
    demo = gr.Interface(
        fn=translation,
        theme=gr.themes.Base(),
        title="Co Gai Mo Duong",
        description="""
        ## Machine Translation: English to Vietnamese
        """,
        examples=examples,
        inputs=[
            gr.Textbox(lines=5, placeholder="Enter text", label="Input"),
        ],
        outputs=[
            gr.Textbox("text", label="Our Machine Translation"),
            gr.Textbox("text", label="VietAI Machine Translation"),
        ],
    )
    # share=True exposes a public Gradio link for the demo.
    demo.launch(share=True)