Inhalt

Aktueller Ordner: ARS_ExplainableAI
β¬… Übergeordnet

ARS5_PyTorch.py

"""
ARS 5.0 - PyTorch Implementation
Empirical Grammar of Market Conversations
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from collections import defaultdict
from typing import List, Tuple, Dict, Optional

# ============================================================================
# 1. SYMBOLIC COMPONENT: Grammar Definition
# ============================================================================

class ARSGrammar:
    """Symbolic grammar component - the structural knowledge."""
    
    # Terminal symbols
    SYMBOLS = ['KBG', 'VBG', 'KBBd', 'VBBd', 'KBA', 'VBA', 
               'KAE', 'VAE', 'KAA', 'VAA', 'KAV', 'VAV']
    
    SYMBOL_TO_IDX = {s: i for i, s in enumerate(SYMBOLS)}
    IDX_TO_SYMBOL = {i: s for i, s in enumerate(SYMBOLS)}
    
    def __init__(self):
        # Initial transition probabilities (will be learned)
        # Start with uniform or empirical estimates
        self.transition_probs = torch.ones(len(self.SYMBOLS), len(self.SYMBOLS)) / len(self.SYMBOLS)
        
        # Constitutive rules (hard constraints)
        self.constitutive_rules = self._init_constitutive_rules()
        
    def _init_constitutive_rules(self) -> Dict[Tuple[int, int], bool]:
        """Initialize hard constraints that cannot be violated."""
        rules = {}
        
        # A greeting must be reciprocated (unless skipped)
        rules[(self.SYMBOL_TO_IDX['KBG'], self.SYMBOL_TO_IDX['VBG'])] = True
        rules[(self.SYMBOL_TO_IDX['VBG'], self.SYMBOL_TO_IDX['KBBd'])] = True
        
        # Customer inquiry must be answered
        rules[(self.SYMBOL_TO_IDX['KAE'], self.SYMBOL_TO_IDX['VAE'])] = True
        
        # Farewells are reciprocal
        rules[(self.SYMBOL_TO_IDX['KAV'], self.SYMBOL_TO_IDX['VAV'])] = True
        rules[(self.SYMBOL_TO_IDX['VAV'], self.SYMBOL_TO_IDX['KAV'])] = True
        
        return rules
    
    def is_valid_transition(self, from_idx: int, to_idx: int) -> bool:
        """Check if a transition violates a constitutive rule."""
        if (from_idx, to_idx) in self.constitutive_rules:
            return self.constitutive_rules[(from_idx, to_idx)]
        return True
    
    def update_probabilities(self, counts: torch.Tensor):
        """Update symbolic probabilities based on observed counts."""
        # Renormalize each row
        row_sums = counts.sum(dim=1, keepdim=True)
        self.transition_probs = counts / (row_sums + 1e-10)
        # Add small epsilon for unseen transitions
        self.transition_probs = (self.transition_probs + 1e-6) / (1e-6 * len(self.SYMBOLS) + 1)
    
    def get_prob(self, from_idx: int, to_idx: int) -> float:
        """Get symbolic probability of a transition."""
        return self.transition_probs[from_idx, to_idx].item()


# ============================================================================
# 2. NEURAL COMPONENT: Transition Network
# ============================================================================

class ARSNeuralTransitionNetwork(nn.Module):
    """
    Neural network for learning transition probabilities.
    System 1: Fast, pattern-based, sub-symbolic.
    """
    
    def __init__(self, n_symbols: int = 12, hidden_dim: int = 64):
        super().__init__()
        self.n_symbols = n_symbols
        
        # Architecture
        self.fc1 = nn.Linear(n_symbols, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim // 2)
        self.fc3 = nn.Linear(hidden_dim // 2, n_symbols)
        
        self.dropout = nn.Dropout(0.2)
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass.
        
        Args:
            x: One-hot encoded current symbol (batch_size, n_symbols)
            
        Returns:
            Probability distribution over next symbols (batch_size, n_symbols)
        """
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return F.softmax(x, dim=1)
    
    def predict_next(self, symbol_idx: int) -> np.ndarray:
        """Predict probability distribution for next symbol."""
        x = torch.zeros(1, self.n_symbols)
        x[0, symbol_idx] = 1.0
        with torch.no_grad():
            probs = self.forward(x)
        return probs.numpy()[0]


# ============================================================================
# 3. HYBRID NEURO-SYMBOLIC SYSTEM
# ============================================================================

