| import jwt | |
| from werkzeug.exceptions import Unauthorized | |
| from configs import dify_config | |
| class PassportService: | |
| def __init__(self): | |
| self.sk = dify_config.SECRET_KEY | |
| def issue(self, payload): | |
| return jwt.encode(payload, self.sk, algorithm="HS256") | |
| def verify(self, token): | |
| try: | |
| return jwt.decode(token, self.sk, algorithms=["HS256"]) | |
| except jwt.exceptions.InvalidSignatureError: | |
| raise Unauthorized("Invalid token signature.") | |
| except jwt.exceptions.DecodeError: | |
| raise Unauthorized("Invalid token.") | |
| except jwt.exceptions.ExpiredSignatureError: | |
| raise Unauthorized("Token has expired.") | |