import random
from collections import deque

import torch
import torch.nn as nn
import torch.nn.functional as F


class PlasticLinear(nn.Module):
    """Linear layer with a local plasticity rule (Hebbian or a rate-based STDP
    approximation) that updates its weights online during training."""

    def __init__(self, in_features, out_features, plasticity_type="hebbian", learning_rate=0.01):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.weight = nn.Parameter(torch.randn(out_features, in_features) * 0.1)
        self.bias = nn.Parameter(torch.zeros(out_features))
        self.plasticity_type = plasticity_type
        self.eta = learning_rate
        # Plasticity state lives in buffers so it moves with the module across devices.
        self.register_buffer('trace', torch.zeros(out_features, in_features))
        self.register_buffer('prev_y', torch.zeros(out_features))

    def forward(self, x):
        y = F.linear(x, self.weight, self.bias)
        if self.training:
            x_detached = x.detach()
            y_detached = y.detach()
            if self.plasticity_type == "hebbian":
                # Outer product of post- and pre-synaptic activity, averaged over the batch.
                hebb = torch.einsum('bi,bj->ij', y_detached, x_detached) / x.size(0)
                self.trace = (1 - self.eta) * self.trace + self.eta * hebb
            elif self.plasticity_type == "stdp":
                # Rate-based STDP proxy: correlate the change in output with the input.
                dy = y_detached - self.prev_y
                stdp = torch.einsum('bi,bj->ij', dy, x_detached) / x.size(0)
                self.trace = (1 - self.eta) * self.trace + self.eta * stdp
            # Apply the plastic update through .data so autograd's version counter is
            # not bumped; backward() for this step then sees the pre-update weight.
            self.weight.data += self.trace
            # Store the batch mean so the buffer keeps shape (out_features,)
            # regardless of batch size.
            self.prev_y = y_detached.mean(dim=0)
        return y

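# Sketch (hypothetical usage, shapes chosen arbitrarily): a PlasticLinear adapts
# from activity alone, with no optimizer involved.
#
#   layer = PlasticLinear(8, 4, plasticity_type="hebbian")
#   layer.train()
#   _ = layer(torch.randn(16, 8))  # this forward pass alone changes layer.weight
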
class SpikeFunction(torch.autograd.Function):
    """Heaviside spike nonlinearity with a boxcar surrogate gradient."""

    @staticmethod
    def forward(ctx, input):
        ctx.save_for_backward(input)
        return (input > 0).float()

    @staticmethod
    def backward(ctx, grad_output):
        input, = ctx.saved_tensors
        # Surrogate gradient: pass gradients only near the threshold (|input| < 1).
        return grad_output * (input.abs() < 1).float()


spike_fn = SpikeFunction.apply

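# Sketch (hypothetical check) of the surrogate gradient: after backprop through
# spike_fn, the gradient is 1 exactly where |input| < 1 and 0 elsewhere.
#
#   v = torch.randn(5, requires_grad=True)
#   spike_fn(v).sum().backward()
#   assert torch.equal(v.grad, (v.abs() < 1).float())
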
class LIFNeuron(nn.Module):
    """Leaky integrate-and-fire neuron with a hard reset after each spike."""

    def __init__(self, tau=2.0):
        super().__init__()
        self.tau = tau
        self.decay = float(torch.exp(torch.tensor(-1.0 / tau)))
        self.mem = None

    def forward(self, x):
        # (Re)initialise the membrane when the input shape changes, and detach the
        # stored state so the autograd graph is not carried across calls.
        if self.mem is None or self.mem.shape != x.shape:
            self.mem = torch.zeros_like(x)
        self.mem = self.mem.detach() * self.decay + x
        out = spike_fn(self.mem - 1.0)
        self.mem = self.mem * (1.0 - out.detach())  # hard reset where a spike fired
        return out

class AdaptiveLIF(nn.Module):
    """LIF neuron whose firing threshold rises by beta after every spike."""

    def __init__(self, size, tau=2.0, beta=0.2):
        super().__init__()
        self.size = size
        self.tau = tau
        self.beta = beta
        self.decay = float(torch.exp(torch.tensor(-1.0 / tau)))
        self.mem = torch.zeros(size)
        self.thresh = torch.ones(size)

    def forward(self, x):
        # Detach stored state so gradients do not leak across training steps.
        self.mem = self.mem.detach() * self.decay + x
        out = spike_fn(self.mem - self.thresh)
        # Adaptation: each spike raises the threshold; it never decays back here.
        self.thresh = self.thresh.detach() + self.beta * out.detach()
        self.mem = self.mem * (1.0 - out.detach())
        return out

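# Sketch (hypothetical check) of threshold adaptation: a constant strong input
# keeps firing the neuron, and each spike raises the threshold by beta.
#
#   lif = AdaptiveLIF(4)
#   for _ in range(5):
#       _ = lif(torch.ones(1, 4) * 2.0)
#   # lif.thresh has grown by 0.2 for every spike emitted
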
class RelayLayer(nn.Module):
    """Relay stage: multi-head self-attention followed by a spiking nonlinearity."""

    def __init__(self, dim, heads=4):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim=dim, num_heads=heads, batch_first=True)
        self.lif = LIFNeuron()

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        return self.lif(attn_out)

class WorkingMemory(nn.Module):
    """LSTM working memory; returns the hidden state at the final timestep."""

    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)

    def forward(self, x):
        out, _ = self.lstm(x)
        return out[:, -1]

class PlaceGrid(nn.Module):
    """Place-cell-style embedding of positions on a grid_size x grid_size grid."""

    def __init__(self, grid_size=10, embedding_dim=64):
        super().__init__()
        self.embedding = nn.Embedding(grid_size ** 2, embedding_dim)

    def forward(self, index):
        return self.embedding(index)

class MirrorComparator(nn.Module):
    """Scores similarity between two representations via cosine similarity."""

    def __init__(self, dim):
        super().__init__()
        # `dim` is unused; kept so the constructor matches the other modules.
        self.cos = nn.CosineSimilarity(dim=1)

    def forward(self, x1, x2):
        return self.cos(x1, x2).unsqueeze(1)

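# Sketch (hypothetical usage): the comparator scores row-wise similarity. The
# agent below instantiates it but never calls it in forward(); one plausible use
# is comparing the fused state z against the memory readout m.
#
#   sim = MirrorComparator(64)(torch.randn(2, 64), torch.randn(2, 64))  # (2, 1), values in [-1, 1]
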
class NeuroendocrineModulator(nn.Module):
    """Slow, hormone-like modulatory signal, implemented as an LSTM readout."""

    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)

    def forward(self, x):
        out, _ = self.lstm(x)
        return out[:, -1]

class AutonomicFeedback(nn.Module):
    """Bounded feedback signal: a linear map squashed by tanh."""

    def __init__(self, input_dim):
        super().__init__()
        self.feedback = nn.Linear(input_dim, input_dim)

    def forward(self, x):
        return torch.tanh(self.feedback(x))

class ReplayBuffer:
    """FIFO store of (inputs, labels, task) tuples for experience replay."""

    def __init__(self, capacity=1000):
        self.buffer = deque(maxlen=capacity)

    def add(self, inputs, labels, task):
        self.buffer.append((inputs, labels, task))

    def sample(self, batch_size):
        # Cap the request so sampling never raises while the buffer is still small.
        batch = random.sample(list(self.buffer), min(batch_size, len(self.buffer)))
        inputs, labels, tasks = zip(*batch)
        return inputs, labels, tasks

class ModularBrainAgent(nn.Module):
    """Multi-task agent: three sensory encoders feed plastic connections, a
    spiking relay, and modulatory modules, ending in per-task output heads."""

    def __init__(self, input_dims, hidden_dim, output_dims):
        super().__init__()
        # One encoder per sensory modality.
        self.vision_encoder = nn.Linear(input_dims['vision'], hidden_dim)
        self.language_encoder = nn.Linear(input_dims['language'], hidden_dim)
        self.numeric_encoder = nn.Linear(input_dims['numeric'], hidden_dim)

        # Plastic pathways: Hebbian into the relay, STDP-like into the interneurons.
        self.connect_sensory_to_relay = PlasticLinear(hidden_dim * 3, hidden_dim, plasticity_type='hebbian')
        self.relay_layer = RelayLayer(hidden_dim)
        self.connect_relay_to_inter = PlasticLinear(hidden_dim, hidden_dim, plasticity_type='stdp')

        self.interneuron = AdaptiveLIF(hidden_dim)
        self.memory = WorkingMemory(hidden_dim, hidden_dim)
        self.place = PlaceGrid(grid_size=10, embedding_dim=hidden_dim)
        self.comparator = MirrorComparator(hidden_dim)
        self.emotion = NeuroendocrineModulator(hidden_dim, hidden_dim)
        self.feedback = AutonomicFeedback(hidden_dim)

        # One linear readout per task.
        self.task_heads = nn.ModuleDict({
            task: nn.Linear(hidden_dim, out_dim)
            for task, out_dim in output_dims.items()
        })

        self.replay = ReplayBuffer()

    def forward(self, inputs, task, position_idx=None):
        v = self.vision_encoder(inputs['vision'])
        l = self.language_encoder(inputs['language'])
        n = self.numeric_encoder(inputs['numeric'])

        # Fuse the modalities and push them through the plastic relay pathway.
        sensory_cat = torch.cat([v, l, n], dim=-1)
        z = self.connect_sensory_to_relay(sensory_cat)
        z = self.relay_layer(z.unsqueeze(1)).squeeze(1)
        z = self.connect_relay_to_inter(z)
        z = self.interneuron(z)

        if position_idx is None:
            position_idx = torch.tensor([0])
        m = self.memory(z.unsqueeze(1))
        p = self.place(position_idx)
        e = self.emotion(z.unsqueeze(1))
        f = self.feedback(z)

        # Sum the module outputs and decode with the task-specific head.
        combined = z + m + p + e + f
        return self.task_heads[task](combined)

    def remember(self, inputs, labels, task):
        self.replay.add(inputs, labels, task)

if __name__ == "__main__": |
|
input_dims = {'vision': 32, 'language': 16, 'numeric': 8} |
|
output_dims = {'classification': 5, 'regression': 1, 'binary': 1} |
|
agent = ModularBrainAgent(input_dims, hidden_dim=64, output_dims=output_dims) |
|
|
|
tasks = list(output_dims.keys()) |
|
|
|
for step in range(250): |
|
task = random.choice(tasks) |
|
inputs = { |
|
'vision': torch.randn(1, 32), |
|
'language': torch.randn(1, 16), |
|
'numeric': torch.randn(1, 8) |
|
} |
|
labels = torch.randint(0, output_dims[task], (1,)) if task == 'classification' else torch.randn(1, output_dims[task]) |
|
output = agent(inputs, task) |
|
loss = F.cross_entropy(output, labels) if task == 'classification' else F.mse_loss(output, labels) |
|
print(f"Step {step:02d} | Task: {task:13s} | Loss: {loss.item():.4f}") |