class ARSNeuroSymbolicSystem:
    """
    ARS 5.0: Dual-dynamics architecture.
    - Neural component: learns probabilities from data (statistical plasticity)
    - Symbolic component: maintains structural rules (structural stability)
    """
    
    def __init__(self, learning_rate: float = 0.001):
        self.neural_network = ARSNeuralTransitionNetwork()
        self.symbolic_grammar = ARSGrammar()
        self.optimizer = optim.Adam(self.neural_network.parameters(), lr=learning_rate)
        
        # Training tracking
        self.counts = torch.zeros(len(ARSGrammar.SYMBOLS), len(ARSGrammar.SYMBOLS))
        self.loss_history = []
        
    def train_on_transition(self, from_sym: str, to_sym: str):
        """
        Train on a single observed transition.
        
        This implements the dual update loop:
        1. Update symbolic counts
        2. Update neural network via backpropagation
        3. Enforce constitutive rules
        """
        from_idx = ARSGrammar.SYMBOL_TO_IDX[from_sym]
        to_idx = ARSGrammar.SYMBOL_TO_IDX[to_sym]
        
        # ===== Symbolic update (fast, counting-based) =====
        if self.symbolic_grammar.is_valid_transition(from_idx, to_idx):
            self.counts[from_idx, to_idx] += 1
            self.symbolic_grammar.update_probabilities(self.counts)
        
        # ===== Neural update (slow, gradient-based) =====
        # Prepare input
        x = torch.zeros(1, len(ARSGrammar.SYMBOLS))
        x[0, from_idx] = 1.0
        
        # Target distribution (from symbolic component as teacher)
        target = self.symbolic_grammar.transition_probs[from_idx].clone()
        
        # Forward pass
        self.optimizer.zero_grad()
        output = self.neural_network(x)
        
        # Loss: KL divergence between neural prediction and symbolic probabilities
        # This aligns the neural network with the symbolic component
        loss = F.kl_div(output.log(), target.unsqueeze(0), reduction='batchmean')
        loss.backward()
        self.optimizer.step()
        
        self.loss_history.append(loss.item())
        
        return loss.item()
    
    def train_on_corpus(self, corpus: List[List[str]], epochs: int = 10):
        """Train on the entire corpus (8 transcripts)."""
        print(f"Training on {len(corpus)} transcripts for {epochs} epochs...")
        
        for epoch in range(epochs):
            epoch_loss = 0.0
            n_transitions = 0
            
            for chain in corpus:
                for i in range(len(chain) - 1):
                    loss = self.train_on_transition(chain[i], chain[i + 1])
                    epoch_loss += loss
                    n_transitions += 1
            
            avg_loss = epoch_loss / n_transitions
            print(f"Epoch {epoch + 1}/{epochs}, Avg Loss: {avg_loss:.6f}")
    
    def predict_next(self, from_sym: str) -> Dict[str, float]:
        """Predict next symbol distribution."""
        from_idx = ARSGrammar.SYMBOL_TO_IDX[from_sym]
        
        # Neural prediction
        neural_probs = self.neural_network.predict_next(from_idx)
        
        # Symbolic probability
        symbolic_probs = self.symbolic_grammar.transition_probs[from_idx].numpy()
        
        # Combined prediction (weighted average)
        combined = 0.5 * neural_probs + 0.5 * symbolic_probs
        
        return {ARSGrammar.IDX_TO_SYMBOL[i]: combined[i] for i in range(len(ARSGrammar.SYMBOLS))}
    
    def generate_sequence(self, max_len: int = 20, start_sym: str = 'KBG') -> List[str]:
        """Generate a well-formed sequence."""
        sequence = [start_sym]
        
        for _ in range(max_len - 1):
            probs = self.predict_next(sequence[-1])
            # Filter invalid transitions (constitutive rules)
            valid_symbols = []
            valid_probs = []
            for sym, prob in probs.items():
                to_idx = ARSGrammar.SYMBOL_TO_IDX[sym]
                from_idx = ARSGrammar.SYMBOL_TO_IDX[sequence[-1]]
                if self.symbolic_grammar.is_valid_transition(from_idx, to_idx):
                    valid_symbols.append(sym)
                    valid_probs.append(prob)
            
            if not valid_symbols:
                break
            
            # Normalize and sample
            valid_probs = np.array(valid_probs) / np.sum(valid_probs)
            next_sym = np.random.choice(valid_symbols, p=valid_probs)
            sequence.append(next_sym)
            
            if next_sym in ['KAV', 'VAV']:  # End of conversation
                break
        
        return sequence
    
    def explain_transition(self, from_sym: str, to_sym: str) -> Dict:
        """Explain why a transition is valid/invalid."""
        from_idx = ARSGrammar.SYMBOL_TO_IDX[from_sym]
        to_idx = ARSGrammar.SYMBOL_TO_IDX[to_sym]
        
        return {
            'transition': f"{from_sym} β†’ {to_sym}",
            'valid_by_constitutive_rule': self.symbolic_grammar.is_valid_transition(from_idx, to_idx),
            'neural_probability': self.neural_network.predict_next(from_idx)[to_idx],
            'symbolic_probability': self.symbolic_grammar.get_prob(from_idx, to_idx),
            'count': self.counts[from_idx, to_idx].item(),
            'explanation': self._generate_explanation(from_sym, to_sym)
        }
    
    def _generate_explanation(self, from_sym: str, to_sym: str) -> str:
        """Generate human-readable explanation."""
        explanations = {
            ('KBG', 'VBG'): "Customer greeting is normally followed by seller greeting.",
            ('KBG', 'VBBd'): "Customer greeting can be followed directly by seller inquiry (skip).",
            ('VBA', 'KBBd'): "Seller reaction leads to additional customer need (upselling).",
            ('KAE', 'VAE'): "Customer inquiry must be answered by seller information (constitutive).",
            ('KAV', 'VAV'): "Farewells are always reciprocated (constitutive).",
            ('KAA', 'VBG'): "Conversation can restart after completion (new customer)."
        }
        return explanations.get((from_sym, to_sym), "Standard transition in sales conversation.")


