"""Service-layer handlers for chat history and chat detail operations.

Every handler takes a request model, validates its fields, calls the
repository layer, and returns one of the response models from
``response.ResponseMySQL`` (``res``). Failures are reported by returning a
``res.ReponseError`` rather than by raising.
"""

import json
import logging
import re

from fastapi.responses import JSONResponse

# Local imports keep their original relative order; unused names are kept
# because other modules may import them from here.
from request import RequestMySQL as req
from response import ResponseMySQL as res
from response import ResponseDefault as res1
from repository import ChatHistoryRepository
from repository import DetailChatRepository, UserRepository
from function import support_function as sf

logger = logging.getLogger(__name__)

# E-mail pattern used by check_email(). The TLD class is ``[A-Za-z]``; the
# previous ``[A-Z|a-z]`` also accepted a literal ``|`` character.
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'


def check_email(email: str) -> bool:
    """Return True if *email* is a string that fully matches ``regex``.

    Non-string input (including ``None``) yields False instead of raising.
    """
    return isinstance(email, str) and re.fullmatch(regex, email) is not None


def _error(status: int, message: str):
    """Build a ``res.ReponseError`` with the given status and message."""
    return res.ReponseError(status=status, data=res.Message(message=message))


async def edit_chat(request: req.RequestEditNameChat):
    """Rename a chat from ``request.name_old`` to ``request.name_new``.

    Returns:
        * the ``ReponseError`` from ``sf.check_email_service`` if it returns one;
        * a 400 error when either name is ``None``/empty or when the lookup
          for the new name finds an existing chat;
        * a 500 error when no chat with the old name is found or when any
          exception is raised;
        * ``res.ResponseEditChat`` with status 200 after the rename call.
    """
    try:
        user_id = request.user_id
        name_old = request.name_old
        name_new = request.name_new

        email = sf.check_email_service(user_id)
        if isinstance(email, res1.ReponseError):
            return email

        if name_old is None or name_old == "":
            return _error(400, "name_old is empty")
        if name_new is None or name_new == "":
            return _error(400, "name_new is empty")

        if ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChatNew(
            user_id, name_new
        ):
            return _error(400, "name_new duplicate")

        if not ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(
            user_id, name_old
        ):
            # NOTE(review): status 500 is kept for compatibility; a missing
            # chat looks more like a 404 - confirm with API consumers.
            return _error(500, "Update chat error")

        ChatHistoryRepository.updateNameChatHistory(user_id, name_old, name_new)
        # NOTE(review): the success message is the boolean True, exactly as
        # before - confirm that res.Message accepts a non-string message.
        return res.ResponseEditChat(status=200, data=res.Message(message=True))
    except Exception:
        logger.exception("edit_chat failed")
        return _error(500, "Server Error")


async def delete_chat(request: req.RequestDeleteChat):
    """Delete the chat named ``request.chat_name`` together with its details.

    The chat details are removed only after the chat has been found for this
    user, and before the chat history row itself is removed.

    Returns:
        * the ``ReponseError`` from ``sf.check_email_service`` if it returns one;
        * a 400 error when ``chat_name`` is ``None``/empty;
        * a 404 error when the chat lookup returns ``None``;
        * ``res.ResponseDeleteChat`` with status 200 when
          ``deleteChatHistory`` returns ``True``;
        * a 500 error otherwise, or when any exception is raised (the
          exception text is used as the message).
    """
    try:
        user_id = request.user_id
        chat_name = request.chat_name

        email = sf.check_email_service(user_id)
        if isinstance(email, res1.ReponseError):
            return email

        if chat_name is None or chat_name == "":
            return _error(400, "chat_name is empty")

        chat_exist = ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(
            user_id, chat_name
        )
        if chat_exist is None:
            return _error(404, "chat_name not exist")

        # Fix: this call used to run before the existence check above, so
        # details were deleted even when the user had no chat by that name.
        DetailChatRepository.delete_chat_detail(chat_name)

        if ChatHistoryRepository.deleteChatHistory(user_id, chat_name) is True:
            return res.ResponseDeleteChat(
                status=200,
                data=res.Message(message="Delete conversation chat success"),
            )
        return _error(500, "Delete conversation chat error")
    except Exception as e:
        logger.exception("delete_chat failed")
        # Report failures with the error model, like every other error path.
        return _error(500, str(e))


