LSTM Networks for Time Series Forecasting
Long Short-Term Memory (LSTM) networks are powerful for sequential data. In this guide, we'll explore how they work and apply them to financial time series.
The Problem with Vanilla RNNs
Standard RNNs suffer from the vanishing gradient problem. Backpropagating through time chains the recurrent Jacobians across timesteps:

$$\frac{\partial h_t}{\partial h_k} = \prod_{i=k+1}^{t} \frac{\partial h_i}{\partial h_{i-1}}$$

As $t - k$ grows, this product either vanishes (when $\left\|\frac{\partial h_i}{\partial h_{i-1}}\right\| < 1$) or explodes (when $\left\|\frac{\partial h_i}{\partial h_{i-1}}\right\| > 1$).
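To make this concrete, here is a minimal numpy sketch (illustrative values only, not part of the article's pipeline) that multiplies the tanh-RNN Jacobian across 50 timesteps and watches the accumulated gradient shrink toward zero:

import numpy as np

# Minimal sketch: accumulate the product of recurrent Jacobians for a tanh RNN.
# The recurrent weight scale (0.1) is an assumption chosen to show vanishing.
np.random.seed(0)
hidden_size = 32
W_hh = np.random.randn(hidden_size, hidden_size) * 0.1  # recurrent weights

grad = np.eye(hidden_size)  # running product of Jacobians d h_i / d h_{i-1}
for t in range(50):
    h_pre = np.random.randn(hidden_size)           # stand-in pre-activation at step t
    jac = np.diag(1 - np.tanh(h_pre) ** 2) @ W_hh  # tanh'(pre) * W_hh
    grad = grad @ jac

print(np.linalg.norm(grad))  # effectively zero after 50 steps: the gradient vanished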
LSTM Architecture
LSTMs solve this with a gating mechanism and a cell state that acts as a highway for gradients.
The Cell State
The cell state flows through time with minimal modification:

$$c_t = f_t \odot c_{t-1} + i_t \odot \tilde{c}_t$$

where $\odot$ is element-wise multiplication.
The Gates
Forget Gate - decides what to forget:

$$f_t = \sigma(W_f [h_{t-1}, x_t] + b_f)$$

Input Gate - decides what to store:

$$i_t = \sigma(W_i [h_{t-1}, x_t] + b_i)$$

Candidate Values:

$$\tilde{c}_t = \tanh(W_c [h_{t-1}, x_t] + b_c)$$

Output Gate - decides what to output:

$$o_t = \sigma(W_o [h_{t-1}, x_t] + b_o)$$

Hidden State:

$$h_t = o_t \odot \tanh(c_t)$$
import numpy as np

class LSTMCell:
    """
    Single LSTM cell implementation.
    """
    def __init__(self, input_size, hidden_size):
        self.hidden_size = hidden_size

        # Combined weights for efficiency
        # Shape: (4 * hidden_size, input_size + hidden_size)
        self.W = np.random.randn(4 * hidden_size, input_size + hidden_size) * 0.01
        self.b = np.zeros((4 * hidden_size, 1))

    def forward(self, x, h_prev, c_prev):
        """
        Forward pass through LSTM cell.

        Parameters:
        x -- input at current timestep (input_size, 1)
        h_prev -- previous hidden state (hidden_size, 1)
        c_prev -- previous cell state (hidden_size, 1)

        Returns:
        h_next -- next hidden state
        c_next -- next cell state
        cache -- values needed for backprop
        """
        # Concatenate input and previous hidden state
        concat = np.vstack([h_prev, x])

        # Compute all gates at once
        gates = self.W @ concat + self.b

        # Split into individual gates
        h = self.hidden_size
        f_gate = self._sigmoid(gates[:h])       # Forget gate
        i_gate = self._sigmoid(gates[h:2*h])    # Input gate
        c_tilde = np.tanh(gates[2*h:3*h])       # Candidate
        o_gate = self._sigmoid(gates[3*h:])     # Output gate

        # Update cell state
        c_next = f_gate * c_prev + i_gate * c_tilde

        # Compute hidden state
        h_next = o_gate * np.tanh(c_next)

        cache = (x, h_prev, c_prev, f_gate, i_gate, c_tilde, o_gate, c_next)

        return h_next, c_next, cache

    def _sigmoid(self, x):
        return 1 / (1 + np.exp(-np.clip(x, -500, 500)))
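A quick usage sketch of the cell above (the sizes are made up for illustration) unrolls it over a short random sequence, carrying the hidden and cell state forward:

import numpy as np

# Illustrative usage of the LSTMCell defined above; dimensions are assumptions.
input_size, hidden_size, seq_len = 3, 8, 10
cell = LSTMCell(input_size, hidden_size)

h = np.zeros((hidden_size, 1))  # initial hidden state
c = np.zeros((hidden_size, 1))  # initial cell state

for t in range(seq_len):
    x_t = np.random.randn(input_size, 1)  # one timestep of input features
    h, c, _ = cell.forward(x_t, h, c)     # state is carried across timesteps

print(h.shape, c.shape)  # both (hidden_size, 1)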
Time Series Forecasting with LSTM
Data Preparation
For time series, we create sequences of input-output pairs:
import numpy as np
import pandas as pd

def create_sequences(data, seq_length, forecast_horizon=1):
    """
    Create sequences for LSTM training.

    Parameters:
    data -- numpy array of shape (n_samples, n_features)
    seq_length -- number of time steps to look back
    forecast_horizon -- number of steps to predict ahead

    Returns:
    X -- sequences of shape (n_sequences, seq_length, n_features)
    y -- targets of shape (n_sequences, forecast_horizon)
    """
    X, y = [], []

    for i in range(len(data) - seq_length - forecast_horizon + 1):
        X.append(data[i:(i + seq_length)])
        y.append(data[i + seq_length:i + seq_length + forecast_horizon, 0])

    return np.array(X), np.array(y)


def prepare_financial_data(df, feature_cols, target_col, seq_length=60):
    """
    Prepare financial data for LSTM.
    """
    # Calculate returns and technical features
    df['returns'] = df['close'].pct_change()
    df['log_returns'] = np.log(df['close'] / df['close'].shift(1))
    df['volatility'] = df['returns'].rolling(20).std()
    df['ma_ratio'] = df['close'] / df['close'].rolling(20).mean()

    # Drop NaN
    df = df.dropna()

    # Normalize features
    from sklearn.preprocessing import StandardScaler
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(df[feature_cols])

    # Create sequences
    X, y = create_sequences(scaled_data, seq_length)

    return X, y, scaler
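To illustrate the expected shapes, a toy run might look like the following (the synthetic 'close' series and the chosen columns are assumptions for this example, not requirements of the pipeline):

import numpy as np
import pandas as pd

# Illustrative usage with a synthetic random-walk price series.
n = 500
df = pd.DataFrame({
    'close': 100 * np.exp(np.cumsum(np.random.randn(n) * 0.01)),
})

feature_cols = ['returns', 'log_returns', 'volatility', 'ma_ratio']
X, y, scaler = prepare_financial_data(df, feature_cols, target_col='returns', seq_length=60)

print(X.shape)  # (n_sequences, 60, 4)
print(y.shape)  # (n_sequences, 1) -- the first feature column, one step ahead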
Complete LSTM Model
import torch
import torch.nn as nn

class LSTMForecaster(nn.Module):
    """
    LSTM model for time series forecasting.
    """
    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout=0.2):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )

        self.fc = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size // 2, output_size)
        )

    def forward(self, x):
        # x shape: (batch, seq_len, features)

        # LSTM output
        lstm_out, (h_n, c_n) = self.lstm(x)

        # Use last hidden state
        out = self.fc(lstm_out[:, -1, :])

        return out


def train_lstm(model, train_loader, val_loader, epochs=100, lr=0.001):
    """
    Training loop for LSTM forecaster.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, patience=10, factor=0.5
    )

    best_val_loss = float('inf')

    for epoch in range(epochs):
        # Training
        model.train()
        train_loss = 0

        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            optimizer.zero_grad()
            predictions = model(X_batch)
            loss = criterion(predictions, y_batch)
            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()
            train_loss += loss.item()

        # Validation
        model.eval()
        val_loss = 0

        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch = X_batch.to(device)
                y_batch = y_batch.to(device)
                predictions = model(X_batch)
                val_loss += criterion(predictions, y_batch).item()

        train_loss /= len(train_loader)
        val_loss /= len(val_loader)

        scheduler.step(val_loss)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pt')

        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss = {train_loss:.6f}, Val Loss = {val_loss:.6f}")

    return model
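Wiring the prepared arrays into this training loop could look like the sketch below; the chronological 80/20 split, batch size, and hidden sizes are assumptions, not recommendations:

import torch
from torch.utils.data import TensorDataset, DataLoader

# Illustrative glue code using the X, y arrays from the data-preparation step.
X_t = torch.tensor(X, dtype=torch.float32)
y_t = torch.tensor(y, dtype=torch.float32)

split = int(0.8 * len(X_t))  # chronological split: validation data comes after training data
train_ds = TensorDataset(X_t[:split], y_t[:split])
val_ds = TensorDataset(X_t[split:], y_t[split:])

train_loader = DataLoader(train_ds, batch_size=64, shuffle=True)  # shuffle windows within the training period only
val_loader = DataLoader(val_ds, batch_size=64)

model = LSTMForecaster(input_size=X_t.shape[2], hidden_size=64,
                       num_layers=2, output_size=y_t.shape[1])
model = train_lstm(model, train_loader, val_loader, epochs=50)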
Practical Considerations
Sequence Length
The optimal sequence length depends on:
- Market regime duration
- Computational constraints
- Memory requirements
Rule of thumb: Start with 20-60 time steps for daily data.
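One tradeoff behind that rule of thumb: with a fixed history, a longer lookback leaves fewer training sequences and larger inputs. A quick check, reusing create_sequences from above with assumed sizes:

import numpy as np

# Illustrative only: how the lookback length trades off against sample count.
data = np.random.randn(1000, 4)  # roughly four years of daily data, 4 features (assumed)

for seq_length in (20, 60, 120):
    X, y = create_sequences(data, seq_length)
    print(seq_length, X.shape)  # (1000 - seq_length, seq_length, 4)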
Stationarity
Financial time series are often non-stationary. Transform to:
- Returns: $r_t = \frac{p_t - p_{t-1}}{p_{t-1}}$
- Log returns: $r_t = \ln\left(\frac{p_t}{p_{t-1}}\right)$
- Normalized prices: $\tilde{p}_t = \frac{p_t - \mu_t}{\sigma_t}$, with $\mu_t$ and $\sigma_t$ taken over a rolling window (see below)
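In pandas these transforms are one-liners; the sketch below assumes a DataFrame with a 'close' column, mirroring prepare_financial_data above:

import numpy as np
import pandas as pd

# Sketch of the three transforms; df is assumed to have a 'close' price column.
returns = df['close'].pct_change()
log_returns = np.log(df['close'] / df['close'].shift(1))
normalized = (df['close'] - df['close'].rolling(252).mean()) / df['close'].rolling(252).std()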
Avoiding Look-Ahead Bias
Always use rolling normalization:
def rolling_normalize(data, window=252):
    """
    Rolling z-score normalization to avoid look-ahead bias.
    """
    rolling_mean = data.rolling(window=window).mean()
    rolling_std = data.rolling(window=window).std()

    normalized = (data - rolling_mean) / rolling_std

    return normalized.dropna()
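As a sanity check on a synthetic series (illustrative only), compare this with a full-sample z-score, which uses future observations and therefore leaks information:

import numpy as np
import pandas as pd

# Illustrative comparison on a synthetic random-walk price series.
prices = pd.Series(100 * np.exp(np.cumsum(np.random.randn(1000) * 0.01)))

leaky = (prices - prices.mean()) / prices.std()  # full-sample stats: look-ahead bias
safe = rolling_normalize(prices, window=252)     # each value uses only the past 252 observations

print(len(prices), len(safe))  # 1000 vs 749: undefined until a full window of history exists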
Key Takeaways
- LSTMs solve vanishing gradients with cell state and gates
- Proper data preparation is crucial - normalize, handle stationarity
- Sequence length affects what patterns can be learned
- Gradient clipping prevents exploding gradients
- Rolling normalization avoids look-ahead bias
- Validation strategy must respect temporal ordering
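On that last point, a minimal sketch of a temporally ordered split using scikit-learn's TimeSeriesSplit (the fold count and the gap of one lookback window are assumptions):

import numpy as np
from sklearn.model_selection import TimeSeriesSplit

# Illustrative walk-forward validation: every validation fold lies strictly after
# its training fold, and a gap of one lookback window keeps overlapping sequences
# from straddling the boundary.
X = np.random.randn(1000, 60, 4)  # assumed sequence array (n_sequences, seq_len, features)
y = np.random.randn(1000, 1)

tscv = TimeSeriesSplit(n_splits=5, gap=60)
for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
    print(f"fold {fold}: train ends at {train_idx[-1]}, validation spans {val_idx[0]}-{val_idx[-1]}")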
LSTMs are powerful but require careful implementation for financial applications!