import os
import json
from datetime import datetime

import numpy as np  # linear algebra
import streamlit as st
import torch
from transformers import AutoTokenizer, AutoModel  # for sentence embeddings
# Load the Sinhala RoBERTa tokenizer and encoder once at start-up
tokenizer = AutoTokenizer.from_pretrained("AshenR/AshenBERTo")
modelBert = AutoModel.from_pretrained("AshenR/AshenBERTo", output_hidden_states=True)

ishape = (768,)  # dimensionality of a single sentence embedding
def get_embeddings2(text, token_length, device='cuda'):
    """Return the mean-pooled last-hidden-state embedding for `text`."""
    # Fall back to CPU when CUDA is unavailable
    device = torch.device(device if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    # Tokenize the input text to a fixed length
    tokens = tokenizer(text, max_length=token_length, padding='max_length',
                       truncation=True, return_tensors='pt')
    # Move tensors and the model to the selected device
    input_ids = tokens.input_ids.to(device)
    attention_mask = tokens.attention_mask.to(device)
    modelBert.to(device)
    # Run the encoder without tracking gradients
    with torch.no_grad():
        output = modelBert(input_ids, attention_mask=attention_mask).hidden_states[-1]
    # Mean-pool across the token dimension: (1, seq_len, 768) -> (1, 768)
    mean_output = torch.mean(output, dim=1)
    # Move the result back to CPU and convert to numpy
    return mean_output.cpu().numpy()
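# Sanity-check sketch (illustrative only, not executed by the app):
#
#   emb = get_embeddings2("sample sentence", token_length=100)
#   emb.shape  # (1, 768): one sentence, one mean-pooled 768-d vector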
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.layers import (
    Dense, Dropout, BatchNormalization, Activation, Input, Bidirectional, LSTM, GlobalAveragePooling1D
)
from tensorflow.keras.regularizers import l2
class SiameseNetwork(Model):
    def __init__(self, inputShape, featExtractorConfig, lstm_units=128, dropout_rate=0.5,
                 add_lstm=True, distance_metric="concat", regularization=0.01):
        super().__init__()
        self.inputShape = inputShape
        self.featExtractorConfig = featExtractorConfig
        self.add_lstm = add_lstm
        self.lstm_units = lstm_units
        self.dropout_rate = dropout_rate
        self.distance_metric = distance_metric
        self.regularization = regularization
        # Define the two inputs of the Siamese pair
        inp_a = layers.Input(shape=inputShape, name="Input_A")
        inp_b = layers.Input(shape=inputShape, name="Input_B")
        # Build the shared feature extractor and apply it to both inputs
        self.feature_extractor = self.build_feature_extractor()
        feats_a = self.feature_extractor(inp_a)
        feats_b = self.feature_extractor(inp_b)
        # Combine the two feature vectors with the chosen distance metric
        if distance_metric == "concat":
            distance = layers.Concatenate()([feats_a, feats_b])
        elif distance_metric == "euclidean":
            distance = layers.Lambda(
                lambda tensors: tf.norm(tensors[0] - tensors[1], axis=1, keepdims=True)
            )([feats_a, feats_b])
        elif distance_metric == "cosine":
            # expand_dims keeps a 2-D (batch, 1) tensor so the Dense layer below accepts it
            distance = layers.Lambda(
                lambda tensors: tf.expand_dims(
                    tf.keras.losses.cosine_similarity(tensors[0], tensors[1]), axis=-1
                )
            )([feats_a, feats_b])
        else:
            raise ValueError(f"Unsupported distance metric: {distance_metric}")
        # Single sigmoid unit: predicted probability that the pair is similar
        outputs = layers.Dense(1, activation="sigmoid", name="Output")(distance)
        # Wrap everything in an inner functional model
        self.model = Model(inputs=[inp_a, inp_b], outputs=outputs)
    def build_feature_extractor(self):
        inputs = Input(shape=self.inputShape)
        x = inputs
        # Configurable stack of regularized dense blocks
        for n_units in self.featExtractorConfig:
            x = Dense(n_units, activation=None, kernel_regularizer=l2(self.regularization))(x)
            x = BatchNormalization()(x)
            x = Activation('relu')(x)
            x = Dropout(self.dropout_rate)(x)
        if self.add_lstm:
            # Add a sequence dimension so the LSTM sees (batch, 1, features)
            x = layers.Reshape((-1, self.featExtractorConfig[-1]))(x)
            x = Bidirectional(LSTM(self.lstm_units, return_sequences=True, dropout=self.dropout_rate))(x)
            x = GlobalAveragePooling1D()(x)  # pool back down to (batch, features)
        return Model(inputs, x, name="FeatureExtractor")

    def call(self, inputs):
        return self.model(inputs)
    def save_model(self, filepath):
        # Ensure the directory exists
        os.makedirs(filepath, exist_ok=True)
        # Save the inner model (architecture, weights, and compile state)
        self.model.save(os.path.join(filepath, 'siamese_model.keras'))
        # Save the constructor arguments so the network can be rebuilt
        config = {
            'inputShape': self.inputShape,
            'featExtractorConfig': self.featExtractorConfig,
            'lstm_units': self.lstm_units,
            'dropout_rate': self.dropout_rate,
            'add_lstm': self.add_lstm,
            'distance_metric': self.distance_metric,
            'regularization': self.regularization
        }
        with open(os.path.join(filepath, 'model_config.json'), 'w') as f:
            json.dump(config, f)
        print(f"Model saved successfully to {filepath}")
    @classmethod
    def load_model(cls, filepath):
        # Rebuild the network from the saved constructor arguments
        with open(os.path.join(filepath, 'model_config.json'), 'r') as f:
            config = json.load(f)
        siamese_net = cls(**config)
        # Load the saved weights and compile state into the inner model
        siamese_net.model = tf.keras.models.load_model(os.path.join(filepath, 'siamese_model.keras'))
        print(f"Model loaded successfully from {filepath}")
        return siamese_net
# loaded_siamese_net = SiameseNetwork.load_model('./saved_siamese_model')
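# Training sketch, left commented so the app stays inference-only. The arrays
# x1_train, x2_train (pairs of 768-d embeddings) and y_train (0/1 similarity
# labels) are illustrative placeholders, not names from this repo:
#
#   net = SiameseNetwork(inputShape=(768,), featExtractorConfig=[512, 256])
#   net.model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
#   net.model.fit([x1_train, x2_train], y_train, epochs=10, batch_size=32)
#   net.save_model('./saved_siamese_model')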
@st.cache_resource
def get_siamese_net():
    # Load the trained network once and cache it across Streamlit reruns
    return SiameseNetwork.load_model('./saved_siamese_model')

def predict(input_1, input_2):
    loaded_siamese_net = get_siamese_net()
    # Embed both sentences and stack them as single-example batches
    embedding1 = get_embeddings2(input_1, token_length=100).reshape(ishape)
    embedding2 = get_embeddings2(input_2, token_length=100).reshape(ishape)
    x1_test = np.array([embedding1])
    x2_test = np.array([embedding2])
    # Interpret the sigmoid output as a similarity percentage
    pred = str(round(loaded_siamese_net.predict([x1_test, x2_test])[0][0] * 100, 2)) + " %"
    result = f"Predicted Similarity: {pred}"
    print(input_1, input_2, pred)
    return result
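# Illustrative call (the output value is an example, not a recorded result):
#
#   predict("first sentence", "second sentence")  # -> "Predicted Similarity: 87.5 %"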
import pymongo

def connect_to_mongodb(connection_string):
    try:
        client = pymongo.MongoClient(connection_string)
        # Cheap round-trip ('ping') to verify the connection is alive
        client.admin.command('ping')
        print("Successfully connected to MongoDB Atlas!")
        return client
    except pymongo.errors.ConnectionFailure as e:
        print(f"Could not connect to MongoDB Atlas: {e}")
        return None
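# Usage sketch (placeholder URI for illustration only):
#
#   client = connect_to_mongodb("mongodb+srv://<user>:<password>@<cluster>.mongodb.net/")
#   if client:
#       print(client["user_predictions"]["survey"].count_documents({}))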
def save_feedback(input_1, input_2, prediction, rating, feedback):
    # Replace with your actual connection string from MongoDB Atlas
    connection_string = "mongodb+srv://ashen8810:[email protected]/?retryWrites=true&w=majority&appName=Cluster0"
    client = connect_to_mongodb(connection_string)
    if not client:
        return False
    try:
        collection = client["user_predictions"]["survey"]
        document = {
            "timestamp": datetime.now(),
            "input_1": str(input_1),
            "input_2": str(input_2),
            "prediction": str(prediction),
            "rating": rating,
            "feedback": str(feedback)
        }
        collection.insert_one(document)
        print("Data saved to MongoDB")
        return True
    except Exception as e:
        st.error(f"Database Error: {e}")
        return False
    finally:
        client.close()
# Initialize session state
if "feedback_submitted" not in st.session_state:
    st.session_state.feedback_submitted = False
if "show_feedback" not in st.session_state:
    st.session_state.show_feedback = False

# Page header
st.title("Sinhala Short Sentence Similarity")
st.write("Compare the similarity between two Sinhala sentences.")

# Keys let the Clear button below reset these fields via st.session_state
input_1 = st.text_input("First Sentence:", key="input_1")
input_2 = st.text_input("Second Sentence:", key="input_2")
# Prediction section
if st.button("Compare Sentences", type="primary"):
    if input_1 and input_2:
        with st.spinner("Calculating similarity..."):
            result = predict(input_1, input_2)
        if result:
            st.success(result)
            st.session_state.show_feedback = True
            st.session_state.last_result = result  # reuse when feedback is submitted
            save_feedback(input_1, input_2, result, 0, "Null")
    else:
        st.warning("Please enter both sentences to compare.")
# Feedback section
if st.session_state.show_feedback and not st.session_state.feedback_submitted:
    st.subheader("Feedback")
    is_correct = st.radio("Is this similarity assessment correct?", ("Yes", "No"))
    if is_correct == "No":
        rating = st.slider("How accurate was the prediction?", 0.0, 1.0, 0.5, 0.1)
        feedback = st.text_area("Please provide detailed feedback:")
        if st.button("Submit Feedback"):
            # Reuse the stored prediction instead of running the model again
            prediction = st.session_state.get("last_result", "")
            if save_feedback(input_1, input_2, prediction, rating, feedback):
                st.success("Thank you for your feedback!")
                st.session_state.feedback_submitted = True

if st.button("Clear"):
    # Reset all state (including the keyed text inputs) and rerun the app
    st.session_state.clear()
    st.rerun()
# Footer
st.markdown("""
<hr>
<p style="text-align: center; color: gray; font-size: 0.8em;">
    Developed by Ashen | Version 1.0
</p>
""", unsafe_allow_html=True)