Spaces:
Sleeping
Sleeping
| from typing import Union | |
| from datetime import timedelta | |
| from request import RequestDefault as req | |
| from response import ResponseDefault as res | |
| from response import ResponseUser as res_login | |
| from firebase_admin import credentials, auth, exceptions | |
| import firebase_admin | |
| import base64 | |
| import auth.authentication as auth123 | |
| import re | |
| from repository import UserRepository, UserInfoRepository | |
| from pathlib import Path | |
| import cloudinary | |
| import cloudinary.uploader | |
| from fastapi import FastAPI, File, UploadFile | |
| from function import support_function as sf | |
| from dotenv import load_dotenv | |
| from service import AuthService as authservice | |
| import os | |
| load_dotenv() | |
| CLOUDINARY_CLOUD_NAME=os.getenv("CLOUDINARY_CLOUD_NAME") | |
| CLOUDINARY_API_KEY=os.getenv("CLOUDINARY_API_KEY") | |
| CLOUDINARY_API_SECRET=os.getenv("CLOUDINARY_API_SECRET") | |
| cloudinary.config( | |
| cloud_name=CLOUDINARY_CLOUD_NAME, | |
| api_key=CLOUDINARY_API_KEY, | |
| api_secret=CLOUDINARY_API_SECRET | |
| ) | |
| import os | |
| try: | |
| if not firebase_admin._apps: | |
| cred = credentials.Certificate("../certificate/firebase_certificate.json") | |
| fred = firebase_admin.initialize_app(cred) | |
| except: | |
| try: | |
| if not firebase_admin._apps: | |
| cred = credentials.Certificate("firebase_certificate.json") | |
| fred = firebase_admin.initialize_app(cred) | |
| except: | |
| if not firebase_admin._apps: | |
| json_path = Path(__file__).resolve().parent / 'app' / 'firebase_certificate.json' | |
| cred = credentials.Certificate(str(json_path)) | |
| fred = firebase_admin.initialize_app(cred) | |
| regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b' | |
| def check_email(email): | |
| if(re.fullmatch(regex, email)): | |
| return True | |
| else: | |
| return False | |
| def get_user(email): | |
| try: | |
| user = auth.get_user_by_email(email) | |
| return user | |
| except exceptions.FirebaseError as e: | |
| return None | |
| def create_user(email): | |
| user = auth.create_user(email=email) | |
| return user | |
| def get_user1(email): | |
| try: | |
| user = auth.get_user_by_email(email) | |
| return user | |
| except exceptions.FirebaseError as e: | |
| return None | |
| async def create_firebase_user(request: req.RequestCreateFireBaseUserGoogle): | |
| try: | |
| email = request.email | |
| token_google = request.token_google | |
| check_email_fc = sf.check_email_empty_invalid(email) | |
| if check_email_fc is not True: | |
| return check_email_fc | |
| user = get_user(email) | |
| if token_google is None or token_google == "": | |
| return res.ReponseError( | |
| status=400, | |
| data=res.Message(message="token google not empty") | |
| ) | |
| check_google = authservice.verify_token_google(token_google) | |
| if check_google == False: | |
| return res.ReponseError( | |
| status=400, | |
| data=res.Message(message="Create user failed") | |
| ) | |
| if user: | |
| email1 = user.email | |
| display_name = user.display_name | |
| uid = user.uid | |
| photo_url = user.photo_url | |
| return res.ResponseCreateFireBaseUser( | |
| status=200, | |
| data = res.DataCreateFireBaseUser(localId=uid, | |
| email=email1, | |
| displayName=display_name, | |
| photoUrl=photo_url) | |
| ) | |
| else: | |
| return res.ReponseError( | |
| status=500, | |
| data =res.Message(message="Error") | |
| ) | |
| except: | |
| return res.ReponseError( | |
| status=500, | |
| data =res.Message(message="Server Error") | |
| ) | |
| async def info_user(request: req.RequestInfoUser): | |
| try: | |
| user_id = request.user_id | |
| email = sf.check_email_service(user_id) | |
| if isinstance(email, res.ReponseError): | |
| return email | |
| user = get_user(email) | |
| if user is None: | |
| return res.ReponseError( | |
| status=404, | |
| data=res.Message(message="User not found") | |
| ) | |
| uid = user.uid if user.uid else "" | |
| email = user.email if user.email else "" | |
| display_name = user.display_name if user.display_name else "N/A" | |
| photo_url = user.photo_url if user.photo_url else "N/A" | |
| return res.ResponseInfoUser( | |
| status=200, | |
| data=res.DataInfoUser( | |
| uid=uid, | |
| email=email, | |
| display_name=display_name, | |
| photo_url=photo_url | |
| ) | |
| ) | |
| except Exception as e: | |
| return res.ReponseError( | |
| status=500, | |
| data=res.Message(message="Server Error: " + str(e)) | |
| ) | |
| def check_email_token(token): | |
| try: | |
| decoded_token = auth123.decodeJWT(token) | |
| sub_value = decoded_token.get("sub") | |
| name_user = base64.b85decode(sub_value.encode('ascii')).decode('ascii') | |
| return name_user | |
| except: | |
| return False | |
| async def is_me(request: req.RequestIsMe): | |
| try: | |
| token = request.token | |
| if token is None or token == "": | |
| return res.ReponseError( | |
| status=400, | |
| data =res.Message(message="token is empty") | |
| ) | |
| test = check_email_token(token) | |
| if test is not False: | |
| user_id = UserRepository.getUserByEmail(test).id | |
| return res.ResponseIsMe( | |
| status=200, | |
| data = res.DataIsMe(user_id = user_id)) | |
| except: | |
| return res.ReponseError( | |
| status=500, | |
| data=res.Message(message="Server Error") | |
| ) | |
| ALLOWED_EXTENSIONS = {'png', 'jpg','jpeg'} | |
| def allowed_file(filename): | |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
| async def upload_image_service(request: req.RequestUpLoadImage): | |
| try: | |
| user_id = request.user_id | |
| file = request.files | |
| email = sf.check_email_service(user_id) | |
| if isinstance(email, res.ReponseError): | |
| return email | |
| if not allowed_file(file.filename): | |
| return res.ReponseError( | |
| status=415, | |
| data=res.Message(message=f"File type not allow") | |
| ) | |
| temp_file_path = f"temp_image_{email}.png" | |
| contents = file.file.read() | |
| with open(temp_file_path, "wb") as temp_file: | |
| temp_file.write(contents) | |
| upload_result = cloudinary.uploader.upload(temp_file_path, public_id=email) | |
| os.remove(temp_file_path) | |
| return res.ResponseUploadImage( | |
| status=200, | |
| data=res.DataUploadImage(url=upload_result["secure_url"]) | |
| ) | |
| except: | |
| return res.ReponseError( | |
| status=500, | |
| data=res.Message(message="Server Error") | |
| ) |