Inhalt

Aktueller Ordner: algorithmisch-rekursive-sequenzanalyse-3.0
⬅ Übergeordnet

ARS_XAI_PCFG.py

"""
Algorithmisch Rekursive Sequenzanalyse 3.0
GRAMMATIKINDUKTION DURCH HIERARCHISCHE KOMPRESSION
Bildung von Nonterminalzeichen durch Erkennung von Wiederholungen
"""

import numpy as np
from scipy.stats import pearsonr
import matplotlib.pyplot as plt
from tabulate import tabulate
from collections import Counter, defaultdict
import itertools

# ============================================================================
# 1. EMPIRISCHE DATEN: Terminalzeichenketten aus acht Transkripten
# ============================================================================

empirical_chains = [
    # Transkript 1: Metzgerei
    ['KBG', 'VBG', 'KBBd', 'VBBd', 'KBA', 'VBA', 'KBBd', 'VBBd', 'KBA', 'VAA', 'KAA', 'VAV', 'KAV'],
    # Transkript 2: Marktplatz (Kirschen)
    ['VBG', 'KBBd', 'VBBd', 'VAA', 'KAA', 'VBG', 'KBBd', 'VAA', 'KAA'],
    # Transkript 3: Fischstand
    ['KBBd', 'VBBd', 'VAA', 'KAA'],
    # Transkript 4: Gemüsestand (ausfuehrlich)
    ['KBBd', 'VBBd', 'KBA', 'VBA', 'KBBd', 'VBA', 'KAE', 'VAE', 'KAA', 'VAV', 'KAV'],
    # Transkript 5: Gemüsestand (mit KAV zu Beginn)
    ['KAV', 'KBBd', 'VBBd', 'KBBd', 'VAA', 'KAV'],
    # Transkript 6: Käseverkaufsstand
    ['KBG', 'VBG', 'KBBd', 'VBBd', 'KAA'],
    # Transkript 7: Bonbonstand
    ['KBBd', 'VBBd', 'KBA', 'VAA', 'KAA'],
    # Transkript 8: Baeckerei
    ['KBG', 'VBBd', 'KBBd', 'VBA', 'VAA', 'KAA', 'VAV', 'KAV']
]

# ============================================================================
# 2. HIERARCHISCHE GRAMMATIKINDUKTION DURCH KOMPRESSION
# ============================================================================

