Inhalt
Aktueller Ordner:
algorithmisch-rekursive-sequenzanalyse-3.0ARS_XAI_PCFG.py
"""
Algorithmisch Rekursive Sequenzanalyse 3.0
GRAMMATIKINDUKTION DURCH HIERARCHISCHE KOMPRESSION
Bildung von Nonterminalzeichen durch Erkennung von Wiederholungen
"""
import numpy as np
from scipy.stats import pearsonr
import matplotlib.pyplot as plt
from tabulate import tabulate
from collections import Counter, defaultdict
import itertools
# ============================================================================
# 1. EMPIRISCHE DATEN: Terminalzeichenketten aus acht Transkripten
# ============================================================================
empirical_chains = [
# Transkript 1: Metzgerei
['KBG', 'VBG', 'KBBd', 'VBBd', 'KBA', 'VBA', 'KBBd', 'VBBd', 'KBA', 'VAA', 'KAA', 'VAV', 'KAV'],
# Transkript 2: Marktplatz (Kirschen)
['VBG', 'KBBd', 'VBBd', 'VAA', 'KAA', 'VBG', 'KBBd', 'VAA', 'KAA'],
# Transkript 3: Fischstand
['KBBd', 'VBBd', 'VAA', 'KAA'],
# Transkript 4: Gemüsestand (ausfuehrlich)
['KBBd', 'VBBd', 'KBA', 'VBA', 'KBBd', 'VBA', 'KAE', 'VAE', 'KAA', 'VAV', 'KAV'],
# Transkript 5: Gemüsestand (mit KAV zu Beginn)
['KAV', 'KBBd', 'VBBd', 'KBBd', 'VAA', 'KAV'],
# Transkript 6: Käseverkaufsstand
['KBG', 'VBG', 'KBBd', 'VBBd', 'KAA'],
# Transkript 7: Bonbonstand
['KBBd', 'VBBd', 'KBA', 'VAA', 'KAA'],
# Transkript 8: Baeckerei
['KBG', 'VBBd', 'KBBd', 'VBA', 'VAA', 'KAA', 'VAV', 'KAV']
]
# ============================================================================
# 2. HIERARCHISCHE GRAMMATIKINDUKTION DURCH KOMPRESSION
# ============================================================================
class GrammarInducer:
"""
Induziert eine PCFG durch hierarchische Kompression von Wiederholungen.
Analog zur Bildung neuer Variablen bei Termumformungen.
"""
def __init__(self):
self.rules = {} # Nonterminal -> Liste von Produktionen mit Wahrscheinlichkeiten
self.terminals = set()
self.nonterminals = set()
self.start_symbol = None
self.compression_history = [] # Speichert die Hierarchie der Kompression
def find_best_repetition(self, chains, min_length=2, max_length=5):
"""
Findet die häufigste wiederholte Sequenz in allen Ketten.
Berücksichtigt Überlappungen und zählt Vorkommen.
"""
sequence_counter = Counter()
for chain in chains:
for length in range(min_length, min(max_length, len(chain) + 1)):
for i in range(len(chain) - length + 1):
seq = tuple(chain[i:i+length])
sequence_counter[seq] += 1
# Filtere Sequenzen, die mehrmals vorkommen
repeated = {seq: count for seq, count in sequence_counter.items()
if count >= 2}
if not repeated:
return None
# Bewerte Sequenzen: (Häufigkeit * Länge) / (Anzahl einzigartiger Symbole)
# Bevorzugt längere, häufige Sequenzen mit weniger einzigartigen Symbolen
best_seq = max(repeated.items(),
key=lambda x: x[1] * len(x[0]) / max(1, len(set(x[0]))))
return best_seq[0]
def compress_sequences(self, chains, sequence, new_nonterminal):
"""
Ersetzt alle Vorkommen der Sequenz durch das neue Nonterminal.
"""
compressed_chains = []
seq_tuple = tuple(sequence)
seq_len = len(sequence)
for chain in chains:
new_chain = []
i = 0
while i < len(chain):
if i <= len(chain) - seq_len and tuple(chain[i:i+seq_len]) == seq_tuple:
new_chain.append(new_nonterminal)
i += seq_len
else:
new_chain.append(chain[i])
i += 1
compressed_chains.append(new_chain)
return compressed_chains
def generate_nonterminal_name(self, base_symbols):
"""
Generiert einen aussagekräftigen Namen für ein neues Nonterminal.
"""
# Extrahiere semantische Informationen aus den Symbolen
prefixes = [s[:2] if isinstance(s, str) else str(s)[:2] for s in base_symbols]
suffixes = [s[2:] if isinstance(s, str) and len(s) > 2 else '' for s in base_symbols]
# Wenn alle Symbole mit K oder V beginnen, behalte das Muster
if all(p in ['KB', 'VB', 'KA', 'VA', 'NT'] or p.startswith('NT') for p in prefixes):
# Behalte das erste und letzte Symbol für die Benennung
first = base_symbols[0]
last = base_symbols[-1]
# Entferne NT_ Präfix für Lesbarkeit, falls vorhanden
first = first.replace('NT_', '') if isinstance(first, str) else str(first)
last = last.replace('NT_', '') if isinstance(last, str) else str(last)
return f"NT_{first}_{last}"
# Generischer Name basierend auf den Symbolen
symbol_str = "_".join(str(s).replace('NT_', '') for s in base_symbols)
return f"NT_{symbol_str}"
def induce_grammar(self, chains, max_iterations=20):
"""
Hauptmethode: Induziert eine PCFG durch iterative Kompression.
Führt so lange fort, bis keine Wiederholungen mehr gefunden werden.
"""
current_chains = [list(chain) for chain in chains]
iteration = 0
rule_counter = 1
print("\n" + "=" * 70)
print("HIERARCHISCHE GRAMMATIKINDUKTION")
print("=" * 70)
while iteration < max_iterations:
# Finde die beste Wiederholung
best_seq = self.find_best_repetition(current_chains)
if best_seq is None:
print(f"\nKeine weiteren Wiederholungen gefunden nach {iteration} Iterationen.")
break
# Generiere Namen für neues Nonterminal
new_nonterminal = self.generate_nonterminal_name(best_seq)
# Stelle sicher, dass der Name einzigartig ist
while new_nonterminal in self.nonterminals:
new_nonterminal = f"{new_nonterminal}_{rule_counter}"
rule_counter += 1
print(f"\nIteration {iteration + 1}:")
print(f" Gefundene Wiederholung: {' -> '.join(best_seq)}")
print(f" Neues Nonterminal: {new_nonterminal}")
# Speichere die Produktionsregel (als Liste, nicht als Tupel)
self.rules[new_nonterminal] = [(list(best_seq), 1.0)] # Temporär mit Wahrscheinlichkeit 1
self.nonterminals.add(new_nonterminal)
# Zähle, wie oft die Regel in den aktuellen Ketten vorkommt
occurrence_count = 0
for chain in current_chains:
for i in range(len(chain) - len(best_seq) + 1):
if tuple(chain[i:i+len(best_seq)]) == best_seq:
occurrence_count += 1
self.compression_history.append({
'iteration': iteration,
'sequence': best_seq,
'new_symbol': new_nonterminal,
'occurrences': occurrence_count
})
# Komprimiere alle Ketten
current_chains = self.compress_sequences(current_chains, best_seq, new_nonterminal)
# Zeige Beispiel der komprimierten Kette
if current_chains and len(current_chains[0]) > 0:
print(f" Beispiel (komprimiert): {' -> '.join(str(s) for s in current_chains[0])}")
iteration += 1
# Terminale sind die ursprünglichen Symbole, die nie ersetzt wurden
all_symbols = set()
for chain in empirical_chains:
all_symbols.update(chain)
# Symbole, die nie als Nonterminale eingeführt wurden, sind Terminale
self.terminals = all_symbols - self.nonterminals
# Berechne Wahrscheinlichkeiten für jede Produktion
self.calculate_probabilities()
# Bestimme das Startsymbol (das oberste Nonterminal)
if self.nonterminals:
# Finde Nonterminale, die in keiner anderen Produktion vorkommen
used_as_child = set()
for productions in self.rules.values():
for prod, _ in productions:
for sym in prod:
if sym in self.nonterminals:
used_as_child.add(sym)
possible_starts = self.nonterminals - used_as_child
if possible_starts:
self.start_symbol = sorted(possible_starts)[0] # Nimm das erste
else:
# Fallback: nimm das zuletzt erstellte Nonterminal
self.start_symbol = sorted(self.nonterminals)[-1]
return current_chains
def calculate_probabilities(self):
"""
Berechnet Wahrscheinlichkeiten für jede Produktionsregel basierend auf
den Häufigkeiten in den komprimierten Ketten.
"""
# Rekonstruiere die komprimierten Ketten mit den Nonterminalen
current_chains = [list(chain) for chain in empirical_chains]
# Wende alle Kompressionen in der richtigen Reihenfolge an
for hist_entry in self.compression_history:
sequence = hist_entry['sequence']
new_symbol = hist_entry['new_symbol']
current_chains = self.compress_sequences(current_chains, sequence, new_symbol)
# Zähle für jedes Nonterminal, wie oft es vorkommt und welche Expansionen es hat
expansion_counts = defaultdict(lambda: defaultdict(int))
for chain in current_chains:
self.count_expansions_in_chain(chain, expansion_counts)
# Konvertiere zu Wahrscheinlichkeiten
for nonterminal in self.rules:
if nonterminal in expansion_counts:
total = sum(expansion_counts[nonterminal].values())
if total > 0:
# Aktualisiere die Produktionen mit Wahrscheinlichkeiten
new_productions = []
for expansion_tuple, count in expansion_counts[nonterminal].items():
new_productions.append((list(expansion_tuple), count / total))
self.rules[nonterminal] = new_productions
def count_expansions_in_chain(self, chain, expansion_counts):
"""
Zählt die Expansionen in einer einzelnen Kette.
"""
for i, symbol in enumerate(chain):
if symbol in self.rules:
# Suche nach der passenden Expansion für dieses Symbol
# (In den komprimierten Ketten ist jedes Nonterminal genau eine Expansion)
# Wir müssen die ursprüngliche Sequenz aus der History finden
for hist_entry in self.compression_history:
if hist_entry['new_symbol'] == symbol:
expansion = tuple(hist_entry['sequence'])
expansion_counts[symbol][expansion] += 1
break
# ============================================================================
# 3. GRAMMATIK-INDUKTION DURCHFÜHREN
# ============================================================================
inducer = GrammarInducer()
compressed_chains = inducer.induce_grammar(empirical_chains)
print("\n" + "=" * 70)
print("INDUZIERTE GRAMMATIK")
print("=" * 70)
print(f"\nTerminale ({len(inducer.terminals)}): {sorted(inducer.terminals)}")
print(f"Nonterminale ({len(inducer.nonterminals)}): {sorted(inducer.nonterminals)}")
if inducer.start_symbol:
print(f"Startsymbol: {inducer.start_symbol}")
print("\nPRODUKTIONSREGELN (mit Wahrscheinlichkeiten):")
for nonterminal in sorted(inducer.rules.keys()):
productions = inducer.rules[nonterminal]
if productions:
prod_strings = []
for prod, prob in productions:
prod_str = ' -> '.join(str(s) for s in prod)
prod_strings.append(f"{prod_str} [{prob:.3f}]")
print(f"\n{nonterminal} -> {' | '.join(prod_strings)}")
# ============================================================================
# 4. GENERIERUNG MIT DER INDUZIERTEN PCFG
# ============================================================================
class PCFGGenerator:
"""
Generiert Ketten mit der induzierten PCFG.
"""
def __init__(self, grammar, terminals, start_symbol):
self.grammar = grammar
self.terminals = terminals
self.start_symbol = start_symbol
# Erstelle schnelle Lookup-Tabellen
self.production_probs = {}
for nt, prods in grammar.items():
if prods: # Nur wenn es Produktionen gibt
symbols = [prod for prod, _ in prods]
probs = [prob for _, prob in prods]
self.production_probs[nt] = (symbols, probs)
def expand(self, symbol, max_depth=10, current_depth=0):
"""
Expandiert ein Symbol rekursiv.
"""
if current_depth >= max_depth:
return [str(symbol)] # Fallback, wenn zu tief
# Wenn es ein Terminal ist oder keine Produktionen hat
if symbol in self.terminals or symbol not in self.production_probs:
return [str(symbol)]
symbols, probs = self.production_probs[symbol]
if not symbols: # Falls keine Symbole vorhanden
return [str(symbol)]
chosen_idx = np.random.choice(len(symbols), p=probs)
chosen_symbols = symbols[chosen_idx]
result = []
for sym in chosen_symbols:
result.extend(self.expand(sym, max_depth, current_depth + 1))
return result
def generate(self, max_depth=10):
"""
Generiert eine vollständige Kette.
"""
if not self.start_symbol:
return []
chain = self.expand(self.start_symbol, max_depth)
return chain
# Initialisiere Generator
if inducer.start_symbol:
generator = PCFGGenerator(inducer.rules, inducer.terminals, inducer.start_symbol)
print("\n" + "=" * 70)
print("GENERIERTE BEISPIELE MIT DER PCFG")
print("=" * 70)
for i in range(5):
generated = generator.generate(max_depth=15)
print(f"\nKette {i+1} ({len(generated)} Symbole):")
print(f" {' -> '.join(generated)}")
else:
print("\nKein Startsymbol gefunden. Generierung übersprungen.")
# ============================================================================
# 5. VALIDIERUNG: VERGLEICH MIT EMPIRISCHEN DATEN
# ============================================================================
def collect_all_symbols(grammar, terminals, start_symbol, num_samples=1000, max_depth=10):
"""
Sammelt alle möglichen Terminalableitungen aus der Grammatik.
"""
if not start_symbol:
return {}
generator = PCFGGenerator(grammar, terminals, start_symbol)
all_terminals = []
# Generiere viele Ketten
for _ in range(num_samples):
chain = generator.generate(max_depth)
all_terminals.extend(chain)
if not all_terminals:
return {}
# Berechne Häufigkeiten
freq = Counter(all_terminals)
total = len(all_terminals)
return {sym: count/total for sym, count in freq.items()}
# Sammle alle ursprünglichen Terminale
all_original_terminals = []
for chain in empirical_chains:
all_original_terminals.extend(chain)
original_freq = Counter(all_original_terminals)
total_original = len(all_original_terminals)
if total_original > 0:
original_dist = {sym: count/total_original for sym, count in original_freq.items()}
else:
original_dist = {}
# Generierte Verteilung
if inducer.start_symbol:
generated_dist = collect_all_symbols(inducer.rules, inducer.terminals, inducer.start_symbol)
print("\n" + "=" * 70)
print("VALIDIERUNG: TERMINAL-VERTEILUNGEN")
print("=" * 70)
table_data = []
all_symbols = sorted(set(original_dist.keys()) | set(generated_dist.keys()))
for sym in all_symbols:
orig = original_dist.get(sym, 0)
gen = generated_dist.get(sym, 0)
table_data.append([sym, f"{orig:.4f}", f"{gen:.4f}", f"{abs(orig-gen):.4f}"])
print(tabulate(table_data,
headers=["Symbol", "Empirisch", "Generiert", "Differenz"],
tablefmt="grid"))
# Berechne Korrelation, wenn möglich
orig_array = [original_dist.get(sym, 0) for sym in all_symbols]
gen_array = [generated_dist.get(sym, 0) for sym in all_symbols]
try:
corr, p_value = pearsonr(orig_array, gen_array)
print(f"\nKorrelation: r = {corr:.4f}, p = {p_value:.4f}")
except:
print("\nKorrelation konnte nicht berechnet werden.")
# ============================================================================
# 6. HIERARCHIE-VISUALISIERUNG
# ============================================================================
def print_grammar_hierarchy(grammar, start_symbol, indent=0, visited=None):
"""
Gibt die Hierarchie der Grammatik als Baum aus.
"""
if visited is None:
visited = set()
if not start_symbol or start_symbol not in grammar:
return
if start_symbol in visited:
print(" " * indent + f"├─ {start_symbol} (Zyklus!)")
return
visited.add(start_symbol)
productions = grammar[start_symbol]
print(" " * indent + f"├─ {start_symbol}")
for prod, prob in productions:
print(" " * (indent + 1) + f"├─ [{prob:.3f}] -> ", end="")
prod_str = []
for sym in prod:
if sym in grammar:
prod_str.append(sym)
else:
prod_str.append(f"'{sym}'")
print(" ".join(prod_str))
# Rekursiv für Nonterminale
for sym in prod:
if sym in grammar:
print_grammar_hierarchy(grammar, sym, indent + 2, visited.copy())
print("\n" + "=" * 70)
print("GRAMMATIK-HIERARCHIE")
print("=" * 70)
if inducer.start_symbol:
print_grammar_hierarchy(inducer.rules, inducer.start_symbol)
else:
print("Kein Startsymbol definiert.")
# ============================================================================
# 7. EXPORT DER GRAMMATIK
# ============================================================================
def export_pcfg(grammar, terminals, start_symbol, filename="induzierte_pcfg.txt"):
"""
Exportiert die PCFG im lesbaren Format.
"""
with open(filename, 'w', encoding='utf-8') as f:
f.write("# Induzierte probabilistische kontextfreie Grammatik (PCFG)\n")
f.write(f"# Startsymbol: {start_symbol}\n")
f.write(f"# Terminale: {', '.join(sorted(terminals))}\n")
f.write("# Nonterminale: " + ', '.join(sorted(grammar.keys())) + "\n\n")
for nonterminal in sorted(grammar.keys()):
productions = grammar[nonterminal]
for prod, prob in productions:
prod_str = ' '.join(str(s) for s in prod)
f.write(f"{nonterminal} -> {prod_str} [{prob:.3f}]\n")
print(f"\nGrammatik wurde als '{filename}' exportiert.")
export_pcfg(inducer.rules, inducer.terminals, inducer.start_symbol)
print("\n" + "=" * 70)
print("GRAMMATIKINDUKTION ABGESCHLOSSEN")
print("=" * 70)