Inhalt
Aktueller Ordner:
ARS_ExplainableAIARS5_PyTorch.py
"""
ARS 5.0 - PyTorch Implementation
Empirical Grammar of Market Conversations
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from collections import defaultdict
from typing import List, Tuple, Dict, Optional
# ============================================================================
# 1. SYMBOLIC COMPONENT: Grammar Definition
# ============================================================================
class ARSGrammar:
"""Symbolic grammar component - the structural knowledge."""
# Terminal symbols
SYMBOLS = ['KBG', 'VBG', 'KBBd', 'VBBd', 'KBA', 'VBA',
'KAE', 'VAE', 'KAA', 'VAA', 'KAV', 'VAV']
SYMBOL_TO_IDX = {s: i for i, s in enumerate(SYMBOLS)}
IDX_TO_SYMBOL = {i: s for i, s in enumerate(SYMBOLS)}
def __init__(self):
# Initial transition probabilities (will be learned)
# Start with uniform or empirical estimates
self.transition_probs = torch.ones(len(self.SYMBOLS), len(self.SYMBOLS)) / len(self.SYMBOLS)
# Constitutive rules (hard constraints)
self.constitutive_rules = self._init_constitutive_rules()
def _init_constitutive_rules(self) -> Dict[Tuple[int, int], bool]:
"""Initialize hard constraints that cannot be violated."""
rules = {}
# A greeting must be reciprocated (unless skipped)
rules[(self.SYMBOL_TO_IDX['KBG'], self.SYMBOL_TO_IDX['VBG'])] = True
rules[(self.SYMBOL_TO_IDX['VBG'], self.SYMBOL_TO_IDX['KBBd'])] = True
# Customer inquiry must be answered
rules[(self.SYMBOL_TO_IDX['KAE'], self.SYMBOL_TO_IDX['VAE'])] = True
# Farewells are reciprocal
rules[(self.SYMBOL_TO_IDX['KAV'], self.SYMBOL_TO_IDX['VAV'])] = True
rules[(self.SYMBOL_TO_IDX['VAV'], self.SYMBOL_TO_IDX['KAV'])] = True
return rules
def is_valid_transition(self, from_idx: int, to_idx: int) -> bool:
"""Check if a transition violates a constitutive rule."""
if (from_idx, to_idx) in self.constitutive_rules:
return self.constitutive_rules[(from_idx, to_idx)]
return True
def update_probabilities(self, counts: torch.Tensor):
"""Update symbolic probabilities based on observed counts."""
# Renormalize each row
row_sums = counts.sum(dim=1, keepdim=True)
self.transition_probs = counts / (row_sums + 1e-10)
# Add small epsilon for unseen transitions
self.transition_probs = (self.transition_probs + 1e-6) / (1e-6 * len(self.SYMBOLS) + 1)
def get_prob(self, from_idx: int, to_idx: int) -> float:
"""Get symbolic probability of a transition."""
return self.transition_probs[from_idx, to_idx].item()
# ============================================================================
# 2. NEURAL COMPONENT: Transition Network
# ============================================================================
class ARSNeuralTransitionNetwork(nn.Module):
"""
Neural network for learning transition probabilities.
System 1: Fast, pattern-based, sub-symbolic.
"""
def __init__(self, n_symbols: int = 12, hidden_dim: int = 64):
super().__init__()
self.n_symbols = n_symbols
# Architecture
self.fc1 = nn.Linear(n_symbols, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim // 2)
self.fc3 = nn.Linear(hidden_dim // 2, n_symbols)
self.dropout = nn.Dropout(0.2)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass.
Args:
x: One-hot encoded current symbol (batch_size, n_symbols)
Returns:
Probability distribution over next symbols (batch_size, n_symbols)
"""
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = F.relu(self.fc2(x))
x = self.dropout(x)
x = self.fc3(x)
return F.softmax(x, dim=1)
def predict_next(self, symbol_idx: int) -> np.ndarray:
"""Predict probability distribution for next symbol."""
x = torch.zeros(1, self.n_symbols)
x[0, symbol_idx] = 1.0
with torch.no_grad():
probs = self.forward(x)
return probs.numpy()[0]
# ============================================================================
# 3. HYBRID NEURO-SYMBOLIC SYSTEM
# ============================================================================
class ARSNeuroSymbolicSystem:
"""
ARS 5.0: Dual-dynamics architecture.
- Neural component: learns probabilities from data (statistical plasticity)
- Symbolic component: maintains structural rules (structural stability)
"""
def __init__(self, learning_rate: float = 0.001):
self.neural_network = ARSNeuralTransitionNetwork()
self.symbolic_grammar = ARSGrammar()
self.optimizer = optim.Adam(self.neural_network.parameters(), lr=learning_rate)
# Training tracking
self.counts = torch.zeros(len(ARSGrammar.SYMBOLS), len(ARSGrammar.SYMBOLS))
self.loss_history = []
def train_on_transition(self, from_sym: str, to_sym: str):
"""
Train on a single observed transition.
This implements the dual update loop:
1. Update symbolic counts
2. Update neural network via backpropagation
3. Enforce constitutive rules
"""
from_idx = ARSGrammar.SYMBOL_TO_IDX[from_sym]
to_idx = ARSGrammar.SYMBOL_TO_IDX[to_sym]
# ===== Symbolic update (fast, counting-based) =====
if self.symbolic_grammar.is_valid_transition(from_idx, to_idx):
self.counts[from_idx, to_idx] += 1
self.symbolic_grammar.update_probabilities(self.counts)
# ===== Neural update (slow, gradient-based) =====
# Prepare input
x = torch.zeros(1, len(ARSGrammar.SYMBOLS))
x[0, from_idx] = 1.0
# Target distribution (from symbolic component as teacher)
target = self.symbolic_grammar.transition_probs[from_idx].clone()
# Forward pass
self.optimizer.zero_grad()
output = self.neural_network(x)
# Loss: KL divergence between neural prediction and symbolic probabilities
# This aligns the neural network with the symbolic component
loss = F.kl_div(output.log(), target.unsqueeze(0), reduction='batchmean')
loss.backward()
self.optimizer.step()
self.loss_history.append(loss.item())
return loss.item()
def train_on_corpus(self, corpus: List[List[str]], epochs: int = 10):
"""Train on the entire corpus (8 transcripts)."""
print(f"Training on {len(corpus)} transcripts for {epochs} epochs...")
for epoch in range(epochs):
epoch_loss = 0.0
n_transitions = 0
for chain in corpus:
for i in range(len(chain) - 1):
loss = self.train_on_transition(chain[i], chain[i + 1])
epoch_loss += loss
n_transitions += 1
avg_loss = epoch_loss / n_transitions
print(f"Epoch {epoch + 1}/{epochs}, Avg Loss: {avg_loss:.6f}")
def predict_next(self, from_sym: str) -> Dict[str, float]:
"""Predict next symbol distribution."""
from_idx = ARSGrammar.SYMBOL_TO_IDX[from_sym]
# Neural prediction
neural_probs = self.neural_network.predict_next(from_idx)
# Symbolic probability
symbolic_probs = self.symbolic_grammar.transition_probs[from_idx].numpy()
# Combined prediction (weighted average)
combined = 0.5 * neural_probs + 0.5 * symbolic_probs
return {ARSGrammar.IDX_TO_SYMBOL[i]: combined[i] for i in range(len(ARSGrammar.SYMBOLS))}
def generate_sequence(self, max_len: int = 20, start_sym: str = 'KBG') -> List[str]:
"""Generate a well-formed sequence."""
sequence = [start_sym]
for _ in range(max_len - 1):
probs = self.predict_next(sequence[-1])
# Filter invalid transitions (constitutive rules)
valid_symbols = []
valid_probs = []
for sym, prob in probs.items():
to_idx = ARSGrammar.SYMBOL_TO_IDX[sym]
from_idx = ARSGrammar.SYMBOL_TO_IDX[sequence[-1]]
if self.symbolic_grammar.is_valid_transition(from_idx, to_idx):
valid_symbols.append(sym)
valid_probs.append(prob)
if not valid_symbols:
break
# Normalize and sample
valid_probs = np.array(valid_probs) / np.sum(valid_probs)
next_sym = np.random.choice(valid_symbols, p=valid_probs)
sequence.append(next_sym)
if next_sym in ['KAV', 'VAV']: # End of conversation
break
return sequence
def explain_transition(self, from_sym: str, to_sym: str) -> Dict:
"""Explain why a transition is valid/invalid."""
from_idx = ARSGrammar.SYMBOL_TO_IDX[from_sym]
to_idx = ARSGrammar.SYMBOL_TO_IDX[to_sym]
return {
'transition': f"{from_sym} β {to_sym}",
'valid_by_constitutive_rule': self.symbolic_grammar.is_valid_transition(from_idx, to_idx),
'neural_probability': self.neural_network.predict_next(from_idx)[to_idx],
'symbolic_probability': self.symbolic_grammar.get_prob(from_idx, to_idx),
'count': self.counts[from_idx, to_idx].item(),
'explanation': self._generate_explanation(from_sym, to_sym)
}
def _generate_explanation(self, from_sym: str, to_sym: str) -> str:
"""Generate human-readable explanation."""
explanations = {
('KBG', 'VBG'): "Customer greeting is normally followed by seller greeting.",
('KBG', 'VBBd'): "Customer greeting can be followed directly by seller inquiry (skip).",
('VBA', 'KBBd'): "Seller reaction leads to additional customer need (upselling).",
('KAE', 'VAE'): "Customer inquiry must be answered by seller information (constitutive).",
('KAV', 'VAV'): "Farewells are always reciprocated (constitutive).",
('KAA', 'VBG'): "Conversation can restart after completion (new customer)."
}
return explanations.get((from_sym, to_sym), "Standard transition in sales conversation.")
# ============================================================================
# 4. EMPIRICAL CORPUS DATA
# ============================================================================
# The eight transcripts as terminal symbol chains
EMPIRICAL_CHAINS = [
# T1: Butcher shop
['KBG', 'VBG', 'KBBd', 'VBBd', 'KBA', 'VBA', 'KBBd', 'VBBd', 'KBA', 'VAA', 'KAA', 'VAV', 'KAV'],
# T2: Cherry stall
['VBG', 'KBBd', 'VBBd', 'VAA', 'KAA', 'VBG', 'KBBd', 'VAA', 'KAA'],
# T3: Fish stall
['KBBd', 'VBBd', 'VAA', 'KAA'],
# T4: Vegetable stall
['KBBd', 'VBBd', 'KBA', 'VBA', 'KBBd', 'VBA', 'KAE', 'VAE', 'KAA', 'VAV', 'KAV'],
# T5: Vegetable stall (new customer)
['KAV', 'KBBd', 'VBBd', 'KBBd', 'VAA', 'KAV'],
# T6: Cheese stall
['KBG', 'VBG', 'KBBd', 'VBBd', 'KAA'],
# T7: Candy stall
['KBBd', 'VBBd', 'KBA', 'VAA', 'KAA'],
# T8: Bakery
['KBG', 'VBBd', 'KBBd', 'VBA', 'VAA', 'KAA', 'VAV', 'KAV']
]
# ============================================================================
# 5. DEMONSTRATION
# ============================================================================
def main():
print("=" * 70)
print("ARS 5.0 - PyTorch Implementation")
print("The Empirical Grammar of Market Conversations")
print("=" * 70)
# Create the neuro-symbolic system
system = ARSNeuroSymbolicSystem()
# Train on the corpus
print("\n--- Training ---")
system.train_on_corpus(EMPIRICAL_CHAINS, epochs=20)
# Show learned transition probabilities
print("\n--- Learned Transition Probabilities (sample) ---")
for from_sym in ['KBG', 'KBBd', 'VBA', 'KAA']:
probs = system.predict_next(from_sym)
top_probs = sorted(probs.items(), key=lambda x: x[1], reverse=True)[:3]
print(f"\n{from_sym} β {', '.join([f'{s}: {p:.3f}' for s, p in top_probs])}")
# Generate example sequences
print("\n--- Generated Sequences ---")
for i in range(5):
seq = system.generate_sequence(max_len=15)
print(f"Seq {i+1}: {' β '.join(seq)}")
# Explanation examples
print("\n--- Explanations ---")
test_transitions = [('KBG', 'VBG'), ('KBG', 'VBBd'), ('VBA', 'KBBd'), ('KAE', 'VAE')]
for from_sym, to_sym in test_transitions:
explanation = system.explain_transition(from_sym, to_sym)
print(f"\n{explanation['transition']}")
print(f" {explanation['explanation']}")
print(f" Probability: {explanation['neural_probability']:.3f}")
return system
if __name__ == "__main__":
system = main()