| import math, torch | |
| import torch.nn as nn | |
| from einops import repeat | |
class BridgeAttentionPolicy(nn.Module):
    """Attention-pooled policy head that fuses vision, text, and state features.

    Projected vision tokens, text tokens, and a single state token are
    concatenated with a set of learned query tokens and passed through a
    Transformer encoder; the query positions are then mean-pooled and mapped
    to action logits.

    Args:
        v_hidden:   feature dim of each vision-layer tensor.
        t_hidden:   feature dim of each text-layer tensor.
        state_dim:  dim of the flat state vector.
        policy_dim: shared model width of the policy transformer.
        n_heads:    attention heads per encoder layer.
        n_layers:   number of encoder layers.
        n_queries:  number of learned query tokens pooled into the output.
        action_dim: size of the action-logit output.
        dropout:    dropout rate inside the encoder layers.
    """

    def __init__(self, v_hidden, t_hidden, state_dim, policy_dim, n_heads,
                 n_layers, n_queries, action_dim, dropout=0.1):
        super().__init__()
        self.n_queries = n_queries
        # Learned queries, scaled at init so their variance matches 1/policy_dim.
        self.query = nn.Parameter(
            torch.randn(n_queries, policy_dim) / math.sqrt(policy_dim))
        self.v_proj = nn.Linear(v_hidden, policy_dim)
        self.t_proj = nn.Linear(t_hidden, policy_dim)
        self.s_proj = nn.Linear(state_dim, policy_dim)
        # Per-modality learnable gates; applied through sigmoid in forward().
        # init 0.7 -> sigmoid ~= 0.668, i.e. modalities start mostly "on".
        self.alpha_v = nn.Parameter(torch.tensor(0.7))
        self.alpha_t = nn.Parameter(torch.tensor(0.7))
        self.alpha_s = nn.Parameter(torch.tensor(0.7))
        enc = nn.TransformerEncoderLayer(
            d_model=policy_dim, nhead=n_heads, dim_feedforward=policy_dim * 4,
            dropout=dropout, activation="gelu", batch_first=True,
            norm_first=True)
        self.blocks = nn.TransformerEncoder(enc, num_layers=n_layers)
        self.norm = nn.LayerNorm(policy_dim)
        self.head = nn.Sequential(
            nn.Linear(policy_dim, policy_dim),
            nn.GELU(),
            nn.Linear(policy_dim, action_dim))

    def forward(self, v_feats_layers, t_feats_layers, state_vec):
        """Fuse modality features and return action logits.

        Args:
            v_feats_layers: list of vision tensors, each assumed
                (B, seq_i, v_hidden) — TODO confirm with caller. May be
                empty/None, in which case the vision stream is skipped.
            t_feats_layers: list of text tensors, each assumed
                (B, seq_j, t_hidden); must be non-empty (torch.cat on an
                empty list raises).
            state_vec: (B, state_dim) flat state vector.

        Returns:
            (B, action_dim) action logits.
        """
        B = state_vec.size(0)
        # Vision is optional; text is required (see docstring).
        v_cat = torch.cat(v_feats_layers, dim=1) if v_feats_layers else None
        t_cat = torch.cat(t_feats_layers, dim=1)
        # BUGFIX: gate the state token with alpha_s, consistent with the
        # alpha_v/alpha_t gating below. Previously alpha_s was declared but
        # never used, leaving it a dead parameter with no gradient.
        s_tok = (self.s_proj(state_vec).unsqueeze(1)
                 * torch.sigmoid(self.alpha_s))
        toks = [s_tok]
        if v_cat is not None:
            toks.append(self.v_proj(v_cat) * torch.sigmoid(self.alpha_v))
        toks.append(self.t_proj(t_cat) * torch.sigmoid(self.alpha_t))
        ctx = torch.cat(toks, dim=1)
        # Broadcast the learned queries over the batch; expand() is a
        # zero-copy view, replacing the einops.repeat call.
        q = self.query.unsqueeze(0).expand(B, -1, -1)
        tokens = torch.cat([q, ctx], dim=1)
        tokens = self.blocks(tokens)
        # Pool only the query positions, then normalize before the head.
        pooled = self.norm(tokens[:, :self.n_queries].mean(dim=1))
        return self.head(pooled)