| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- from typing import Tuple, List, Dict, Optional
- from dataclasses import dataclass
- import math
- import torch
- import torch.nn.functional as F
- from torch import nn
- from pydantic import BaseModel
- from models.common import trunc_normal_init_
- from models.layers import rms_norm, SwiGLU, Attention, RotaryEmbedding, CosSin, CastedEmbedding, CastedLinear
- from models.sparse_embedding import CastedSparseEmbedding
- @dataclass
- class HierarchicalReasoningModel_ACTV1InnerCarry:
- z_H: torch.Tensor
- z_L: torch.Tensor
- @dataclass
- class HierarchicalReasoningModel_ACTV1Carry:
- inner_carry: HierarchicalReasoningModel_ACTV1InnerCarry
-
- steps: torch.Tensor
- halted: torch.Tensor
-
- current_data: Dict[str, torch.Tensor]
- class HierarchicalReasoningModel_ACTV1Config(BaseModel):
- batch_size: int
- seq_len: int
- puzzle_emb_ndim: int = 0
- num_puzzle_identifiers: int
- vocab_size: int
- H_cycles: int
- L_cycles: int
- H_layers: int
- L_layers: int
- # Transformer config
- hidden_size: int
- expansion: float
- num_heads: int
- pos_encodings: str
- rms_norm_eps: float = 1e-5
- rope_theta: float = 10000.0
-
- # Halting Q-learning config
- halt_max_steps: int
- halt_exploration_prob: float
- forward_dtype: str = "bfloat16"
- # Alexia: added
- mlp_t: bool=False # use mlp on L instead of transformer
- class HierarchicalReasoningModel_ACTV1Block(nn.Module):
- def __init__(self, config: HierarchicalReasoningModel_ACTV1Config) -> None:
- super().__init__()
- self.config = config
- if self.config.mlp_t:
- self.puzzle_emb_len = -(self.config.puzzle_emb_ndim // -self.config.hidden_size)
- self.mlp_t = SwiGLU(
- hidden_size=self.config.seq_len + self.puzzle_emb_len, # L
- expansion=config.expansion,
- )
- else:
- self.self_attn = Attention(
- hidden_size=config.hidden_size,
- head_dim=config.hidden_size // config.num_heads,
- num_heads=config.num_heads,
- num_key_value_heads=config.num_heads,
- causal=False
- )
- self.mlp = SwiGLU(
- hidden_size=config.hidden_size,
- expansion=config.expansion,
- )
- self.norm_eps = config.rms_norm_eps
- def forward(self, cos_sin: CosSin, hidden_states: torch.Tensor) -> torch.Tensor:
- # B, L, D = hidden_states.shape
- # Post Norm
- if self.config.mlp_t:
- hidden_states = hidden_states.transpose(1,2)
- out = self.mlp_t(hidden_states)
- hidden_states = rms_norm(hidden_states + out, variance_epsilon=self.norm_eps)
- hidden_states = hidden_states.transpose(1,2)
- else:
- # Self Attention
- hidden_states = rms_norm(hidden_states + self.self_attn(cos_sin=cos_sin, hidden_states=hidden_states), variance_epsilon=self.norm_eps)
- # Fully Connected
- out = self.mlp(hidden_states)
- hidden_states = rms_norm(hidden_states + out, variance_epsilon=self.norm_eps)
- return hidden_states
- class HierarchicalReasoningModel_ACTV1ReasoningModule(nn.Module):
- def __init__(self, layers: List[HierarchicalReasoningModel_ACTV1Block]):
- super().__init__()
- self.layers = torch.nn.ModuleList(layers)
- def forward(self, hidden_states: torch.Tensor, input_injection: torch.Tensor, **kwargs) -> torch.Tensor:
- # Input injection (add)
- hidden_states = hidden_states + input_injection
- # Layers
- for layer in self.layers:
- hidden_states = layer(hidden_states=hidden_states, **kwargs)
- return hidden_states
- class HierarchicalReasoningModel_ACTV1_Inner(nn.Module):
- def __init__(self, config: HierarchicalReasoningModel_ACTV1Config) -> None:
- super().__init__()
- self.config = config
- self.forward_dtype = getattr(torch, self.config.forward_dtype)
- # I/O
- self.embed_scale = math.sqrt(self.config.hidden_size)
- embed_init_std = 1.0 / self.embed_scale
- self.embed_tokens = CastedEmbedding(self.config.vocab_size, self.config.hidden_size, init_std=embed_init_std, cast_to=self.forward_dtype)
- self.lm_head = CastedLinear(self.config.hidden_size, self.config.vocab_size, bias=False)
- self.q_head = CastedLinear(self.config.hidden_size, 2, bias=True)
- self.puzzle_emb_len = -(self.config.puzzle_emb_ndim // -self.config.hidden_size) # ceil div
- if self.config.puzzle_emb_ndim > 0:
- # Zero init puzzle embeddings
- self.puzzle_emb = CastedSparseEmbedding(self.config.num_puzzle_identifiers, self.config.puzzle_emb_ndim,
- batch_size=self.config.batch_size, init_std=0, cast_to=self.forward_dtype)
- # LM Blocks
- if self.config.pos_encodings == "rope":
- self.rotary_emb = RotaryEmbedding(dim=self.config.hidden_size // self.config.num_heads,
- max_position_embeddings=self.config.seq_len + self.puzzle_emb_len,
- base=self.config.rope_theta)
- elif self.config.pos_encodings == "learned":
- self.embed_pos = CastedEmbedding(self.config.seq_len + self.puzzle_emb_len, self.config.hidden_size, init_std=embed_init_std, cast_to=self.forward_dtype)
- else:
- pass
- # Reasoning Layers
- self.H_level = HierarchicalReasoningModel_ACTV1ReasoningModule(layers=[HierarchicalReasoningModel_ACTV1Block(self.config) for _i in range(self.config.H_layers)])
- self.L_level = HierarchicalReasoningModel_ACTV1ReasoningModule(layers=[HierarchicalReasoningModel_ACTV1Block(self.config) for _i in range(self.config.L_layers)])
- # Initial states
- self.H_init = nn.Buffer(trunc_normal_init_(torch.empty(self.config.hidden_size, dtype=self.forward_dtype), std=1), persistent=True)
- self.L_init = nn.Buffer(trunc_normal_init_(torch.empty(self.config.hidden_size, dtype=self.forward_dtype), std=1), persistent=True)
- # Q head special init
- # Init Q to (almost) zero for faster learning during bootstrapping
- with torch.no_grad():
- self.q_head.weight.zero_()
- self.q_head.bias.fill_(-5) # type: ignore
- def _input_embeddings(self, input: torch.Tensor, puzzle_identifiers: torch.Tensor):
- # Token embedding
- embedding = self.embed_tokens(input.to(torch.int32))
- # Puzzle embeddings
- if self.config.puzzle_emb_ndim > 0:
- puzzle_embedding = self.puzzle_emb(puzzle_identifiers)
-
- pad_count = self.puzzle_emb_len * self.config.hidden_size - puzzle_embedding.shape[-1]
- if pad_count > 0:
- puzzle_embedding = F.pad(puzzle_embedding, (0, pad_count))
- embedding = torch.cat((puzzle_embedding.view(-1, self.puzzle_emb_len, self.config.hidden_size), embedding), dim=-2)
- # Position embeddings
- if self.config.pos_encodings == "learned":
- # scale by 1/sqrt(2) to maintain forward variance
- embedding = 0.707106781 * (embedding + self.embed_pos.embedding_weight.to(self.forward_dtype))
- # Scale
- return self.embed_scale * embedding
- def empty_carry(self, batch_size: int):
- return HierarchicalReasoningModel_ACTV1InnerCarry(
- z_H=torch.empty(batch_size, self.config.seq_len + self.puzzle_emb_len, self.config.hidden_size, dtype=self.forward_dtype),
- z_L=torch.empty(batch_size, self.config.seq_len + self.puzzle_emb_len, self.config.hidden_size, dtype=self.forward_dtype),
- )
-
- def reset_carry(self, reset_flag: torch.Tensor, carry: HierarchicalReasoningModel_ACTV1InnerCarry):
- return HierarchicalReasoningModel_ACTV1InnerCarry(
- z_H=torch.where(reset_flag.view(-1, 1, 1), self.H_init, carry.z_H),
- z_L=torch.where(reset_flag.view(-1, 1, 1), self.L_init, carry.z_L),
- )
- def forward(self, carry: HierarchicalReasoningModel_ACTV1InnerCarry, batch: Dict[str, torch.Tensor]) -> Tuple[HierarchicalReasoningModel_ACTV1InnerCarry, torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]:
- seq_info = dict(
- cos_sin=self.rotary_emb() if hasattr(self, "rotary_emb") else None,
- )
- # Input encoding
- input_embeddings = self._input_embeddings(batch["inputs"], batch["puzzle_identifiers"])
- # Forward iterations
- with torch.no_grad():
- z_H, z_L = carry.z_H, carry.z_L
- for _H_step in range(self.config.H_cycles):
- for _L_step in range(self.config.L_cycles):
- if not ((_H_step == self.config.H_cycles - 1) and (_L_step == self.config.L_cycles - 1)):
- z_L = self.L_level(z_L, z_H + input_embeddings, **seq_info)
- if not (_H_step == self.config.H_cycles - 1):
- z_H = self.H_level(z_H, z_L, **seq_info)
- assert not z_H.requires_grad and not z_L.requires_grad
- # 1-step grad
- z_L = self.L_level(z_L, z_H + input_embeddings, **seq_info)
- z_H = self.H_level(z_H, z_L, **seq_info)
- # LM Outputs
- new_carry = HierarchicalReasoningModel_ACTV1InnerCarry(z_H=z_H.detach(), z_L=z_L.detach()) # New carry no grad
- output = self.lm_head(z_H)[:, self.puzzle_emb_len:]
- # Q head
- q_logits = self.q_head(z_H[:, 0]).to(torch.float32)
-
- return new_carry, output, (q_logits[..., 0], q_logits[..., 1])
- class HierarchicalReasoningModel_ACTV1(nn.Module):
- """ACT wrapper."""
- def __init__(self, config_dict: dict):
- super().__init__()
- self.config = HierarchicalReasoningModel_ACTV1Config(**config_dict)
- self.inner = HierarchicalReasoningModel_ACTV1_Inner(self.config)
- @property
- def puzzle_emb(self):
- return self.inner.puzzle_emb
- def initial_carry(self, batch: Dict[str, torch.Tensor]):
- batch_size = batch["inputs"].shape[0]
- return HierarchicalReasoningModel_ACTV1Carry(
- inner_carry=self.inner.empty_carry(batch_size), # Empty is expected, it will be reseted in first pass as all sequences are halted.
-
- steps=torch.zeros((batch_size, ), dtype=torch.int32),
- halted=torch.ones((batch_size, ), dtype=torch.bool), # Default to halted
-
- current_data={k: torch.empty_like(v) for k, v in batch.items()}
- )
-
- def forward(self, carry: HierarchicalReasoningModel_ACTV1Carry, batch: Dict[str, torch.Tensor]) -> Tuple[HierarchicalReasoningModel_ACTV1Carry, Dict[str, torch.Tensor]]:
- # Update data, carry (removing halted sequences)
- new_inner_carry = self.inner.reset_carry(carry.halted, carry.inner_carry)
-
- new_steps = torch.where(carry.halted, 0, carry.steps)
- new_current_data = {k: torch.where(carry.halted.view((-1, ) + (1, ) * (batch[k].ndim - 1)), batch[k], v) for k, v in carry.current_data.items()}
- # Forward inner model
- new_inner_carry, logits, (q_halt_logits, q_continue_logits) = self.inner(new_inner_carry, new_current_data)
- outputs = {
- "logits": logits,
- "q_halt_logits": q_halt_logits,
- "q_continue_logits": q_continue_logits
- }
-
- with torch.no_grad():
- # Step
- new_steps = new_steps + 1
- is_last_step = new_steps >= self.config.halt_max_steps
-
- halted = is_last_step
- # if training, and ACT is enabled
- if self.training and (self.config.halt_max_steps > 1):
- # Halt signal
- # NOTE: During evaluation, always use max steps, this is to guarantee the same halting steps inside a batch for batching purposes
- halted = halted | (q_halt_logits > q_continue_logits)
- # Exploration
- min_halt_steps = (torch.rand_like(q_halt_logits) < self.config.halt_exploration_prob) * torch.randint_like(new_steps, low=2, high=self.config.halt_max_steps + 1)
- halted = halted & (new_steps >= min_halt_steps)
- # Compute target Q
- # NOTE: No replay buffer and target networks for computing target Q-value.
- # As batch_size is large, there're many parallel envs.
- # Similar concept as PQN https://arxiv.org/abs/2407.04811
- next_q_halt_logits, next_q_continue_logits = self.inner(new_inner_carry, new_current_data)[-1]
-
- outputs["target_q_continue"] = torch.sigmoid(torch.where(is_last_step, next_q_halt_logits, torch.maximum(next_q_halt_logits, next_q_continue_logits)))
- return HierarchicalReasoningModel_ACTV1Carry(new_inner_carry, new_steps, halted, new_current_data), outputs
|