class GrammarInducer:
    """
    Induziert eine PCFG durch hierarchische Kompression von Wiederholungen.
    Analog zur Bildung neuer Variablen bei Termumformungen.
    """
    
    def __init__(self):
        self.rules = {}  # Nonterminal -> Liste von Produktionen mit Wahrscheinlichkeiten
        self.terminals = set()
        self.nonterminals = set()
        self.start_symbol = None
        self.compression_history = []  # Speichert die Hierarchie der Kompression
        
    def find_best_repetition(self, chains, min_length=2, max_length=5):
        """
        Findet die häufigste wiederholte Sequenz in allen Ketten.
        Berücksichtigt Überlappungen und zählt Vorkommen.
        """
        sequence_counter = Counter()
        
        for chain in chains:
            for length in range(min_length, min(max_length, len(chain) + 1)):
                for i in range(len(chain) - length + 1):
                    seq = tuple(chain[i:i+length])
                    sequence_counter[seq] += 1
        
        # Filtere Sequenzen, die mehrmals vorkommen
        repeated = {seq: count for seq, count in sequence_counter.items() 
                   if count >= 2}
        
        if not repeated:
            return None
        
        # Bewerte Sequenzen: (Häufigkeit * Länge) / (Anzahl einzigartiger Symbole)
        # Bevorzugt längere, häufige Sequenzen mit weniger einzigartigen Symbolen
        best_seq = max(repeated.items(), 
                      key=lambda x: x[1] * len(x[0]) / max(1, len(set(x[0]))))
        
        return best_seq[0]
    
    def compress_sequences(self, chains, sequence, new_nonterminal):
        """
        Ersetzt alle Vorkommen der Sequenz durch das neue Nonterminal.
        """
        compressed_chains = []
        seq_tuple = tuple(sequence)
        seq_len = len(sequence)
        
        for chain in chains:
            new_chain = []
            i = 0
            while i < len(chain):
                if i <= len(chain) - seq_len and tuple(chain[i:i+seq_len]) == seq_tuple:
                    new_chain.append(new_nonterminal)
                    i += seq_len
                else:
                    new_chain.append(chain[i])
                    i += 1
            compressed_chains.append(new_chain)
        
        return compressed_chains
    
    def generate_nonterminal_name(self, base_symbols):
        """
        Generiert einen aussagekräftigen Namen für ein neues Nonterminal.
        """
        # Extrahiere semantische Informationen aus den Symbolen
        prefixes = [s[:2] if isinstance(s, str) else str(s)[:2] for s in base_symbols]
        suffixes = [s[2:] if isinstance(s, str) and len(s) > 2 else '' for s in base_symbols]
        
        # Wenn alle Symbole mit K oder V beginnen, behalte das Muster
        if all(p in ['KB', 'VB', 'KA', 'VA', 'NT'] or p.startswith('NT') for p in prefixes):
            # Behalte das erste und letzte Symbol für die Benennung
            first = base_symbols[0]
            last = base_symbols[-1]
            # Entferne NT_ Präfix für Lesbarkeit, falls vorhanden
            first = first.replace('NT_', '') if isinstance(first, str) else str(first)
            last = last.replace('NT_', '') if isinstance(last, str) else str(last)
            return f"NT_{first}_{last}"
        
        # Generischer Name basierend auf den Symbolen
        symbol_str = "_".join(str(s).replace('NT_', '') for s in base_symbols)
        return f"NT_{symbol_str}"
    
    def induce_grammar(self, chains, max_iterations=20):
        """
        Hauptmethode: Induziert eine PCFG durch iterative Kompression.
        Führt so lange fort, bis keine Wiederholungen mehr gefunden werden.
        """
        current_chains = [list(chain) for chain in chains]
        iteration = 0
        rule_counter = 1
        
        print("\n" + "=" * 70)
        print("HIERARCHISCHE GRAMMATIKINDUKTION")
        print("=" * 70)
        
        while iteration < max_iterations:
            # Finde die beste Wiederholung
            best_seq = self.find_best_repetition(current_chains)
            
            if best_seq is None:
                print(f"\nKeine weiteren Wiederholungen gefunden nach {iteration} Iterationen.")
                break
            
            # Generiere Namen für neues Nonterminal
            new_nonterminal = self.generate_nonterminal_name(best_seq)
            
            # Stelle sicher, dass der Name einzigartig ist
            while new_nonterminal in self.nonterminals:
                new_nonterminal = f"{new_nonterminal}_{rule_counter}"
                rule_counter += 1
            
            print(f"\nIteration {iteration + 1}:")
            print(f"  Gefundene Wiederholung: {' -> '.join(best_seq)}")
            print(f"  Neues Nonterminal: {new_nonterminal}")
            
            # Speichere die Produktionsregel (als Liste, nicht als Tupel)
            self.rules[new_nonterminal] = [(list(best_seq), 1.0)]  # Temporär mit Wahrscheinlichkeit 1
            self.nonterminals.add(new_nonterminal)
            
            # Zähle, wie oft die Regel in den aktuellen Ketten vorkommt
            occurrence_count = 0
            for chain in current_chains:
                for i in range(len(chain) - len(best_seq) + 1):
                    if tuple(chain[i:i+len(best_seq)]) == best_seq:
                        occurrence_count += 1
            
            self.compression_history.append({
                'iteration': iteration,
                'sequence': best_seq,
                'new_symbol': new_nonterminal,
                'occurrences': occurrence_count
            })
            
            # Komprimiere alle Ketten
            current_chains = self.compress_sequences(current_chains, best_seq, new_nonterminal)
            
            # Zeige Beispiel der komprimierten Kette
            if current_chains and len(current_chains[0]) > 0:
                print(f"  Beispiel (komprimiert): {' -> '.join(str(s) for s in current_chains[0])}")
            
            iteration += 1
        
        # Terminale sind die ursprünglichen Symbole, die nie ersetzt wurden
        all_symbols = set()
        for chain in empirical_chains:
            all_symbols.update(chain)
        
        # Symbole, die nie als Nonterminale eingeführt wurden, sind Terminale
        self.terminals = all_symbols - self.nonterminals
        
        # Berechne Wahrscheinlichkeiten für jede Produktion
        self.calculate_probabilities()
        
        # Bestimme das Startsymbol (das oberste Nonterminal)
        if self.nonterminals:
            # Finde Nonterminale, die in keiner anderen Produktion vorkommen
            used_as_child = set()
            for productions in self.rules.values():
                for prod, _ in productions:
                    for sym in prod:
                        if sym in self.nonterminals:
                            used_as_child.add(sym)
            
            possible_starts = self.nonterminals - used_as_child
            if possible_starts:
                self.start_symbol = sorted(possible_starts)[0]  # Nimm das erste
            else:
                # Fallback: nimm das zuletzt erstellte Nonterminal
                self.start_symbol = sorted(self.nonterminals)[-1]
        
        return current_chains
    
    def calculate_probabilities(self):
        """
        Berechnet Wahrscheinlichkeiten für jede Produktionsregel basierend auf
        den Häufigkeiten in den komprimierten Ketten.
        """
        # Rekonstruiere die komprimierten Ketten mit den Nonterminalen
        current_chains = [list(chain) for chain in empirical_chains]
        
        # Wende alle Kompressionen in der richtigen Reihenfolge an
        for hist_entry in self.compression_history:
            sequence = hist_entry['sequence']
            new_symbol = hist_entry['new_symbol']
            current_chains = self.compress_sequences(current_chains, sequence, new_symbol)
        
        # Zähle für jedes Nonterminal, wie oft es vorkommt und welche Expansionen es hat
        expansion_counts = defaultdict(lambda: defaultdict(int))
        
        for chain in current_chains:
            self.count_expansions_in_chain(chain, expansion_counts)
        
        # Konvertiere zu Wahrscheinlichkeiten
        for nonterminal in self.rules:
            if nonterminal in expansion_counts:
                total = sum(expansion_counts[nonterminal].values())
                if total > 0:
                    # Aktualisiere die Produktionen mit Wahrscheinlichkeiten
                    new_productions = []
                    for expansion_tuple, count in expansion_counts[nonterminal].items():
                        new_productions.append((list(expansion_tuple), count / total))
                    self.rules[nonterminal] = new_productions
    
    def count_expansions_in_chain(self, chain, expansion_counts):
        """
        Zählt die Expansionen in einer einzelnen Kette.
        """
        for i, symbol in enumerate(chain):
            if symbol in self.rules:
                # Suche nach der passenden Expansion für dieses Symbol
                # (In den komprimierten Ketten ist jedes Nonterminal genau eine Expansion)
                # Wir müssen die ursprüngliche Sequenz aus der History finden
                for hist_entry in self.compression_history:
                    if hist_entry['new_symbol'] == symbol:
                        expansion = tuple(hist_entry['sequence'])
                        expansion_counts[symbol][expansion] += 1
                        break

