SchemaSync / schema_sync /db_connector.py
tuankg1028's picture
Upload folder using huggingface_hub
4beb1ef verified
raw
history blame
2.2 kB
from sqlalchemy import create_engine, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import logging
logger = logging.getLogger(__name__)
Base = declarative_base()
class DBConnector:
def __init__(self, conn_string=None):
"""Initialize database connection"""
self.conn_string = conn_string
self.engine = None
self.Session = None
self.metadata = None
def connect(self, conn_string=None):
"""Connect to the database with the given connection string"""
if conn_string:
self.conn_string = conn_string
if not self.conn_string:
raise ValueError("Database connection string must be provided")
try:
self.engine = create_engine(self.conn_string)
self.Session = sessionmaker(bind=self.engine)
self.metadata = MetaData()
self.metadata.bind = self.engine
logger.info(f"Connected to database: {self.conn_string.split('@')[-1]}")
return True
except Exception as e:
logger.error(f"Failed to connect to database: {str(e)}")
return False
def get_session(self):
"""Get a new session for database operations"""
if not self.Session:
raise ConnectionError("Database connection not established")
return self.Session()
def execute_query(self, query):
"""Execute raw SQL query and return results"""
if not self.engine:
raise ConnectionError("Database connection not established")
try:
with self.engine.connect() as connection:
# Use text() to properly handle SQL queries
from sqlalchemy import text
result = connection.execute(text(query))
return result.fetchall()
except Exception as e:
logger.error(f"Query execution error: {str(e)}")
raise
def close(self):
"""Close the database connection"""
if self.engine:
self.engine.dispose()
logger.info("Database connection closed")