Trading_Predict / app.py
shaheerawan3's picture
Update app.py
e1eed9f verified
# crypto_price_prediction.py
import os
import torch
import torch.nn as nn
from torch.optim.lr_scheduler import ReduceLROnPlateau
import numpy as np
import pandas as pd
import yfinance as yf
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import gradio as gr
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime, timedelta
import joblib
import warnings
import ta
from tqdm import tqdm
warnings.filterwarnings('ignore')
class PriceScaler:
def __init__(self):
self.scaler = MinMaxScaler()
def fit_transform(self, data):
data_2d = np.array(data).reshape(-1, 1)
return self.scaler.fit_transform(data_2d).flatten()
def inverse_transform(self, data):
data_2d = np.array(data).reshape(-1, 1)
return self.scaler.inverse_transform(data_2d).flatten()
class CryptoPredictor(nn.Module):
def __init__(self, input_dim, hidden_dim=128, num_layers=2, dropout=0.2):
super().__init__()
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.lstm = nn.LSTM(
input_dim, hidden_dim, num_layers=num_layers, batch_first=True,
dropout=dropout if num_layers > 1 else 0, bidirectional=True
)
self.bn = nn.BatchNorm1d(hidden_dim * 2)
self.fc = nn.Sequential(
nn.Linear(hidden_dim * 2, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
self.confidence_fc = nn.Sequential(
nn.Linear(hidden_dim * 2, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1),
nn.Sigmoid()
)
def forward(self, x):
batch_size = x.size(0)
h0 = torch.zeros(self.num_layers * 2, batch_size, self.hidden_dim).to(x.device)
c0 = torch.zeros(self.num_layers * 2, batch_size, self.hidden_dim).to(x.device)
lstm_out, _ = self.lstm(x, (h0, c0))
last_hidden = lstm_out[:, -1, :]
normalized_hidden = self.bn(last_hidden)
prediction = self.fc(normalized_hidden)
confidence = self.confidence_fc(normalized_hidden)
return prediction, confidence
class CryptoAnalyzer:
def __init__(self, model_dir="models", cache_dir="cache"):
self.scaler = MinMaxScaler()
self.price_scaler = PriceScaler()
self.model_dir = model_dir
self.cache_dir = cache_dir
os.makedirs(model_dir, exist_ok=True)
os.makedirs(cache_dir, exist_ok=True)
self.feature_columns = [
'Open', 'High', 'Low', 'Close', 'Volume', 'Returns', 'Volatility',
'MA5', 'MA20', 'RSI', 'Price_Momentum', 'Volume_Momentum', 'MACD',
'BB_upper', 'BB_lower', 'Stoch_K', 'Stoch_D', 'ADX', 'ATR'
]
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def get_data(self, symbol, days):
end_date = datetime.now()
start_date = end_date - timedelta(days=days + 30)
df = yf.download(f"{symbol}-USD", start=start_date, end=end_date, progress=False)
if df.empty:
raise ValueError(f"No data available for {symbol}")
df['Returns'] = df['Close'].pct_change()
df['Volatility'] = df['Returns'].rolling(window=20).std()
df['MA5'] = df['Close'].rolling(window=5).mean()
df['MA20'] = df['Close'].rolling(window=20).mean()
df['RSI'] = ta.momentum.rsi(df['Close'])
df['Price_Momentum'] = ta.momentum.roc(df['Close'])
df['Volume_Momentum'] = ta.momentum.roc(df['Volume'])
macd = ta.trend.macd(df['Close'])
df['MACD'] = macd.iloc[:, 0]
bollinger = ta.volatility.BollingerBands(df['Close'])
df['BB_upper'] = bollinger.bollinger_hband()
df['BB_lower'] = bollinger.bollinger_lband()
stoch = ta.momentum.StochasticOscillator(df['High'], df['Low'], df['Close'])
df['Stoch_K'] = stoch.stoch()
df['Stoch_D'] = stoch.stoch_signal()
df['ADX'] = ta.trend.adx(df['High'], df['Low'], df['Close'])
df['ATR'] = ta.volatility.average_true_range(df['High'], df['Low'], df['Close'])
df = df.dropna()
return df.iloc[-days:]
def prepare_data(self, df, lookback):
features = df[self.feature_columns].values
scaled_features = self.scaler.fit_transform(features)
close_prices = df['Close'].values
scaled_close = self.price_scaler.fit_transform(close_prices)
X, y = [], []
for i in range(len(df) - lookback):
X.append(scaled_features[i:(i + lookback)])
y.append(scaled_close[i + lookback])
X = torch.FloatTensor(np.array(X)).to(self.device)
y = torch.FloatTensor(np.array(y)).reshape(-1).to(self.device)
return X, y
def get_model_path(self, symbol):
return os.path.join(self.model_dir, f"{symbol.lower()}_model.pth")
def get_scaler_path(self, symbol):
return os.path.join(self.model_dir, f"{symbol.lower()}_scaler.pkl")
def train_model(self, X, y, symbol):
model = CryptoPredictor(X.shape[2]).to(self.device)
criterion = nn.HuberLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)
batch_size = min(32, len(X) // 4)
dataset = torch.utils.data.TensorDataset(X, y)
train_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
best_loss = float('inf')
patience = 10
patience_counter = 0
model.train()
with tqdm(range(50), desc=f"Training {symbol} model") as pbar:
for epoch in pbar:
total_loss = 0
for batch_X, batch_y in train_loader:
optimizer.zero_grad()
predictions, _ = model(batch_X)
loss = criterion(predictions, batch_y)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
scheduler.step(avg_loss)
pbar.set_postfix({'loss': f'{avg_loss:.6f}'})
if avg_loss < best_loss:
best_loss = avg_loss
patience_counter = 0
torch.save(model.state_dict(), self.get_model_path(symbol))
else:
patience_counter += 1
if patience_counter >= patience:
break
return model
def get_predictions(self, symbol, days, lookback):
try:
logging.info("Fetching data...")
df = self.get_data(symbol, days)
logging.info(f"Data fetched: {len(df)} rows.")
logging.info("Preparing data...")
X, y = self.prepare_data(df, lookback)
logging.info(f"Data prepared. Features shape: {X.shape}, Targets shape: {y.shape}")
model_path = self.get_model_path(symbol)
if os.path.exists(model_path):
logging.info("Loading existing model...")
model = CryptoPredictor(X.shape[2]).to(self.device)
model.load_state_dict(torch.load(model_path))
else:
logging.info("Training new model...")
model = self.train_model(X, y, symbol)
joblib.dump(self.scaler, self.get_scaler_path(symbol))
model.eval()
with torch.no_grad():
logging.info("Generating predictions...")
predictions, confidence = model(X)
# Log raw predictions shape
logging.info(f"Raw predictions shape: {predictions.shape}")
# Ensure predictions are 2D for inverse_transform
predictions_reshaped = predictions.cpu().numpy().reshape(-1, 1)
logging.info(f"Reshaped predictions for inverse transform: {predictions_reshaped.shape}")
predictions = self.price_scaler.inverse_transform(predictions_reshaped).flatten()
# Ensure actual prices are 2D for inverse_transform
y_np_reshaped = y.cpu().numpy().reshape(-1, 1)
logging.info(f"Reshaped actual prices for inverse transform: {y_np_reshaped.shape}")
actual_prices = self.price_scaler.inverse_transform(y_np_reshaped).flatten()
# Calculate metrics
rmse = float(np.sqrt(np.mean((actual_prices - predictions) ** 2)))
mape = float(np.mean(np.abs((actual_prices - predictions) / actual_prices)) * 100)
r2 = float(1 - np.sum((actual_prices - predictions) ** 2) / np.sum((actual_prices - actual_prices.mean()) ** 2))
logging.info("Metrics calculated.")
# Prepare date labels
dates = df.index[lookback:].strftime('%Y-%m-%d').tolist()
return {
'dates': dates,
'actual': actual_prices.tolist(),
'predicted': predictions.tolist(),
'confidence': confidence.cpu().numpy().flatten().tolist(),
'rmse': rmse,
'mape': mape,
'r2': r2,
'volatility': float(df['Volatility'].mean() * 100),
'current_price': float(df['Close'].iloc[-1]),
'volume': float(df['Volume'].iloc[-1]),
'rsi': float(df['RSI'].iloc[-1]),
'macd': float(df['MACD'].iloc[-1])
}
except Exception as e:
logging.error(f"Error during predictions: {str(e)}")
raise ValueError(f"Prediction failed: {str(e)}")
def create_analysis_plots(symbol, days=180, lookback=30):
try:
analyzer = CryptoAnalyzer()
predictions = analyzer.get_predictions(symbol, days, lookback)
fig = make_subplots(
rows=3, cols=1,
subplot_titles=(
f"{symbol} Price Prediction with Confidence Bands",
"Technical Indicators",
"Model Performance Metrics"
),
vertical_spacing=0.1,
specs=[[{"secondary_y": True}],
[{"secondary_y": True}],
[{"secondary_y": True}]]
)
confidence_upper = np.array(predictions['predicted']) * (1 + np.array(predictions['confidence']))
confidence_lower = np.array(predictions['predicted']) * (1 - np.array(predictions['confidence']))
fig.add_trace(
go.Scatter(
x=predictions['dates'],
y=predictions['actual'],
name='Actual Price',
line=dict(color='blue', width=2)
),
row=1, col=1
)
fig.add_trace(
go.Scatter(
x=predictions['dates'],
y=predictions['predicted'],
name='Predicted Price',
line=dict(color='red', width=2)
),
row=1, col=1
)
fig.add_trace(
go.Scatter(
x=predictions['dates'] + predictions['dates'][::-1],
y=list(confidence_upper) + list(confidence_lower)[::-1],
fill='toself',
fillcolor='rgba(255,0,0,0.1)',
line=dict(color='rgba(255,0,0,0)'),
name='Confidence Band'
),
row=1, col=1
)
fig.update_layout(
height=1200,
title_text=f"πŸ“ˆ {symbol} Price Analysis Dashboard",
showlegend=True,
template="plotly_dark",
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
font=dict(size=12)
)
summary = f"""
### πŸ“Š Analysis Summary for {symbol}
#### Current Market Status
- **Current Price:** ${predictions['current_price']:,.2f}
- **Predicted Next Price:** ${predictions['predicted'][-1]:,.2f}
- **Expected Change:** {((predictions['predicted'][-1] - predictions['current_price']) / predictions['current_price'] * 100):,.2f}%
- **24h Volume:** {predictions['volume']:,.0f}
#### Technical Indicators
- **RSI:** {predictions['rsi']:,.2f}
- **MACD:** {predictions['macd']:,.2f}
- **Volatility:** {predictions['volatility']:,.2f}%
#### Model Performance Metrics
- **RΒ² Score:** {predictions['r2']:,.4f}
- **RMSE:** ${predictions['rmse']:,.2f}
- **MAPE:** {predictions['mape']:,.2f}%
#### Prediction Confidence
- **Average Confidence:** {np.mean(predictions['confidence']) * 100:,.2f}%
- **Trend Direction:** {'πŸ”Ί Upward' if predictions['predicted'][-1] > predictions['actual'][-1] else 'πŸ”» Downward'}
> *Note: Past performance does not guarantee future results. This analysis is for informational purposes only.*
"""
return fig, summary
except Exception as e:
fig = go.Figure()
fig.add_annotation(
text=str(e),
xref="paper",
yref="paper",
x=0.5,
y=0.5,
showarrow=False
)
return fig, f"⚠️ Error: {str(e)}"
def create_interface():
with gr.Blocks(theme=gr.themes.Soft()) as iface:
gr.Markdown("""
# πŸš€ Advanced Cryptocurrency Price Prediction
This app uses deep learning to predict cryptocurrency prices and provide comprehensive market analysis.
### Features:
- Real-time price predictions
- Technical indicators analysis
- Confidence metrics
- Performance visualization
""")
with gr.Row():
with gr.Column(scale=1):
crypto_input = gr.Dropdown(
choices=['BTC', 'ETH', 'BNB', 'XRP', 'ADA', 'SOL', 'DOT', 'DOGE'],
label="Select Cryptocurrency",
value="BTC"
)
custom_crypto = gr.Textbox(
label="Or enter custom symbol",
placeholder="e.g., MATIC"
)
with gr.Row():
days_slider = gr.Slider(
minimum=30, maximum=365, value=180, step=30,
label="Historical Days"
)
lookback_slider = gr.Slider(
minimum=7, maximum=60, value=30, step=1,
label="Lookback Period (Days)"
)
submit_btn = gr.Button("πŸ“Š Generate Analysis", variant="primary")
with gr.Column(scale=2):
plot_output = gr.Plot(label="Analysis Plots")
with gr.Row():
analysis_output = gr.Markdown(label="Analysis Summary")
error_output = gr.Markdown(visible=False)
def handle_analysis(symbol, custom_symbol, days, lookback):
try:
final_symbol = custom_symbol if custom_symbol else symbol
figure, summary = create_analysis_plots(final_symbol, days, lookback)
return figure, summary, gr.update(visible=False, value="")
except Exception as e:
empty_fig = go.Figure()
error_msg = f"⚠️ Error during analysis: {str(e)}"
return empty_fig, "", gr.update(visible=True, value=error_msg)
submit_btn.click(
fn=handle_analysis,
inputs=[crypto_input, custom_crypto, days_slider, lookback_slider],
outputs=[plot_output, analysis_output, error_output]
)
return iface
if __name__ == "__main__":
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('crypto_predictor.log'),
logging.StreamHandler()
]
)
try:
os.makedirs("models", exist_ok=True)
os.makedirs("cache", exist_ok=True)
iface = create_interface()
iface.launch(
share=False, server_name="0.0.0.0", server_port=7860, debug=True
)
except Exception as e:
logging.error(f"Application failed to start: {str(e)}")
raise