# ============================================================================
# 3. GRAMMATIK-INDUKTION DURCHFÜHREN
# ============================================================================

inducer = GrammarInducer()
compressed_chains = inducer.induce_grammar(empirical_chains)

print("\n" + "=" * 70)
print("INDUZIERTE GRAMMATIK")
print("=" * 70)
print(f"\nTerminale ({len(inducer.terminals)}): {sorted(inducer.terminals)}")
print(f"Nonterminale ({len(inducer.nonterminals)}): {sorted(inducer.nonterminals)}")
if inducer.start_symbol:
    print(f"Startsymbol: {inducer.start_symbol}")

print("\nPRODUKTIONSREGELN (mit Wahrscheinlichkeiten):")
for nonterminal in sorted(inducer.rules.keys()):
    productions = inducer.rules[nonterminal]
    if productions:
        prod_strings = []
        for prod, prob in productions:
            prod_str = ' -> '.join(str(s) for s in prod)
            prod_strings.append(f"{prod_str} [{prob:.3f}]")
        print(f"\n{nonterminal} -> {' | '.join(prod_strings)}")

# ============================================================================
# 4. GENERIERUNG MIT DER INDUZIERTEN PCFG
# ============================================================================

class PCFGGenerator:
    """
    Generiert Ketten mit der induzierten PCFG.
    """
    
    def __init__(self, grammar, terminals, start_symbol):
        self.grammar = grammar
        self.terminals = terminals
        self.start_symbol = start_symbol
        
        # Erstelle schnelle Lookup-Tabellen
        self.production_probs = {}
        for nt, prods in grammar.items():
            if prods:  # Nur wenn es Produktionen gibt
                symbols = [prod for prod, _ in prods]
                probs = [prob for _, prob in prods]
                self.production_probs[nt] = (symbols, probs)
    
    def expand(self, symbol, max_depth=10, current_depth=0):
        """
        Expandiert ein Symbol rekursiv.
        """
        if current_depth >= max_depth:
            return [str(symbol)]  # Fallback, wenn zu tief
        
        # Wenn es ein Terminal ist oder keine Produktionen hat
        if symbol in self.terminals or symbol not in self.production_probs:
            return [str(symbol)]
        
        symbols, probs = self.production_probs[symbol]
        if not symbols:  # Falls keine Symbole vorhanden
            return [str(symbol)]
        
        chosen_idx = np.random.choice(len(symbols), p=probs)
        chosen_symbols = symbols[chosen_idx]
        
        result = []
        for sym in chosen_symbols:
            result.extend(self.expand(sym, max_depth, current_depth + 1))
        
        return result
    
    def generate(self, max_depth=10):
        """
        Generiert eine vollständige Kette.
        """
        if not self.start_symbol:
            return []
        
        chain = self.expand(self.start_symbol, max_depth)
        return chain