# ============================================================================
# 4. EMPIRICAL CORPUS DATA
# ============================================================================

# The eight transcripts as terminal symbol chains
EMPIRICAL_CHAINS = [
    # T1: Butcher shop
    ['KBG', 'VBG', 'KBBd', 'VBBd', 'KBA', 'VBA', 'KBBd', 'VBBd', 'KBA', 'VAA', 'KAA', 'VAV', 'KAV'],
    # T2: Cherry stall
    ['VBG', 'KBBd', 'VBBd', 'VAA', 'KAA', 'VBG', 'KBBd', 'VAA', 'KAA'],
    # T3: Fish stall
    ['KBBd', 'VBBd', 'VAA', 'KAA'],
    # T4: Vegetable stall
    ['KBBd', 'VBBd', 'KBA', 'VBA', 'KBBd', 'VBA', 'KAE', 'VAE', 'KAA', 'VAV', 'KAV'],
    # T5: Vegetable stall (new customer)
    ['KAV', 'KBBd', 'VBBd', 'KBBd', 'VAA', 'KAV'],
    # T6: Cheese stall
    ['KBG', 'VBG', 'KBBd', 'VBBd', 'KAA'],
    # T7: Candy stall
    ['KBBd', 'VBBd', 'KBA', 'VAA', 'KAA'],
    # T8: Bakery
    ['KBG', 'VBBd', 'KBBd', 'VBA', 'VAA', 'KAA', 'VAV', 'KAV']
]


# ============================================================================
# 5. DEMONSTRATION
# ============================================================================

def main():
    print("=" * 70)
    print("ARS 5.0 - PyTorch Implementation")
    print("The Empirical Grammar of Market Conversations")
    print("=" * 70)
    
    # Create the neuro-symbolic system
    system = ARSNeuroSymbolicSystem()
    
    # Train on the corpus
    print("\n--- Training ---")
    system.train_on_corpus(EMPIRICAL_CHAINS, epochs=20)
    
    # Show learned transition probabilities
    print("\n--- Learned Transition Probabilities (sample) ---")
    for from_sym in ['KBG', 'KBBd', 'VBA', 'KAA']:
        probs = system.predict_next(from_sym)
        top_probs = sorted(probs.items(), key=lambda x: x[1], reverse=True)[:3]
        print(f"\n{from_sym} β†’ {', '.join([f'{s}: {p:.3f}' for s, p in top_probs])}")
    
    # Generate example sequences
    print("\n--- Generated Sequences ---")
    for i in range(5):
        seq = system.generate_sequence(max_len=15)
        print(f"Seq {i+1}: {' β†’ '.join(seq)}")
    
    # Explanation examples
    print("\n--- Explanations ---")
    test_transitions = [('KBG', 'VBG'), ('KBG', 'VBBd'), ('VBA', 'KBBd'), ('KAE', 'VAE')]
    for from_sym, to_sym in test_transitions:
        explanation = system.explain_transition(from_sym, to_sym)
        print(f"\n{explanation['transition']}")
        print(f"  {explanation['explanation']}")
        print(f"  Probability: {explanation['neural_probability']:.3f}")
    
    return system


if __name__ == "__main__":
    system = main()