async def delete_chat_detail_by_id(request: req.RequestDeleteDetailChat):
    """Delete a single chat detail identified by ``request.id_chat_detail``.

    Returns:
        * the ``ReponseError`` from ``sf.check_email_service`` if it returns one;
        * a 400 error when the id is ``None`` or a blank string;
        * ``res.ResponseDeleteChatDetailById`` with status 200 whose
          ``check`` field carries the repository result, whatever it is;
        * a 500 error when any exception is raised (the exception text is
          used as the message).
    """
    try:
        user_id = request.user_id
        chat_detail_id = request.id_chat_detail

        email = sf.check_email_service(user_id)
        if isinstance(email, res1.ReponseError):
            return email

        # The original compared against " " only, which let "" through.
        is_blank = isinstance(chat_detail_id, str) and not chat_detail_id.strip()
        if chat_detail_id is None or is_blank:
            return _error(400, "id chat_detail is empty")

        check = DetailChatRepository.delete_chat_detail_by_id(chat_detail_id)
        # Both outcomes are reported with status 200; the caller reads `check`.
        return res.ResponseDeleteChatDetailById(
            status=200, data=res.CheckModel(check=check)
        )
    except Exception as e:
        logger.exception("delete_chat_detail_by_id failed")
        return _error(500, str(e))


async def render_chat_history(request: req.RequestRenderChatHistory):
    """List the chats returned by the repository for ``request.user_id``.

    Returns:
        * the ``ReponseError`` from ``sf.check_email_service`` if it returns one;
        * ``res.ResponseRenderChatHistory`` with status 200 wrapping one
          ``res.ListUserChat`` per repository row;
        * a 500 error when any exception is raised.
    """
    try:
        user_id = request.user_id

        email = sf.check_email_service(user_id)
        if isinstance(email, res1.ReponseError):
            return email

        rows = ChatHistoryRepository.getChatHistoryById(user_id)
        chats = [
            res.ListUserChat(id=row.id, email=row.email, chat_name=row.name_chat)
            for row in rows
        ]
        return res.ResponseRenderChatHistory(
            status=200, data=res.UserInfoListResponse(chat=chats)
        )
    except Exception:
        logger.exception("render_chat_history failed")
        return _error(500, "Server Error")


async def create_chat_history(request: req.RequestCreateChatHistory):
    """Create a chat named ``request.chat_name`` for ``request.user_id``.

    Returns:
        * the ``ReponseError`` from ``sf.check_email_service`` if it returns one;
        * a 400 error when ``chat_name`` is ``None``/empty or when the chat
          lookup returns anything other than ``None``;
        * ``res.ResponseCreateChat`` with status 200 after the insert call;
        * a 500 error when any exception is raised.
    """
    try:
        user_id = request.user_id
        chat_name = request.chat_name

        email = sf.check_email_service(str(user_id))
        if isinstance(email, res1.ReponseError):
            return email

        if chat_name is None or chat_name == "":
            return _error(400, "chat_name is empty")

        existing = ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(
            user_id, chat_name
        )
        if existing is not None:
            return _error(400, "chat_name exist")

        ChatHistoryRepository.addChatHistory(user_id, chat_name)
        return res.ResponseCreateChat(
            status=200, data=res.Message(message="create chat success")
        )
    except Exception:
        logger.exception("create_chat_history failed")
        return _error(500, "Server Error")