# Initialisiere Generator
if inducer.start_symbol:
    generator = PCFGGenerator(inducer.rules, inducer.terminals, inducer.start_symbol)
    
    print("\n" + "=" * 70)
    print("GENERIERTE BEISPIELE MIT DER PCFG")
    print("=" * 70)
    
    for i in range(5):
        generated = generator.generate(max_depth=15)
        print(f"\nKette {i+1} ({len(generated)} Symbole):")
        print(f"  {' -> '.join(generated)}")
else:
    print("\nKein Startsymbol gefunden. Generierung übersprungen.")

# ============================================================================
# 5. VALIDIERUNG: VERGLEICH MIT EMPIRISCHEN DATEN
# ============================================================================

def collect_all_symbols(grammar, terminals, start_symbol, num_samples=1000, max_depth=10):
    """
    Sammelt alle möglichen Terminalableitungen aus der Grammatik.
    """
    if not start_symbol:
        return {}
    
    generator = PCFGGenerator(grammar, terminals, start_symbol)
    all_terminals = []
    
    # Generiere viele Ketten
    for _ in range(num_samples):
        chain = generator.generate(max_depth)
        all_terminals.extend(chain)
    
    if not all_terminals:
        return {}
    
    # Berechne Häufigkeiten
    freq = Counter(all_terminals)
    total = len(all_terminals)
    
    return {sym: count/total for sym, count in freq.items()}

# Sammle alle ursprünglichen Terminale
all_original_terminals = []
for chain in empirical_chains:
    all_original_terminals.extend(chain)

original_freq = Counter(all_original_terminals)
total_original = len(all_original_terminals)
if total_original > 0:
    original_dist = {sym: count/total_original for sym, count in original_freq.items()}
else:
    original_dist = {}

# Generierte Verteilung
if inducer.start_symbol:
    generated_dist = collect_all_symbols(inducer.rules, inducer.terminals, inducer.start_symbol)
    
    print("\n" + "=" * 70)
    print("VALIDIERUNG: TERMINAL-VERTEILUNGEN")
    print("=" * 70)
    
    table_data = []
    all_symbols = sorted(set(original_dist.keys()) | set(generated_dist.keys()))
    for sym in all_symbols:
        orig = original_dist.get(sym, 0)
        gen = generated_dist.get(sym, 0)
        table_data.append([sym, f"{orig:.4f}", f"{gen:.4f}", f"{abs(orig-gen):.4f}"])
    
    print(tabulate(table_data, 
                   headers=["Symbol", "Empirisch", "Generiert", "Differenz"],
                   tablefmt="grid"))
    
    # Berechne Korrelation, wenn möglich
    orig_array = [original_dist.get(sym, 0) for sym in all_symbols]
    gen_array = [generated_dist.get(sym, 0) for sym in all_symbols]
    try:
        corr, p_value = pearsonr(orig_array, gen_array)
        print(f"\nKorrelation: r = {corr:.4f}, p = {p_value:.4f}")
    except:
        print("\nKorrelation konnte nicht berechnet werden.")

# ============================================================================
# 6. HIERARCHIE-VISUALISIERUNG
# ============================================================================

def print_grammar_hierarchy(grammar, start_symbol, indent=0, visited=None):
    """
    Gibt die Hierarchie der Grammatik als Baum aus.
    """
    if visited is None:
        visited = set()
    
    if not start_symbol or start_symbol not in grammar:
        return
    
    if start_symbol in visited:
        print("  " * indent + f"├─ {start_symbol} (Zyklus!)")
        return
    
    visited.add(start_symbol)
    
    productions = grammar[start_symbol]
    print("  " * indent + f"├─ {start_symbol}")
    
    for prod, prob in productions:
        print("  " * (indent + 1) + f"├─ [{prob:.3f}] -> ", end="")
        prod_str = []
        for sym in prod:
            if sym in grammar:
                prod_str.append(sym)
            else:
                prod_str.append(f"'{sym}'")
        print(" ".join(prod_str))
        
        # Rekursiv für Nonterminale
        for sym in prod:
            if sym in grammar:
                print_grammar_hierarchy(grammar, sym, indent + 2, visited.copy())

print("\n" + "=" * 70)
print("GRAMMATIK-HIERARCHIE")
print("=" * 70)
if inducer.start_symbol:
    print_grammar_hierarchy(inducer.rules, inducer.start_symbol)
else:
    print("Kein Startsymbol definiert.")

# ============================================================================
# 7. EXPORT DER GRAMMATIK
# ============================================================================

def export_pcfg(grammar, terminals, start_symbol, filename="induzierte_pcfg.txt"):
    """
    Exportiert die PCFG im lesbaren Format.
    """
    with open(filename, 'w', encoding='utf-8') as f:
        f.write("# Induzierte probabilistische kontextfreie Grammatik (PCFG)\n")
        f.write(f"# Startsymbol: {start_symbol}\n")
        f.write(f"# Terminale: {', '.join(sorted(terminals))}\n")
        f.write("# Nonterminale: " + ', '.join(sorted(grammar.keys())) + "\n\n")
        
        for nonterminal in sorted(grammar.keys()):
            productions = grammar[nonterminal]
            for prod, prob in productions:
                prod_str = ' '.join(str(s) for s in prod)
                f.write(f"{nonterminal} -> {prod_str} [{prob:.3f}]\n")
    
    print(f"\nGrammatik wurde als '{filename}' exportiert.")

export_pcfg(inducer.rules, inducer.terminals, inducer.start_symbol)

print("\n" + "=" * 70)
print("GRAMMATIKINDUKTION ABGESCHLOSSEN")
print("=" * 70)