async def get_detail_chat_by_chat_id(request: req.RequestGetChatDetails):
    """Fetch one chat detail using ``request.id``.

    Returns:
        * a 400 error when the id is ``None``/empty;
        * ``res.ResponseChatDetailById`` with status 200 when the repository
          returns a truthy record;
        * a 404 error when it does not;
        * a 500 error when any exception is raised, matching the other
          handlers in this module.
    """
    try:
        chat_id = request.id
        if chat_id is None or chat_id == "":
            return _error(400, "Id is empty")

        detail = DetailChatRepository.getDetailChatByChatId(chat_id)
        if not detail:
            return _error(404, "Chat not exist")

        return res.ResponseChatDetailById(
            status=200,
            data=res.ChatDetailById(
                id=detail.id,
                data_relevant=detail.data_relevant,
                source_file=detail.source_file,
            ),
        )
    except Exception:
        logger.exception("get_detail_chat_by_chat_id failed")
        return _error(500, "Server Error")


async def load_chat_history(request: req.RequestLoadChatHistory):
    """Load the chat details for ``request.chat_id`` and ``request.user_id``.

    Returns:
        * the ``ReponseError`` from ``sf.check_email_service`` if it returns one;
        * a 400 error when ``chat_id`` is ``None``/empty;
        * a 404 error when the chat/user lookup returns ``None``;
        * ``res.ResponseLoadChatHistory`` with status 200 wrapping one
          ``res.ChatDetail`` per repository row;
        * a 500 error when any exception is raised.
    """
    try:
        chat_id = request.chat_id
        user_id = request.user_id

        email = sf.check_email_service(str(user_id))
        if isinstance(email, res1.ReponseError):
            return email

        if chat_id is None or chat_id == "":
            return _error(400, "chat_id is empty")

        owned_chat = ChatHistoryRepository.getChatHistoryByChatIdAndUserId(
            chat_id, user_id
        )
        if owned_chat is None:
            return _error(404, "Not found chat width chat_id and user_id")

        rows = DetailChatRepository.getListDetailChatByChatId(chat_id)
        details = [
            res.ChatDetail(
                id=row.id,
                chat_id=row.chat_id,
                question=row.YouMessage,
                answer=row.AiMessage,
                data_relevant=row.data_relevant,
                source_file=row.source_file,
            )
            for row in rows
        ]
        return res.ResponseLoadChatHistory(
            status=200, data=res.ListChatDeTail(detail_chat=details)
        )
    except Exception:
        logger.exception("load_chat_history failed")
        return _error(500, "Server Error")


async def delete_last_chat_detail_by_chat_name(request: req.RequestStopChat):
    """Remove the last chat detail of the chat named ``request.chat_name``.

    Returns:
        * the ``ReponseError`` from ``sf.check_email_service`` if it returns one;
        * a 400 error when ``chat_name`` is falsy;
        * a 404 error when the chat lookup returns ``None``;
        * ``res.ResponseStopChat`` with status 200 when the repository call
          returns a truthy value;
        * a 500 error otherwise, or when any exception is raised (the
          exception text is appended to the message).
    """
    try:
        user_id = request.user_id
        chat_name = request.chat_name

        email = sf.check_email_service(user_id)
        if isinstance(email, res1.ReponseError):
            return email

        if not chat_name:
            return _error(400, "chat_name is empty")

        chat = ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(
            user_id, chat_name
        )
        if chat is None:
            return _error(404, "Chat not exist")

        # NOTE(review): the repository method name ends in "_and_email" but
        # receives user_id, as in the original call - confirm this is intended.
        deleted = ChatHistoryRepository.delete_last_chat_detail_by_chat_name_and_email(
            chat_name, user_id
        )
        if deleted:
            return res.ResponseStopChat(
                status=200, data=res.Message(message="stop chat success")
            )
        return _error(500, "Failed to stop chat")
    except Exception as e:
        logger.exception("delete_last_chat_detail_by_chat_name failed")
        return _error(500, f"Server Error: {e}")