Inhalt
Aktueller Ordner:
ARS_ExplainableAIARSXAI2.py
"""
ARS GUI - Algorithmic Recursive Sequence Analysis with Graphical User Interface
Erweiterte Version mit Petri-Netzen, Bayesschen Netzen und hybrider Integration
Dieses Programm prüft automatisch die Verfügbarkeit aller benötigten Pakete
und installiert fehlende Pakete bei Bedarf nach.
"""
import sys
import subprocess
import importlib
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
# ============================================================================
# PAKETVERWALTUNG - ALTERNATIVE ZU PKG_RESOURCES
# ============================================================================
def check_and_install_packages():
"""Prüft und installiert fehlende Python-Pakete (ohne pkg_resources)"""
REQUIRED_PACKAGES = [
'numpy',
'scipy',
'matplotlib',
'hmmlearn',
'sklearn-crfsuite',
'sentence-transformers',
'networkx',
'torch',
'seaborn',
'tabulate'
]
print("=" * 70)
print("ARS 4.0 - PAKETPRÜFUNG")
print("=" * 70)
missing_packages = []
for package in REQUIRED_PACKAGES:
# Paketnamen für importlib anpassen
import_name = package.replace('-', '_')
if package == 'sklearn-crfsuite':
import_name = 'sklearn_crfsuite'
elif package == 'sentence-transformers':
import_name = 'sentence_transformers'
try:
importlib.import_module(import_name)
print(f"✓ {package} bereits installiert")
except ImportError:
print(f"✗ {package} fehlt")
missing_packages.append(package)
if missing_packages:
print("\nInstalliere fehlende Pakete...")
for package in missing_packages:
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", package])
print(f" ✓ {package} erfolgreich installiert")
except subprocess.CalledProcessError as e:
print(f" ✗ Fehler bei Installation von {package}: {e}")
print(f" Bitte manuell installieren: pip install {package}")
print("\n" + "=" * 70 + "\n")
# Pakete prüfen und installieren
check_and_install_packages()
# ============================================================================
# IMPORTS
# ============================================================================
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, scrolledtext
import numpy as np
from scipy.stats import pearsonr
import matplotlib
matplotlib.use('TkAgg') # Wichtig für Thread-Sicherheit
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from collections import Counter, defaultdict
import threading
import time
import re
import queue
# Optionale Imports mit Fallbacks
try:
import networkx as nx
NETWORKX_AVAILABLE = True
except ImportError:
NETWORKX_AVAILABLE = False
print("Warnung: networkx nicht verfügbar. Graph-Funktionen deaktiviert.")
try:
from hmmlearn import hmm
HMM_AVAILABLE = True
except ImportError:
HMM_AVAILABLE = False
print("Warnung: hmmlearn nicht verfügbar. HMM-Funktionen deaktiviert.")
try:
from sklearn_crfsuite import CRF
CRF_AVAILABLE = True
except ImportError:
CRF_AVAILABLE = False
print("Warnung: sklearn-crfsuite nicht verfügbar. CRF-Funktionen deaktiviert.")
try:
from sentence_transformers import SentenceTransformer
TRANSFORMER_AVAILABLE = True
except ImportError:
TRANSFORMER_AVAILABLE = False
print("Warnung: sentence-transformers nicht verfügbar. Embedding-Funktionen deaktiviert.")
try:
import torch
import torch.nn as nn
import torch.nn.functional as F
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
print("Warnung: torch nicht verfügbar. GNN-Funktionen deaktiviert.")
try:
import seaborn as sns
SEABORN_AVAILABLE = True
except ImportError:
SEABORN_AVAILABLE = False
print("Warnung: seaborn nicht verfügbar. Visualisierungsfunktionen eingeschränkt.")
# ============================================================================
# THREAD-SICHERE MATPLOTLIB-FUNKTIONEN
# ============================================================================
class PlotThread:
"""Thread-sichere Plot-Ausführung"""
def __init__(self, root):
self.root = root
self.plot_queue = queue.Queue()
self.start_processor()
def start_processor(self):
"""Startet den Plot-Processor"""
self.process()
def process(self):
"""Verarbeitet Plot-Aufträge im Hauptthread"""
try:
while True:
func, args, kwargs = self.plot_queue.get_nowait()
# Im Hauptthread ausführen
self.root.after(0, lambda: self._execute_plot(func, args, kwargs))
except queue.Empty:
pass
finally:
self.root.after(100, self.process)
def _execute_plot(self, func, args, kwargs):
"""Führt Plot-Funktion aus"""
try:
func(*args, **kwargs)
except Exception as e:
print(f"Fehler im Plot: {e}")
def plot(self, func, *args, **kwargs):
"""Fügt einen Plot-Auftrag hinzu"""
self.plot_queue.put((func, args, kwargs))
# ============================================================================
# ARS 2.0 - GRAMMATIK OHNE NONTERMINALE
# ============================================================================
class ARS20:
"""ARS 2.0 - Übergangswahrscheinlichkeiten ohne Nonterminale"""
def __init__(self):
self.chains = []
self.terminals = []
self.start_symbol = None
self.transitions = {}
self.probabilities = {}
self.optimized_probabilities = {}
self.history = []
def load_chains(self, chains, start_symbol=None):
"""Lädt Terminalzeichenketten"""
self.chains = chains
# Alle Terminale aus allen Ketten sammeln
all_terminals = set()
for chain in chains:
for symbol in chain:
all_terminals.add(symbol)
self.terminals = sorted(list(all_terminals))
self.start_symbol = start_symbol if start_symbol else (chains[0][0] if chains else None)
self.transitions = self.count_transitions(chains)
self.probabilities = self.calculate_probabilities(self.transitions)
return True
def count_transitions(self, chains):
"""Zählt Übergänge zwischen Terminalzeichen"""
transitions = {}
for chain in chains:
for i in range(len(chain) - 1):
start, end = chain[i], chain[i + 1]
if start not in transitions:
transitions[start] = {}
if end not in transitions[start]:
transitions[start][end] = 0
transitions[start][end] += 1
return transitions
def calculate_probabilities(self, transitions):
"""Normalisiert Übergangszaehlungen zu Wahrscheinlichkeiten"""
probabilities = {}
for start in transitions:
total = sum(transitions[start].values())
if total > 0:
probabilities[start] = {end: count / total
for end, count in transitions[start].items()}
return probabilities
def print_grammar(self):
"""Gibt die Grammatik aus"""
lines = []
lines.append("=" * 70)
lines.append("ARS 2.0 - ÜBERGANGSWAHRSCHEINLICHKEITEN")
lines.append("=" * 70)
lines.append("")
if self.probabilities:
for start in sorted(self.probabilities.keys()):
trans = self.probabilities[start]
trans_str = ", ".join([f"{end}: {prob:.3f}" for end, prob in sorted(trans.items())])
lines.append(f"{start} -> {trans_str}")
else:
lines.append("Keine Übergänge gefunden.")
lines.append(f"\nTerminalzeichen ({len(self.terminals)}): {self.terminals}")
lines.append(f"Startzeichen: {self.start_symbol}")
return "\n".join(lines)
def generate_chain(self, start_symbol=None, max_length=20):
"""Generiert eine Kette basierend auf Wahrscheinlichkeiten"""
if not self.optimized_probabilities:
probs = self.probabilities
else:
probs = self.optimized_probabilities
start = start_symbol if start_symbol else self.start_symbol
if not start or start not in probs:
return []
chain = [start]
current = start
for _ in range(max_length - 1):
if current not in probs:
break
next_symbols = list(probs[current].keys())
if not next_symbols:
break
probs_list = list(probs[current].values())
if not probs_list:
break
try:
next_symbol = np.random.choice(next_symbols, p=probs_list)
chain.append(next_symbol)
current = next_symbol
except:
break
if current not in probs:
break
return chain
def compute_frequencies(self, chains):
"""Berechnet relative Häufigkeiten der Terminalzeichen"""
if not self.terminals:
return np.array([])
freq_array = np.zeros(len(self.terminals))
term_index = {term: i for i, term in enumerate(self.terminals)}
for chain in chains:
for symbol in chain:
if symbol in term_index:
freq_array[term_index[symbol]] += 1
total = freq_array.sum()
if total > 0:
freq_array /= total
return freq_array
def optimize(self, max_iterations=500, tolerance=0.005, target_correlation=0.9,
progress_callback=None):
"""Optimiert die Grammatik durch iterativen Vergleich"""
# Initiale Wahrscheinlichkeiten
probs = {}
for start, p in self.probabilities.items():
probs[start] = p.copy()
empirical_freqs = self.compute_frequencies(self.chains)
best_correlation = 0
best_probabilities = None
history = []
for iteration in range(max_iterations):
# Generiere Ketten
generated = [self.generate_chain(max_length=20) for _ in range(8)]
generated = [g for g in generated if g] # Entferne leere Ketten
if not generated:
continue
gen_freqs = self.compute_frequencies(generated)
# Korrelation
try:
if len(empirical_freqs) == len(gen_freqs) and len(empirical_freqs) > 1:
corr, p_val = pearsonr(empirical_freqs, gen_freqs)
else:
corr, p_val = 0, 1
except:
corr, p_val = 0, 1
history.append((iteration, corr, p_val))
# Progress update
if progress_callback and iteration % 10 == 0:
progress_callback(iteration, max_iterations, corr, p_val)
# Abbruchkriterium
if corr >= target_correlation and p_val < 0.05:
best_correlation = corr
best_probabilities = {s: p.copy() for s, p in probs.items()}
break
# Anpassung
for start in probs:
for end in list(probs[start].keys()):
if end in self.terminals:
idx = self.terminals.index(end)
if idx < len(empirical_freqs) and idx < len(gen_freqs):
emp_prob = empirical_freqs[idx]
gen_prob = gen_freqs[idx]
error = emp_prob - gen_prob
probs[start][end] += error * tolerance
probs[start][end] = max(0.01, min(0.99, probs[start][end]))
# Renormalisierung
for start in probs:
total = sum(probs[start].values())
if total > 0:
probs[start] = {end: p/total for end, p in probs[start].items()}
if best_probabilities is None and history:
best_idx = max(range(len(history)), key=lambda i: history[i][1])
best_correlation = history[best_idx][1]
best_probabilities = self.probabilities
self.optimized_probabilities = best_probabilities
self.history = history
return best_probabilities, best_correlation, history
# ============================================================================
# ARS 3.0 - GRAMMATIK MIT NONTERMINALEN (HIERARCHISCHE KOMPRESSION)
# VOLLSTÄNDIG KORRIGIERTE VERSION
# ============================================================================
class MethodologicalReflection:
"""
Dokumentiert die interpretativen Entscheidungen im Induktionsprozess.
"""
def __init__(self):
self.interpretation_log = []
self.sequence_meaning_mapping = {}
def log_interpretation(self, sequence, new_nonterminal, rationale):
"""Dokumentiert eine Interpretationsentscheidung"""
self.interpretation_log.append({
'sequence': sequence,
'new_nonterminal': new_nonterminal,
'rationale': rationale,
'timestamp': len(self.interpretation_log)
})
# Bedeutung der Sequenz explizieren
aktionen = [self._interpretiere_symbol(s) for s in sequence if isinstance(s, str)]
self.sequence_meaning_mapping[tuple(sequence)] = {
'bedeutung': ' → '.join(aktionen),
'typ': self._klassifiziere_sequenz(sequence)
}
def _interpretiere_symbol(self, symbol):
"""Gibt die qualitative Bedeutung eines Terminalzeichens zurück"""
bedeutungen = {
'KBG': 'Kunden-Gruß',
'VBG': 'Verkäufer-Gruß',
'KBBd': 'Kunden-Bedarf (konkret)',
'VBBd': 'Verkäufer-Nachfrage',
'KBA': 'Kunden-Antwort',
'VBA': 'Verkäufer-Reaktion',
'KAE': 'Kunden-Erkundigung',
'VAE': 'Verkäufer-Auskunft',
'KAA': 'Kunden-Abschluss',
'VAA': 'Verkäufer-Abschluss',
'KAV': 'Kunden-Verabschiedung',
'VAV': 'Verkäufer-Verabschiedung',
'KNG': 'Kunden-Gruß (Variante)',
'VBG.VBBd': 'Verkäufer-Aktion (kombiniert)'
}
return bedeutungen.get(symbol, str(symbol))
def _klassifiziere_sequenz(self, sequence):
"""Klassifiziert den Typ der Interaktionssequenz"""
seq_str = ' '.join([str(s) for s in sequence])
if 'KBBd' in seq_str and 'VBBd' in seq_str:
return 'Bedarfsaushandlung'
elif 'KAE' in seq_str or 'VAE' in seq_str:
return 'Informationsaustausch'
elif 'KAA' in seq_str and 'VAA' in seq_str:
return 'Transaktionsabschluss'
else:
return 'Interaktionssequenz'
def print_summary(self):
"""Gibt eine methodologische Zusammenfassung aus"""
print("\n" + "=" * 70)
print("METHODOLOGISCHE REFLEXION")
print("=" * 70)
print("\nDokumentierte Interpretationsentscheidungen:")
for log in self.interpretation_log:
print(f"\n[Interpretation {log['timestamp']+1}]")
seq_str = ' → '.join([str(s) for s in log['sequence']])
print(f" Sequenz: {seq_str}")
print(f" → Nonterminal: {log['new_nonterminal']}")
print(f" Begründung: {log['rationale']}")
if tuple(log['sequence']) in self.sequence_meaning_mapping:
mapping = self.sequence_meaning_mapping[tuple(log['sequence'])]
print(f" Bedeutung: {mapping['bedeutung']}")
print(f" Sequenztyp: {mapping['typ']}")
class GrammarInducer:
"""
Induziert eine PCFG durch hierarchische Kompression von Wiederholungen.
Wiederholt den Vorgang, bis nur noch ein Startsymbol übrig bleibt.
"""
def __init__(self):
self.rules = {} # Nonterminal -> Produktionen
self.terminals = set()
self.nonterminals = set()
self.start_symbol = None
self.user_start_symbol = None # Vom Benutzer definiertes Startzeichen
self.compression_history = []
self.reflection = MethodologicalReflection()
self.chains = []
self.iteration_count = 0
self.hierarchy_levels = {} # Speichert die Hierarchieebene jedes Nonterminals
def load_chains(self, chains, user_start_symbol=None):
"""Lädt Terminalzeichenketten und optional ein benutzerdefiniertes Startzeichen"""
self.chains = [list(chain) for chain in chains]
self.user_start_symbol = user_start_symbol
# Alle ursprünglichen Terminale sammeln
all_symbols = set()
for chain in chains:
for symbol in chain:
all_symbols.add(symbol)
self.terminals = all_symbols
return True
def find_best_repetition(self, chains, min_length=2, max_length=5):
"""
Findet die beste wiederholte Sequenz in allen Ketten.
Berücksichtigt Häufigkeit, Länge und Komplexität.
"""
sequence_counter = Counter()
for chain in chains:
max_len = min(max_length, len(chain))
for length in range(min_length, max_len + 1):
for i in range(len(chain) - length + 1):
seq = tuple(chain[i:i+length])
sequence_counter[seq] += 1
# Nur Sequenzen mit mindestens 2 Vorkommen
repeated = {seq: count for seq, count in sequence_counter.items()
if count >= 2}
if not repeated:
return None
# Bewertung: (Häufigkeit * Länge) / Anzahl einzigartiger Symbole
# Bevorzugt längere, häufigere Muster mit weniger Varianz
best_seq = max(repeated.items(),
key=lambda x: x[1] * len(x[0]) / max(1, len(set(x[0]))))
return best_seq[0]
def generate_nonterminal_name(self, sequence):
"""
Generiert einen aussagekräftigen Namen für ein neues Nonterminal.
"""
if all(isinstance(s, str) and s.startswith(('K', 'V')) for s in sequence):
# Extrahiere erste und letzte Komponente für die Benennung
first = sequence[0]
last = sequence[-1]
# Bestimme den Typ basierend auf den Symbolen
seq_str = ' '.join([str(s) for s in sequence])
if 'KBBd' in seq_str and 'VBBd' in seq_str:
typ = "BEDARFSKLAERUNG"
elif ('VAA' in seq_str and 'KAA' in seq_str) or ('VAA' in seq_str and 'KAV' in seq_str):
typ = "ZAHLUNGSVORGANG"
elif 'KAE' in seq_str or 'VAE' in seq_str:
typ = "INFORMATIONSAUSTAUSCH"
elif 'KBG' in seq_str and 'VBG' in seq_str:
typ = "BEGRUESSUNG"
elif 'VAV' in seq_str and 'KAV' in seq_str:
typ = "VERABSCHIEDUNG"
else:
typ = "SEQUENZ"
return f"NT_{typ}_{first}_{last}"
else:
# Für gemischte Sequenzen mit bereits vorhandenen Nonterminalen
return f"NT_{'_'.join(str(s) for s in sequence)}"
def _describe_sequence(self, sequence):
"""Erzeugt eine semantische Beschreibung der Sequenz"""
if len(sequence) == 2:
if all(isinstance(s, str) and len(s) <= 4 for s in sequence):
return f"{self.reflection._interpretiere_symbol(sequence[0])} → {self.reflection._interpretiere_symbol(sequence[1])}"
else:
return f"{sequence[0]} → {sequence[1]}"
else:
return f"Sequenz mit {len(sequence)} Schritten"
def compress_sequences(self, chains, sequence, new_nonterminal):
"""
Ersetzt alle Vorkommen der Sequenz durch das neue Nonterminal.
"""
compressed = []
seq_tuple = tuple(sequence)
seq_len = len(sequence)
for chain in chains:
new_chain = []
i = 0
while i < len(chain):
if i <= len(chain) - seq_len and tuple(chain[i:i+seq_len]) == seq_tuple:
new_chain.append(new_nonterminal)
i += seq_len
else:
new_chain.append(chain[i])
i += 1
compressed.append(new_chain)
return compressed
def all_chains_identical(self, chains):
"""Prüft, ob alle Ketten identisch sind (nur ein Symbol)"""
if not chains:
return False
first = chains[0]
return all(len(chain) == 1 and chain[0] == first[0] for chain in chains)
def find_top_level_nonterminal(self):
"""
Findet das oberste Nonterminal in der Hierarchie.
Das ist dasjenige, das niemals als Teil einer anderen Produktion vorkommt.
"""
if not self.rules:
return None
# Sammle alle Symbole, die in Produktionen vorkommen
symbols_in_productions = set()
for nt, productions in self.rules.items():
for prod, _ in productions:
for sym in prod:
symbols_in_productions.add(sym)
# Nonterminale, die niemals als Teil einer anderen Produktion vorkommen
# sind die obersten in der Hierarchie
top_level = [nt for nt in self.rules if nt not in symbols_in_productions]
if top_level:
# Wenn es mehrere gibt, nimm das mit der höchsten Hierarchieebene (späteste Iteration)
if len(top_level) > 1:
top_level.sort(key=lambda nt: self.hierarchy_levels.get(nt, 0), reverse=True)
selected = top_level[0]
return selected
# Fallback: nimm das Nonterminal mit der höchsten Hierarchieebene
if self.hierarchy_levels:
selected = max(self.hierarchy_levels.items(), key=lambda x: x[1])[0]
return selected
# Letzter Fallback: nimm das erste Nonterminal
return list(self.rules.keys())[0] if self.rules else None
def induce_grammar(self, max_iterations=50, progress_callback=None):
"""
Induziert Grammatik durch hierarchische Kompression.
Wiederholt den Vorgang, bis nur noch ein Startsymbol übrig bleibt
oder keine weiteren Wiederholungen gefunden werden.
"""
current_chains = [list(chain) for chain in self.chains]
iteration = 0
rule_counter = 1
self.rules = {}
self.nonterminals = set()
self.compression_history = []
self.iteration_count = 0
self.hierarchy_levels = {}
print("\n" + "=" * 70)
print("HIERARCHISCHE GRAMMATIKINDUKTION")
print("=" * 70)
print("\nDer Induktionsprozess wird als EXPLIKATION verstanden:")
print("- Jedes neue Nonterminal repräsentiert eine INTERPRETATIVE KATEGORIE")
print("- Die Benennung expliziert die qualitative Bedeutung")
print("- Der Prozess wird wiederholt, bis nur noch EIN Symbol übrig bleibt")
print("- Dieses Symbol wird zum STARTSYMBOL der Grammatik\n")
while iteration < max_iterations:
best_seq = self.find_best_repetition(current_chains)
if best_seq is None:
print(f"\nKeine weiteren Wiederholungen nach {iteration} Iterationen gefunden.")
break
# Generiere interpretativen Namen
new_nonterminal = self.generate_nonterminal_name(best_seq)
beschreibung = self._describe_sequence(best_seq)
# Stelle Einzigartigkeit sicher
base_name = new_nonterminal
while new_nonterminal in self.nonterminals:
new_nonterminal = f"{base_name}_{rule_counter}"
rule_counter += 1
# Dokumentiere die interpretative Entscheidung
rationale = f"Erkanntes Dialogmuster: {beschreibung}"
self.reflection.log_interpretation(best_seq, new_nonterminal, rationale)
seq_str = ' → '.join([str(s) for s in best_seq])
print(f"\nIteration {iteration + 1}:")
print(f" Erkanntes Muster: {seq_str}")
print(f" Interpretation: {beschreibung}")
print(f" → Neue Kategorie: {new_nonterminal}")
# Speichere die Regel (vorerst ohne Wahrscheinlichkeit)
self.rules[new_nonterminal] = [(list(best_seq), 1.0)]
self.nonterminals.add(new_nonterminal)
self.hierarchy_levels[new_nonterminal] = iteration # Speichere die Hierarchieebene
# Vorkommen zählen für die Dokumentation
occurrences = 0
for chain in current_chains:
for i in range(len(chain) - len(best_seq) + 1):
if tuple(chain[i:i+len(best_seq)]) == best_seq:
occurrences += 1
self.compression_history.append({
'iteration': iteration,
'sequence': best_seq,
'new_symbol': new_nonterminal,
'occurrences': occurrences
})
# Komprimiere alle Ketten
current_chains = self.compress_sequences(current_chains, best_seq, new_nonterminal)
# Zeige ein Beispiel der komprimierten Kette
if current_chains and current_chains[0]:
example = ' → '.join([str(s) for s in current_chains[0][:10]])
print(f" Beispiel (komprimiert): {example}...")
iteration += 1
self.iteration_count = iteration
# Prüfe auf vollständige Kompression - alle Ketten sind zu einem Symbol komprimiert
if self.all_chains_identical(current_chains):
# Alle Ketten sind zu einem Symbol komprimiert
if current_chains and current_chains[0]:
unique_symbol = current_chains[0][0]
# Prüfe, ob das benutzerdefinierte Startzeichen verwendet werden kann
if self.user_start_symbol and self.user_start_symbol in self.rules:
self.start_symbol = self.user_start_symbol
print(f"\nINDUKTION ABGESCHLOSSEN: Benutzerdefiniertes Startsymbol = {self.start_symbol}")
elif unique_symbol in self.rules:
self.start_symbol = unique_symbol
print(f"\nINDUKTION ABGESCHLOSSEN: Komprimiertes Startsymbol = {self.start_symbol}")
else:
# Finde das oberste Nonterminal
self.start_symbol = self.find_top_level_nonterminal()
print(f"\nINDUKTION ABGESCHLOSSEN: Oberstes Nonterminal als Startsymbol = {self.start_symbol}")
break
# Falls keine vollständige Kompression erreicht wurde, bestimme ein Startsymbol
if self.start_symbol is None:
if self.user_start_symbol and self.user_start_symbol in self.rules:
self.start_symbol = self.user_start_symbol
print(f"\nKeine vollständige Kompression erreicht. Benutzerdefiniertes Startsymbol: {self.start_symbol}")
elif self.rules:
# Finde das oberste Nonterminal in der Hierarchie
self.start_symbol = self.find_top_level_nonterminal()
print(f"\nKeine vollständige Kompression erreicht. Oberstes Nonterminal als Startsymbol: {self.start_symbol}")
else:
print("\nWARNUNG: Keine Grammatik induziert!")
return current_chains
# Terminale sind die ursprünglichen Symbole, die nie ersetzt wurden
all_symbols = set()
for chain in self.chains:
for sym in chain:
all_symbols.add(sym)
# Symbole, die nie als Nonterminale eingeführt wurden, sind Terminale
self.terminals = all_symbols - self.nonterminals
# Wahrscheinlichkeiten berechnen
self._calculate_probabilities()
print(f"\nStartsymbol: {self.start_symbol}")
return current_chains
def _calculate_probabilities(self):
"""Berechnet Wahrscheinlichkeiten für jede Produktion basierend auf Häufigkeiten"""
# Zähle, wie oft jedes Nonterminal expandiert wird
expansion_counts = defaultdict(Counter)
# Rekonstruiere die Expansionshierarchie aus den Originalketten
for chain in self.chains:
self._count_expansions(chain, expansion_counts)
# Konvertiere zu Wahrscheinlichkeiten
for nonterminal in self.rules:
if nonterminal in expansion_counts:
total = sum(expansion_counts[nonterminal].values())
if total > 0:
productions = []
for expansion, count in expansion_counts[nonterminal].items():
productions.append((list(expansion), count / total))
# Sortiere nach Wahrscheinlichkeit (absteigend)
productions.sort(key=lambda x: x[1], reverse=True)
self.rules[nonterminal] = productions
# Falls keine Vorkommen gefunden wurden, behalte die initiale Produktion mit Wahrscheinlichkeit 1.0
def _count_expansions(self, sequence, expansion_counts):
"""Rekursive Hilfsfunktion zum Zählen der Expansionen"""
i = 0
while i < len(sequence):
symbol = sequence[i]
# Wenn das Symbol ein Nonterminal ist, zähle seine Expansion
if symbol in self.rules:
# Finde die längste passende Expansion
found = False
for expansion, _ in self.rules[symbol]:
exp_len = len(expansion)
if i + exp_len <= len(sequence) and sequence[i:i+exp_len] == expansion:
expansion_counts[symbol][tuple(expansion)] += 1
self._count_expansions(expansion, expansion_counts)
i += exp_len
found = True
break
if not found:
i += 1
else:
i += 1
def print_grammar(self):
"""Gibt die vollständige induzierte Grammatik aus"""
lines = []
lines.append("\n" + "=" * 70)
lines.append("INDUZIERTE GRAMMATIK")
lines.append("=" * 70)
lines.append(f"\nTerminale ({len(self.terminals)}): {sorted(self.terminals)}")
lines.append(f"Nonterminale ({len(self.nonterminals)}): {sorted(self.nonterminals)}")
lines.append(f"Startsymbol: {self.start_symbol}")
lines.append(f"Iterationen: {self.iteration_count}")
lines.append("\nPRODUKTIONSREGELN (mit Wahrscheinlichkeiten):")
for nonterminal in sorted(self.rules.keys()):
productions = self.rules[nonterminal]
if productions:
prod_str = " | ".join([f"{' → '.join(prod)} [{prob:.3f}]"
for prod, prob in productions])
lines.append(f"\n{nonterminal} → {prod_str}")
# Zeige die Kompressionshierarchie
if self.compression_history:
lines.append("\n\nKOMPRESSIONSHISTORIE:")
for entry in self.compression_history:
seq_str = ' → '.join([str(s) for s in entry['sequence']])
lines.append(f" Iteration {entry['iteration']+1}: {seq_str} → {entry['new_symbol']} ({entry['occurrences']} Vorkommen)")
return "\n".join(lines)
def generate_chain(self, start_symbol=None, max_depth=20):
"""
Generiert eine neue Kette mit der induzierten Grammatik.
Beginnt beim Startsymbol und expandiert rekursiv.
"""
if not start_symbol:
start_symbol = self.start_symbol
if not start_symbol:
return []
# Stelle sicher, dass das Startsymbol in den Regeln existiert
if start_symbol not in self.rules:
if self.rules:
# Versuche das oberste Nonterminal
start_symbol = self.find_top_level_nonterminal()
else:
return []
# Produktionswahrscheinlichkeiten vorbereiten
prod_probs = {}
for nt, prods in self.rules.items():
symbols = [p for p, _ in prods]
probs = [prob for _, prob in prods]
if symbols and probs:
total = sum(probs)
if total > 0:
probs = [p/total for p in probs]
prod_probs[nt] = (symbols, probs)
def expand(symbol, depth=0):
"""Rekursive Expansion eines Symbols"""
if depth >= max_depth:
return [str(symbol)] # Schutz vor unendlicher Rekursion
# Wenn es ein Terminal ist, gib es zurück
if symbol in self.terminals:
return [str(symbol)]
# Wenn es ein Nonterminal mit Produktionen ist
if symbol in prod_probs:
symbols, probs = prod_probs[symbol]
if not symbols:
return [str(symbol)]
try:
# Wähle eine Produktion basierend auf Wahrscheinlichkeiten
chosen_idx = np.random.choice(len(symbols), p=probs)
chosen = symbols[chosen_idx]
except Exception:
# Fallback bei Fehlern
chosen = symbols[0] if symbols else []
# Expandiere jedes Symbol der gewählten Produktion
result = []
for sym in chosen:
result.extend(expand(sym, depth + 1))
return result
# Fallback
return [str(symbol)]
return expand(start_symbol)
def get_compression_tree(self, symbol=None, depth=0):
"""
Gibt den Kompressionsbaum als String zurück (für Debugging/Visualisierung).
"""
if symbol is None:
symbol = self.start_symbol
if symbol is None:
return "Kein Startsymbol definiert"
if symbol in self.terminals:
return " " * depth + f"└─ {symbol} (Terminal)"
lines = []
lines.append(" " * depth + f"├─ {symbol}")
if symbol in self.rules:
productions = self.rules[symbol]
for i, (prod, prob) in enumerate(productions):
prefix = " " * (depth + 1) + "├─ " if i < len(productions) - 1 else " " * (depth + 1) + "└─ "
lines.append(prefix + f"[{prob:.3f}] ->")
for sym in prod:
if sym in self.rules:
# Rekursiv für Nonterminale
subtree = self.get_compression_tree(sym, depth + 2)
# Die erste Zeile des Subtree anpassen
subtree_lines = subtree.split('\n')
for j, line in enumerate(subtree_lines):
if j == 0:
lines.append(line)
else:
lines.append(line)
else:
lines.append(" " * (depth + 2) + f"└─ {sym}")
return "\n".join(lines)
# ============================================================================
# PETRI-NETZE (ARS 4.0 - SZENARIO A)
# ============================================================================
if NETWORKX_AVAILABLE:
class ARSPetriNet:
"""
Petri-Netz-Modell für ARS 4.0
"""
def __init__(self, name="ARS_PetriNet"):
self.name = name
self.places = {} # Stellen: name -> Place-Objekt
self.transitions = {} # Transitionen: name -> Transition-Objekt
self.arcs = [] # Kanten: (source, target, weight)
self.tokens = {} # Marken: place_name -> Anzahl
self.hierarchy = {} # Hierarchie: transition_name -> subnet
# Statistik
self.firing_history = []
self.reached_markings = set()
def add_place(self, name, initial_tokens=0, place_type="normal"):
"""
Fügt eine Stelle hinzu
place_type: "normal", "resource", "phase", "customer", "seller"
"""
self.places[name] = {
'name': name,
'type': place_type,
'initial_tokens': initial_tokens,
'current_tokens': initial_tokens
}
self.tokens[name] = initial_tokens
def add_transition(self, name, transition_type="speech_act",
guard=None, subnet=None):
"""
Fügt eine Transition hinzu
transition_type: "speech_act", "abstract", "silent"
guard: Bedingungsfunktion (optional)
subnet: Subnetz für hierarchische Transitionen
"""
self.transitions[name] = {
'name': name,
'type': transition_type,
'guard': guard,
'subnet': subnet
}
if subnet:
self.hierarchy[name] = subnet
def add_arc(self, source, target, weight=1):
"""
Fügt eine Kante hinzu (source -> target)
source/target können Stellen oder Transitionen sein
"""
self.arcs.append({
'source': source,
'target': target,
'weight': weight
})
def get_preset(self, transition):
"""Gibt die Vorstellen einer Transition zurück"""
preset = {}
for arc in self.arcs:
if arc['target'] == transition and arc['source'] in self.places:
preset[arc['source']] = arc['weight']
return preset
def get_postset(self, transition):
"""Gibt die Nachstellen einer Transition zurück"""
postset = {}
for arc in self.arcs:
if arc['source'] == transition and arc['target'] in self.places:
postset[arc['target']] = arc['weight']
return postset
def is_enabled(self, transition):
"""Prüft, ob eine Transition aktiviert ist"""
if transition not in self.transitions:
return False
# Prüfe Vorstellen
preset = self.get_preset(transition)
for place, weight in preset.items():
if self.tokens.get(place, 0) < weight:
return False
# Prüfe Guard-Bedingung
trans_data = self.transitions[transition]
if trans_data['guard'] and not trans_data['guard'](self):
return False
return True
def fire(self, transition):
"""Schaltet eine Transition"""
if not self.is_enabled(transition):
return False
# Entferne Token von Vorstellen
preset = self.get_preset(transition)
for place, weight in preset.items():
self.tokens[place] -= weight
# Füge Token zu Nachstellen hinzu
postset = self.get_postset(transition)
for place, weight in postset.items():
self.tokens[place] = self.tokens.get(place, 0) + weight
# Protokolliere Schaltvorgang
self.firing_history.append({
'transition': transition,
'marking': self.get_marking_copy()
})
# Speichere erreichte Markierung
self.reached_markings.add(self.get_marking_tuple())
return True
def get_marking_copy(self):
"""Gibt eine Kopie der aktuellen Markierung zurück"""
return self.tokens.copy()
def get_marking_tuple(self):
"""Gibt die Markierung als sortiertes Tupel zurück (für Hash-Set)"""
return tuple(sorted([(p, self.tokens[p]) for p in self.places]))
def reset(self):
"""Setzt das Netz in den Anfangszustand zurück"""
for place_name, place_data in self.places.items():
self.tokens[place_name] = place_data['initial_tokens']
self.firing_history = []
def simulate(self, transition_sequence):
"""
Simuliert eine Sequenz von Transitionen
Gibt Erfolg und letzte Markierung zurück
"""
self.reset()
successful = []
for t in transition_sequence:
if self.is_enabled(t):
self.fire(t)
successful.append(t)
else:
break
return successful, self.get_marking_copy()
class PetriNetBuilder:
"""
Baut Petri-Netze aus ARS-Daten
"""
def __init__(self, terminal_chains, grammar_rules=None):
self.chains = terminal_chains
self.grammar = grammar_rules
self.petri_net = None
def build_basic_net(self):
"""Erstellt ein einfaches Petri-Netz ohne Ressourcen"""
self.petri_net = ARSPetriNet("ARS_PetriNet_Basic")
# Alle Terminalzeichen als Transitionen
all_symbols = set()
for chain in self.chains:
for sym in chain:
all_symbols.add(sym)
# Stellen für Sequenzpositionen
self.petri_net.add_place("p_start", initial_tokens=1)
self.petri_net.add_place("p_end", initial_tokens=0)
for i, sym in enumerate(sorted(all_symbols)):
self.petri_net.add_place(f"p_{sym}_ready", initial_tokens=0)
self.petri_net.add_transition(f"t_{sym}")
# Verbindungen
if i == 0:
self.petri_net.add_arc("p_start", f"t_{sym}")
self.petri_net.add_arc(f"t_{sym}", f"p_{sym}_ready")
return self.petri_net
def build_resource_net(self):
"""Erstellt ein Petri-Netz mit Ressourcen"""
self.petri_net = ARSPetriNet("ARS_PetriNet_Resource")
# Kunde und Verkäufer als Ressourcen
self.petri_net.add_place("p_customer_present", initial_tokens=1, place_type="customer")
self.petri_net.add_place("p_customer_ready", initial_tokens=1, place_type="customer")
self.petri_net.add_place("p_seller_ready", initial_tokens=1, place_type="seller")
# Waren und Geld
self.petri_net.add_place("p_goods_available", initial_tokens=10, place_type="resource")
self.petri_net.add_place("p_goods_selected", initial_tokens=0, place_type="resource")
self.petri_net.add_place("p_money_customer", initial_tokens=20, place_type="resource")
self.petri_net.add_place("p_money_register", initial_tokens=0, place_type="resource")
# Phasen
phases = ["Greeting", "Need", "Consult", "Completion", "Farewell"]
for phase in phases:
self.petri_net.add_place(f"p_phase_{phase}", initial_tokens=0, place_type="phase")
self.petri_net.add_place("p_phase_start", initial_tokens=1, place_type="phase")
# Alle Terminalzeichen als Transitionen mit Ressourcen-Anbindung
all_symbols = set()
for chain in self.chains:
for sym in chain:
all_symbols.add(sym)
for sym in sorted(all_symbols):
self.petri_net.add_transition(f"t_{sym}")
# Grundlegende Verbindungen
if sym.startswith('K'):
self.petri_net.add_arc("p_customer_ready", f"t_{sym}")
self.petri_net.add_arc(f"t_{sym}", "p_customer_ready")
else:
self.petri_net.add_arc("p_seller_ready", f"t_{sym}")
self.petri_net.add_arc(f"t_{sym}", "p_seller_ready")
# Spezielle Verbindungen je nach Symboltyp
if sym.endswith('A'): # Abschluss-Symbole
self.petri_net.add_arc("p_goods_selected", f"t_{sym}")
self.petri_net.add_arc("p_money_customer", f"t_{sym}")
self.petri_net.add_arc(f"t_{sym}", "p_goods_available")
self.petri_net.add_arc(f"t_{sym}", "p_money_register")
return self.petri_net
def simulate_chain(self, chain):
"""Simuliert eine Kette im Petri-Netz"""
if not self.petri_net:
self.build_basic_net()
self.petri_net.reset()
results = []
for sym in chain:
trans_name = f"t_{sym}"
if trans_name in self.petri_net.transitions:
enabled = self.petri_net.is_enabled(trans_name)
if enabled:
self.petri_net.fire(trans_name)
results.append((sym, True, "enabled"))
else:
results.append((sym, False, "not enabled"))
else:
results.append((sym, False, "no transition"))
return results, self.petri_net.get_marking_copy()
else:
class ARSPetriNet:
def __init__(self, *args, **kwargs):
raise ImportError("networkx nicht installiert")
class PetriNetBuilder:
def __init__(self, *args, **kwargs):
raise ImportError("networkx nicht installiert")
# ============================================================================
# BAYESSCHE NETZE (ARS 4.0 - SZENARIO B)
# ============================================================================
if HMM_AVAILABLE:
class ARSHiddenMarkovModel:
"""
Hidden-Markov-Modell für ARS 4.0
Korrigierte Version für hmmlearn
"""
def __init__(self, n_states=5):
self.n_states = n_states
self.model = None
self.symbol_to_idx = {}
self.idx_to_symbol = {}
self.state_names = {
0: "Greeting",
1: "Need Determination",
2: "Consultation",
3: "Completion",
4: "Farewell"
}
self.n_features = None
def prepare_data(self, chains):
"""Bereitet Daten für HMM vor"""
# Symbol-Mapping erstellen
all_symbols = set()
for chain in chains:
for sym in chain:
all_symbols.add(sym)
# Stelle sicher, dass alle Symbole Strings sind und keine None-Werte
all_symbols = {str(s) for s in all_symbols if s is not None}
self.symbol_to_idx = {sym: i for i, sym in enumerate(sorted(all_symbols))}
self.idx_to_symbol = {i: sym for sym, i in self.symbol_to_idx.items()}
self.n_features = len(all_symbols)
# Daten in Sequenzen konvertieren
X = []
lengths = []
for chain in chains:
# Stelle sicher, dass jedes Symbol im chain existiert
seq = []
for sym in chain:
if sym in self.symbol_to_idx:
seq.append(self.symbol_to_idx[sym])
else:
# Fallback: überspringe unbekannte Symbole
continue
if seq: # Nur nicht-leere Sequenzen hinzufügen
X.extend(seq)
lengths.append(len(seq))
if not X: # Falls keine Daten vorhanden
return np.array([]).reshape(-1, 1), np.array([])
return np.array(X).reshape(-1, 1), np.array(lengths)
def initialize_from_ars(self, chains):
"""Initialisiert HMM-Parameter aus ARS-Daten"""
print("\n=== Initialisiere HMM aus ARS-3.0-Daten ===")
# Zuerst prepare_data aufrufen, um Mapping zu erstellen
X, lengths = self.prepare_data(chains)
if len(X) == 0:
print("Warnung: Keine Daten für HMM-Initialisierung")
return None
# 1. Startwahrscheinlichkeiten
startprob = np.zeros(self.n_states)
startprob[0] = 0.7 # Greeting
startprob[1] = 0.2 # Need Determination
startprob[4] = 0.1 # Farewell
# 2. Übergangsmatrix
transmat = np.zeros((self.n_states, self.n_states))
transmat[0, 1] = 0.8
transmat[0, 0] = 0.2
transmat[1, 2] = 0.6
transmat[1, 3] = 0.3
transmat[1, 1] = 0.1
transmat[2, 3] = 0.5
transmat[2, 2] = 0.4
transmat[2, 1] = 0.1
transmat[3, 4] = 0.9
transmat[3, 3] = 0.1
transmat[4, 4] = 1.0
# 3. Emissionswahrscheinlichkeiten (gleichverteilt initial)
emissionprob = np.ones((self.n_states, self.n_features)) / self.n_features
# HMM erstellen
self.model = hmm.MultinomialHMM(
n_components=self.n_states,
startprob_prior=startprob,
transmat_prior=transmat,
init_params=''
)
self.model.startprob_ = startprob
self.model.transmat_ = transmat
self.model.emissionprob_ = emissionprob
print(f"HMM initialisiert: {self.n_states} Zustände, {self.n_features} Symbole")
self.print_parameters()
return self.model
def fit(self, chains, n_iter=100):
"""Trainiert das HMM mit Baum-Welch"""
X, lengths = self.prepare_data(chains)
if len(X) == 0:
raise ValueError("Keine Daten zum Trainieren vorhanden")
print(f"\n=== Trainiere HMM mit {len(chains)} Sequenzen ===")
print(f"Gesamtlänge: {len(X)} Beobachtungen")
if self.model is None:
self.model = hmm.MultinomialHMM(
n_components=self.n_states,
n_iter=n_iter,
random_state=42
)
self.model.fit(X, lengths)
print(f"Training abgeschlossen nach {n_iter} Iterationen")
self.print_parameters()
return self.model
def print_parameters(self):
"""Gibt die Modellparameter aus"""
if self.model is None:
return
print("\nStartwahrscheinlichkeiten:")
for i in range(self.n_states):
print(f" {self.state_names[i]}: {self.model.startprob_[i]:.3f}")
print("\nÜbergangsmatrix:")
for i in range(self.n_states):
row = " " + " ".join([f"{self.model.transmat_[i,j]:.3f}"
for j in range(self.n_states)])
print(f"{self.state_names[i]}: {row}")
def decode(self, chain):
"""Viterbi-Dekodierung einer Kette"""
if self.model is None:
return None, None
# Konvertiere chain in Indizes
X_list = []
for sym in chain:
if sym in self.symbol_to_idx:
X_list.append(self.symbol_to_idx[sym])
else:
# Fallback: überspringe unbekannte Symbole
continue
if not X_list:
return None, None
X = np.array(X_list).reshape(-1, 1)
try:
logprob, states = self.model.decode(X, algorithm="viterbi")
return states, np.exp(logprob)
except:
return None, None
def get_parameters_string(self):
"""Gibt die HMM-Parameter als String zurück"""
if self.model is None:
return "Kein HMM trainiert"
lines = []
lines.append("Startwahrscheinlichkeiten:")
for i in range(self.n_states):
lines.append(f" {self.state_names[i]}: {self.model.startprob_[i]:.3f}")
lines.append("\nÜbergangsmatrix:")
for i in range(self.n_states):
row = " " + " ".join([f"{self.model.transmat_[i,j]:.3f}"
for j in range(self.n_states)])
lines.append(f"{self.state_names[i]}: {row}")
return '\n'.join(lines)
else:
class ARSHiddenMarkovModel:
def __init__(self, *args, **kwargs):
raise ImportError("hmmlearn nicht installiert")
# ============================================================================
# HYBRIDE INTEGRATION (ARS 4.0 - SZENARIO D2)
# ============================================================================
if CRF_AVAILABLE:
class ARSCRFModel:
"""
CRF-Modell für sequenzielle Abhängigkeiten
"""
def __init__(self):
self.crf = CRF(
algorithm='lbfgs',
c1=0.1,
c2=0.1,
max_iterations=100,
all_possible_transitions=True
)
def extract_features(self, sequence, i):
"""Extrahiert Features für Position i"""
features = {
'bias': 1.0,
'symbol': sequence[i],
'prefix_K': sequence[i].startswith('K'),
'prefix_V': sequence[i].startswith('V'),
'suffix_A': sequence[i].endswith('A'),
'suffix_B': sequence[i].endswith('B'),
'suffix_E': sequence[i].endswith('E'),
'suffix_G': sequence[i].endswith('G'),
'suffix_V': sequence[i].endswith('V'),
'position': i,
'is_first': i == 0,
'is_last': i == len(sequence) - 1,
}
# Kontext-Features
for offset in [-2, -1, 1, 2]:
if 0 <= i + offset < len(sequence):
sym = sequence[i + offset]
features[f'context_{offset:+d}'] = sym
if i > 0:
features['bigram'] = f"{sequence[i-1]}_{sequence[i]}"
return features
def prepare_data(self, sequences):
"""Bereitet Daten für CRF-Training vor"""
X = []
y = []
for seq in sequences:
X_seq = [self.extract_features(seq, i) for i in range(len(seq))]
y_seq = [sym for sym in seq]
X.append(X_seq)
y.append(y_seq)
return X, y
def fit(self, sequences):
"""Trainiert das CRF-Modell"""
X, y = self.prepare_data(sequences)
self.crf.fit(X, y)
return self
def predict(self, sequence):
"""Sagt Labels für eine Sequenz vorher"""
X = [self.extract_features(sequence, i) for i in range(len(sequence))]
return self.crf.predict([X])[0]
def get_top_features(self, n=20):
"""Gibt die wichtigsten Features zurück"""
if not hasattr(self.crf, 'state_features_'):
return []
top = sorted(
self.crf.state_features_.items(),
key=lambda x: abs(x[1]),
reverse=True
)[:n]
return [(attr, label, weight) for (attr, label), weight in top]
else:
class ARSCRFModel:
def __init__(self, *args, **kwargs):
raise ImportError("sklearn-crfsuite nicht installiert")
if TRANSFORMER_AVAILABLE:
class SemanticValidator:
"""
Validiert Kategorien mit Transformer-Embeddings
"""
def __init__(self):
self.model = None
self.embeddings = {}
self.symbol_to_texts = self._create_text_mapping()
def _create_text_mapping(self):
"""Erstellt Beispieltexte für Symbole"""
return {
'KBG': ['Good day', 'Good morning', 'Hello'],
'VBG': ['Good day', 'Good morning', 'Hello back'],
'KBBd': ['One sausage', 'I would like cheese', 'One kilo apples'],
'VBBd': ['How much?', 'Which kind?', 'Anything else?'],
'KBA': ['Two hundred grams', 'The white ones', 'Yes please'],
'VBA': ['All right', 'Coming up', 'Okay'],
'KAE': ['Can I put in salad?', 'Where from?', 'Is it fresh?'],
'VAE': ['Better to saute', 'From region', 'Very fresh'],
'KAA': ['Here you go', 'Thanks', 'Yes thanks'],
'VAA': ['That will be 8 marks', '3 marks', '14 marks'],
'KAV': ['Goodbye', 'Bye', 'Have a nice day'],
'VAV': ['Thank you', 'Have a nice day', 'Goodbye']
}
def load_model(self):
"""Lädt das Sentence-Transformer-Modell"""
try:
self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
return True
except:
return False
def compute_embeddings(self):
"""Berechnet Embeddings für alle Symbole"""
if self.model is None:
if not self.load_model():
return False
for symbol, texts in self.symbol_to_texts.items():
embeddings = self.model.encode(texts)
self.embeddings[symbol] = np.mean(embeddings, axis=0)
return True
def similarity_matrix(self):
"""Berechnet Ähnlichkeitsmatrix zwischen Symbolen"""
if not self.embeddings:
if not self.compute_embeddings():
return None, None
symbols = sorted(self.embeddings.keys())
n = len(symbols)
matrix = np.zeros((n, n))
for i, sym1 in enumerate(symbols):
for j, sym2 in enumerate(symbols):
emb1 = self.embeddings[sym1]
emb2 = self.embeddings[sym2]
sim = np.dot(emb1, emb2) / (np.linalg.norm(emb1) * np.linalg.norm(emb2))
matrix[i, j] = sim
return matrix, symbols
def get_intra_similarities(self):
"""Gibt Intra-Kategorie-Ähnlichkeiten zurück"""
matrix, symbols = self.similarity_matrix()
if matrix is None:
return {}
return {sym: matrix[i, i] for i, sym in enumerate(symbols)}
else:
class SemanticValidator:
def __init__(self, *args, **kwargs):
raise ImportError("sentence-transformers nicht installiert")
if NETWORKX_AVAILABLE:
class GrammarGraph:
"""
Repräsentiert Grammatik als Graph
"""
def __init__(self, grammar_rules):
self.grammar = grammar_rules
self.graph = nx.DiGraph()
self.build_graph()
def build_graph(self):
"""Baut Graphen aus Grammatik"""
for nt, productions in self.grammar.items():
for prod, prob in productions:
for sym in prod:
self.graph.add_edge(nt, sym, weight=prob)
def centrality(self):
"""Berechnet Zentralität der Knoten"""
return nx.degree_centrality(self.graph)
else:
class GrammarGraph:
def __init__(self, *args, **kwargs):
raise ImportError("networkx nicht installiert")
class AttentionVisualizer:
"""
Visualisiert Attention auf Sequenzen
"""
def __init__(self, chains):
self.chains = chains
self.bigram_probs = self._compute_bigram_probs()
def _compute_bigram_probs(self):
"""Berechnet Bigram-Wahrscheinlichkeiten"""
bigram_counts = defaultdict(int)
unigram_counts = defaultdict(int)
for chain in self.chains:
for i in range(len(chain)-1):
bigram_counts[(chain[i], chain[i+1])] += 1
unigram_counts[chain[i]] += 1
if chain:
unigram_counts[chain[-1]] += 1
probs = {}
for (prev, next_), count in bigram_counts.items():
if unigram_counts[prev] > 0:
probs[(prev, next_)] = count / unigram_counts[prev]
return probs
def attention_weights(self, sequence):
"""Berechnet vereinfachte Attention-Gewichte"""
n = len(sequence)
attention = np.zeros((n, n))
for i in range(1, n):
prev = sequence[i-1]
current = sequence[i]
if (prev, current) in self.bigram_probs:
attention[i, i-1] = self.bigram_probs[(prev, current)]
for j in range(i-2, -1, -1):
attention[i, j] = attention[i, j+1] * 0.5
for i in range(n):
row_sum = attention[i].sum()
if row_sum > 0:
attention[i] /= row_sum
return attention
# ============================================================================
# PLOT-FUNKTIONEN (für Thread-sichere Ausführung)
# ============================================================================
def plot_petri_net(petri_net, filename="petri_net.png"):
"""Plottet ein Petri-Netz"""
if not NETWORKX_AVAILABLE:
print("networkx nicht verfügbar")
return
G = nx.DiGraph()
# Füge Stellen hinzu (Kreise)
for place in petri_net.places:
G.add_node(place, type='place', shape='circle')
# Füge Transitionen hinzu (Rechtecke)
for trans in petri_net.transitions:
G.add_node(trans, type='transition', shape='box')
# Füge Kanten hinzu
for arc in petri_net.arcs:
G.add_edge(arc['source'], arc['target'], weight=arc['weight'])
# Layout
pos = nx.spring_layout(G)
plt.figure(figsize=(15, 10))
# Zeichne Stellen
place_nodes = [n for n in G.nodes() if G.nodes[n].get('type') == 'place']
nx.draw_networkx_nodes(G, pos, nodelist=place_nodes,
node_color='lightblue', node_shape='o',
node_size=1000)
# Zeichne Transitionen
trans_nodes = [n for n in G.nodes() if G.nodes[n].get('type') == 'transition']
nx.draw_networkx_nodes(G, pos, nodelist=trans_nodes,
node_color='lightgreen', node_shape='s',
node_size=800)
# Zeichne Kanten
nx.draw_networkx_edges(G, pos, arrows=True, arrowsize=20)
# Zeichne Labels
labels = {}
for node in G.nodes():
if node in petri_net.places:
labels[node] = f"{node}\n[{petri_net.tokens.get(node, 0)}]"
else:
labels[node] = node
nx.draw_networkx_labels(G, pos, labels, font_size=8)
plt.title(f"Petri-Netz: {petri_net.name}")
plt.axis('off')
plt.tight_layout()
plt.savefig(filename, dpi=150)
plt.show()
def plot_similarity_matrix(matrix, symbols, filename="category_similarity.png"):
"""Plottet Ähnlichkeitsmatrix"""
plt.figure(figsize=(12, 10))
if SEABORN_AVAILABLE:
sns.heatmap(matrix, xticklabels=symbols, yticklabels=symbols,
cmap='viridis', annot=True, fmt='.2f')
else:
plt.imshow(matrix, cmap='viridis', interpolation='nearest')
plt.colorbar()
plt.xticks(range(len(symbols)), symbols, rotation=90)
plt.yticks(range(len(symbols)), symbols)
plt.title('Semantic Similarity Between Categories')
plt.tight_layout()
plt.savefig(filename, dpi=150)
plt.show()
def plot_grammar_graph(graph, filename="grammar_graph.png"):
"""Plottet Grammatik-Graphen"""
if not NETWORKX_AVAILABLE:
print("networkx nicht verfügbar")
return
plt.figure(figsize=(15, 10))
pos = nx.spring_layout(graph, k=2, iterations=50)
node_colors = []
for node in graph.nodes():
if node.startswith('NT_'):
node_colors.append('lightgreen')
else:
node_colors.append('lightblue')
nx.draw(graph, pos, node_color=node_colors, with_labels=True,
node_size=1000, font_size=8, arrows=True, arrowsize=20)
plt.title('Grammar Graph')
plt.tight_layout()
plt.savefig(filename, dpi=150)
plt.show()
def plot_attention(attention, sequence, title="Attention Weights", filename="attention_weights.png"):
"""Plottet Attention-Matrix"""
plt.figure(figsize=(10, 8))
if SEABORN_AVAILABLE:
sns.heatmap(attention, xticklabels=sequence, yticklabels=sequence,
cmap='viridis', annot=True, fmt='.2f')
else:
plt.imshow(attention, cmap='viridis', interpolation='nearest')
plt.colorbar()
plt.xticks(range(len(sequence)), sequence)
plt.yticks(range(len(sequence)), sequence)
plt.title(title)
plt.tight_layout()
plt.savefig(filename, dpi=150)
plt.show()
# ============================================================================
# GUI - HAUPTFENSTER
# ============================================================================
class ARSGUI:
def __init__(self, root):
self.root = root
self.root.title("ARS - Algorithmic Recursive Sequence Analysis 4.0")
self.root.geometry("1400x900")
# Plot-Thread für sichere Visualisierung
self.plot_thread = PlotThread(root)
# Queue für GUI-Updates aus Threads
self.update_queue = queue.Queue()
self.process_updates()
# Daten
self.chains = []
self.terminals = []
self.delimiter = tk.StringVar(value=",")
self.start_symbol = tk.StringVar(value="")
# ARS-Objekte
self.ars20 = ARS20()
self.ars30 = GrammarInducer() # ARS 3.0
self.petri_builder = None
self.hmm_model = None
self.crf_model = None
self.semantic_validator = None
self.grammar_graph = None
self.attention_viz = None
# Optimierung
self.optimization_running = False
self.opt_progress_var = tk.DoubleVar()
# Verfügbarkeit der optionalen Module
self.module_status = {
'networkx': NETWORKX_AVAILABLE,
'hmmlearn': HMM_AVAILABLE,
'crf': CRF_AVAILABLE,
'transformer': TRANSFORMER_AVAILABLE,
'torch': TORCH_AVAILABLE,
'seaborn': SEABORN_AVAILABLE
}
# GUI aufbauen
self.create_menu()
self.create_main_panels()
# Status
self.status_var = tk.StringVar(value="Bereit")
self.create_statusbar()
# Modulstatus anzeigen
self.show_module_status()
def process_updates(self):
"""Verarbeitet Updates aus Threads im Hauptthread"""
try:
while True:
update_func = self.update_queue.get_nowait()
update_func()
except queue.Empty:
pass
finally:
self.root.after(100, self.process_updates)
def safe_gui_update(self, func):
"""Führt eine GUI-Update-Funktion thread-sicher aus"""
self.update_queue.put(func)
def create_menu(self):
"""Erstellt die Menüleiste"""
menubar = tk.Menu(self.root)
self.root.config(menu=menubar)
# Datei-Menü
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Datei", menu=file_menu)
file_menu.add_command(label="Öffnen", command=self.load_file)
file_menu.add_command(label="Beispiel laden", command=self.load_example)
file_menu.add_separator()
file_menu.add_command(label="Beenden", command=self.root.quit)
# Hilfe-Menü
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Hilfe", menu=help_menu)
help_menu.add_command(label="Modulstatus", command=self.show_module_status)
help_menu.add_command(label="Über", command=self.show_about)
def create_main_panels(self):
"""Erstellt die Hauptbereiche"""
# Hauptframe mit PanedWindow
main_paned = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL)
main_paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# Linkes Panel - Eingabe
left_frame = ttk.Frame(main_paned)
main_paned.add(left_frame, weight=1)
self.create_input_panel(left_frame)
# Rechtes Panel - Notebook mit Tabs
right_frame = ttk.Frame(main_paned)
main_paned.add(right_frame, weight=3)
self.create_output_panel(right_frame)
def create_input_panel(self, parent):
"""Erstellt das Eingabe-Panel"""
# Titel
ttk.Label(parent, text="Eingabe", font=('Arial', 12, 'bold')).pack(anchor=tk.W, pady=5)
# Delimiter-Auswahl
delim_frame = ttk.Frame(parent)
delim_frame.pack(fill=tk.X, pady=5)
ttk.Label(delim_frame, text="Trennzeichen:").pack(side=tk.LEFT)
ttk.Radiobutton(delim_frame, text="Komma (,)", variable=self.delimiter,
value=",").pack(side=tk.LEFT, padx=2)
ttk.Radiobutton(delim_frame, text="Semikolon (;)", variable=self.delimiter,
value=";").pack(side=tk.LEFT, padx=2)
ttk.Radiobutton(delim_frame, text="Leerzeichen", variable=self.delimiter,
value=" ").pack(side=tk.LEFT, padx=2)
ttk.Radiobutton(delim_frame, text="Benutzer", variable=self.delimiter,
value="custom").pack(side=tk.LEFT, padx=2)
self.custom_delimiter = ttk.Entry(delim_frame, width=5)
self.custom_delimiter.pack(side=tk.LEFT, padx=2)
self.custom_delimiter.insert(0, "|")
# Text-Eingabe
ttk.Label(parent, text="Terminalzeichenketten (eine pro Zeile):").pack(anchor=tk.W, pady=5)
self.text_input = scrolledtext.ScrolledText(parent, height=12, font=('Courier', 10))
self.text_input.pack(fill=tk.BOTH, expand=True, pady=5)
# Buttons
btn_frame = ttk.Frame(parent)
btn_frame.pack(fill=tk.X, pady=5)
ttk.Button(btn_frame, text="Datei laden", command=self.load_file).pack(side=tk.LEFT, padx=2)
ttk.Button(btn_frame, text="Parsen", command=self.parse_input).pack(side=tk.LEFT, padx=2)
ttk.Button(btn_frame, text="Beispiel", command=self.load_example).pack(side=tk.LEFT, padx=2)
# Startzeichen
start_frame = ttk.Frame(parent)
start_frame.pack(fill=tk.X, pady=5)
ttk.Label(start_frame, text="Startzeichen:").pack(side=tk.LEFT)
self.start_entry = ttk.Entry(start_frame, textvariable=self.start_symbol, width=10)
self.start_entry.pack(side=tk.LEFT, padx=5)
# Info
self.info_var = tk.StringVar(value="Keine Daten geladen")
ttk.Label(parent, textvariable=self.info_var, foreground="blue").pack(anchor=tk.W, pady=5)
def create_output_panel(self, parent):
"""Erstellt das Output-Notebook mit Tabs"""
self.notebook = ttk.Notebook(parent)
self.notebook.pack(fill=tk.BOTH, expand=True)
# Tab 1: ARS 2.0
self.tab20 = ttk.Frame(self.notebook)
self.notebook.add(self.tab20, text="ARS 2.0 (Basis)")
self.create_ars20_tab()
# Tab 2: ARS 3.0
self.tab30 = ttk.Frame(self.notebook)
self.notebook.add(self.tab30, text="ARS 3.0 (Nonterminale)")
self.create_ars30_tab()
# Tab 3: Petri-Netze
self.tab_petri = ttk.Frame(self.notebook)
self.notebook.add(self.tab_petri, text="Petri-Netze")
self.create_petri_tab()
# Tab 4: Bayessche Netze
self.tab_bayes = ttk.Frame(self.notebook)
self.notebook.add(self.tab_bayes, text="Bayessche Netze")
self.create_bayes_tab()
# Tab 5: Hybride Integration
self.tab_hybrid = ttk.Frame(self.notebook)
self.notebook.add(self.tab_hybrid, text="Hybrid")
self.create_hybrid_tab()
# Tab 6: Generierung
self.tab_gen = ttk.Frame(self.notebook)
self.notebook.add(self.tab_gen, text="Generierung")
self.create_generation_tab()
def create_ars20_tab(self):
"""Erstellt ARS 2.0 Tab"""
# Steuerung
control = ttk.Frame(self.tab20)
control.pack(fill=tk.X, pady=5)
ttk.Button(control, text="ARS 2.0 berechnen",
command=self.run_ars20).pack(side=tk.LEFT, padx=5)
ttk.Button(control, text="Optimierung starten",
command=self.run_optimization).pack(side=tk.LEFT, padx=5)
self.opt_progress = ttk.Progressbar(control, length=200, mode='determinate')
self.opt_progress.pack(side=tk.LEFT, padx=10)
# Textausgabe
self.text20 = scrolledtext.ScrolledText(self.tab20, font=('Courier', 10))
self.text20.pack(fill=tk.BOTH, expand=True, pady=5)
def create_ars30_tab(self):
"""Erstellt ARS 3.0 Tab"""
control = ttk.Frame(self.tab30)
control.pack(fill=tk.X, pady=5)
ttk.Button(control, text="Grammatik induzieren",
command=self.run_ars30).pack(side=tk.LEFT, padx=5)
self.ars30_progress = ttk.Progressbar(control, length=200, mode='indeterminate')
self.ars30_progress.pack(side=tk.LEFT, padx=10)
self.text30 = scrolledtext.ScrolledText(self.tab30, font=('Courier', 10))
self.text30.pack(fill=tk.BOTH, expand=True, pady=5)
def create_petri_tab(self):
"""Erstellt Petri-Netz Tab"""
control = ttk.Frame(self.tab_petri)
control.pack(fill=tk.X, pady=5)
if self.module_status['networkx']:
ttk.Button(control, text="Einfaches Netz",
command=self.build_basic_petri).pack(side=tk.LEFT, padx=5)
ttk.Button(control, text="Netz mit Ressourcen",
command=self.build_resource_petri).pack(side=tk.LEFT, padx=5)
ttk.Button(control, text="Simuliere Transkript 1",
command=self.simulate_petri).pack(side=tk.LEFT, padx=5)
else:
ttk.Label(control, text="networkx nicht verfügbar",
foreground="red").pack(side=tk.LEFT, padx=5)
self.text_petri = scrolledtext.ScrolledText(self.tab_petri, font=('Courier', 10))
self.text_petri.pack(fill=tk.BOTH, expand=True, pady=5)
def create_bayes_tab(self):
"""Erstellt Bayessche Netze Tab"""
control = ttk.Frame(self.tab_bayes)
control.pack(fill=tk.X, pady=5)
if self.module_status['hmmlearn']:
ttk.Button(control, text="HMM initialisieren",
command=self.init_hmm).pack(side=tk.LEFT, padx=5)
ttk.Button(control, text="HMM trainieren",
command=self.train_hmm).pack(side=tk.LEFT, padx=5)
ttk.Button(control, text="Dekodiere Transkript 1",
command=self.decode_hmm).pack(side=tk.LEFT, padx=5)
else:
ttk.Label(control, text="hmmlearn nicht verfügbar",
foreground="red").pack(side=tk.LEFT, padx=5)
self.text_bayes = scrolledtext.ScrolledText(self.tab_bayes, font=('Courier', 10))
self.text_bayes.pack(fill=tk.BOTH, expand=True, pady=5)
def create_hybrid_tab(self):
"""Erstellt Hybrid-Tab"""
control = ttk.Frame(self.tab_hybrid)
control.pack(fill=tk.X, pady=5)
if self.module_status['crf']:
ttk.Button(control, text="CRF trainieren",
command=self.train_crf).pack(side=tk.LEFT, padx=5)
else:
ttk.Label(control, text="sklearn-crfsuite nicht verfügbar",
foreground="red").pack(side=tk.LEFT, padx=5)
if self.module_status['transformer']:
ttk.Button(control, text="Semantische Validierung",
command=self.run_semantic).pack(side=tk.LEFT, padx=5)
else:
ttk.Label(control, text="sentence-transformers nicht verfügbar",
foreground="red").pack(side=tk.LEFT, padx=5)
if self.module_status['networkx']:
ttk.Button(control, text="Grammatik-Graph",
command=self.build_grammar_graph).pack(side=tk.LEFT, padx=5)
else:
ttk.Label(control, text="networkx nicht verfügbar",
foreground="red").pack(side=tk.LEFT, padx=5)
ttk.Button(control, text="Attention visualisieren",
command=self.visualize_attention).pack(side=tk.LEFT, padx=5)
self.text_hybrid = scrolledtext.ScrolledText(self.tab_hybrid, font=('Courier', 10))
self.text_hybrid.pack(fill=tk.BOTH, expand=True, pady=5)
def create_generation_tab(self):
"""Erstellt Generierungs-Tab"""
control = ttk.Frame(self.tab_gen)
control.pack(fill=tk.X, pady=5)
ttk.Label(control, text="Grammatik:").pack(side=tk.LEFT)
self.gen_source = tk.StringVar(value="ars20")
ttk.Radiobutton(control, text="ARS 2.0", variable=self.gen_source,
value="ars20").pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(control, text="ARS 3.0", variable=self.gen_source,
value="ars30").pack(side=tk.LEFT, padx=5)
ttk.Label(control, text="Anzahl:").pack(side=tk.LEFT, padx=(20,5))
self.gen_count = ttk.Spinbox(control, from_=1, to=50, width=5)
self.gen_count.set(5)
self.gen_count.pack(side=tk.LEFT)
ttk.Button(control, text="Generieren",
command=self.generate_chains).pack(side=tk.LEFT, padx=20)
self.text_gen = scrolledtext.ScrolledText(self.tab_gen, font=('Courier', 10))
self.text_gen.pack(fill=tk.BOTH, expand=True, pady=5)
def create_statusbar(self):
"""Erstellt Statusleiste"""
status = ttk.Frame(self.root)
status.pack(side=tk.BOTTOM, fill=tk.X)
ttk.Label(status, textvariable=self.status_var).pack(side=tk.LEFT, padx=5)
self.progress_bar = ttk.Progressbar(status, length=100, mode='indeterminate')
self.progress_bar.pack(side=tk.RIGHT, padx=5)
def show_module_status(self):
"""Zeigt Status der optionalen Module"""
status_text = "Modulstatus:\n"
status_text += f"✓ networkx: {'verfügbar' if self.module_status['networkx'] else 'nicht verfügbar'}\n"
status_text += f"✓ hmmlearn: {'verfügbar' if self.module_status['hmmlearn'] else 'nicht verfügbar'}\n"
status_text += f"✓ sklearn-crfsuite: {'verfügbar' if self.module_status['crf'] else 'nicht verfügbar'}\n"
status_text += f"✓ sentence-transformers: {'verfügbar' if self.module_status['transformer'] else 'nicht verfügbar'}\n"
status_text += f"✓ torch: {'verfügbar' if self.module_status['torch'] else 'nicht verfügbar'}\n"
status_text += f"✓ seaborn: {'verfügbar' if self.module_status['seaborn'] else 'nicht verfügbar'}"
messagebox.showinfo("Modulstatus", status_text)
# ========================================================================
# FUNKTIONEN
# ========================================================================
def get_actual_delimiter(self):
"""Gibt aktuelles Trennzeichen zurück"""
delim = self.delimiter.get()
if delim == "custom":
return self.custom_delimiter.get()
return delim
def parse_line(self, line):
"""Parst eine Zeile"""
line = line.strip()
if not line:
return []
delim = self.get_actual_delimiter()
if delim == " ":
parts = re.split(r'\s+', line)
else:
parts = line.split(delim)
return [p.strip() for p in parts if p.strip()]
def parse_input(self):
"""Parst die Eingabe"""
self.text_input.update()
text = self.text_input.get("1.0", tk.END)
lines = text.strip().split('\n')
self.chains = []
for line in lines:
chain = self.parse_line(line)
if chain:
self.chains.append(chain)
if self.chains:
# Alle Terminale aus allen Ketten sammeln
all_symbols = set()
for chain in self.chains:
for symbol in chain:
all_symbols.add(symbol)
self.terminals = sorted(all_symbols)
self.info_var.set(f"{len(self.chains)} Ketten, {len(self.terminals)} Terminale")
self.status_var.set(f"{len(self.chains)} Ketten geladen")
# In ARS-Objekte laden
self.ars20.load_chains(self.chains, self.start_symbol.get() or None)
self.ars30.load_chains(self.chains, self.start_symbol.get() or None)
# Petri-Builder initialisieren (falls verfügbar)
if self.module_status['networkx']:
self.petri_builder = PetriNetBuilder(self.chains, self.ars30.rules)
# Vorschau anzeigen
self.show_ars20_preview()
else:
messagebox.showwarning("Warnung", "Keine gültigen Ketten gefunden!")
def show_ars20_preview(self):
"""Zeigt ARS 2.0 Vorschau"""
self.text20.delete("1.0", tk.END)
self.text20.insert(tk.END, self.ars20.print_grammar())
def run_ars20(self):
"""Führt ARS 2.0 aus"""
if not self.chains:
messagebox.showerror("Fehler", "Keine Daten geladen!")
return
self.show_ars20_preview()
self.status_var.set("ARS 2.0 abgeschlossen")
def run_optimization(self):
"""Startet Optimierung"""
if not self.chains:
messagebox.showerror("Fehler", "Keine Daten geladen!")
return
if self.optimization_running:
messagebox.showinfo("Info", "Optimierung läuft bereits")
return
self.optimization_running = True
self.opt_progress['value'] = 0
def update_progress(iter_num, max_iter, corr, p_val):
def update():
self.opt_progress['value'] = iter_num
self.status_var.set(f"Optimierung: Iteration {iter_num}, r={corr:.4f}")
self.safe_gui_update(update)
def run():
try:
probs, best_corr, history = self.ars20.optimize(progress_callback=update_progress)
def update_display():
self.text20.insert(tk.END, "\n" + "="*70 + "\n")
self.text20.insert(tk.END, "OPTIMIERTE GRAMMATIK\n")
self.text20.insert(tk.END, "="*70 + "\n\n")
if probs:
for start in sorted(probs.keys()):
trans = probs[start]
trans_str = ", ".join([f"'{end}': {prob:.3f}" for end, prob in sorted(trans.items())])
self.text20.insert(tk.END, f"{start} -> {trans_str}\n")
else:
self.text20.insert(tk.END, "Keine optimierte Grammatik erhalten.\n")
self.text20.insert(tk.END, f"\nBeste Korrelation: {best_corr:.4f}\n")
self.status_var.set(f"Optimierung abgeschlossen, r={best_corr:.4f}")
self.opt_progress['value'] = 0
self.optimization_running = False
self.safe_gui_update(update_display)
except Exception as e:
def error_display():
messagebox.showerror("Fehler", f"Optimierung fehlgeschlagen:\n{str(e)}")
self.optimization_running = False
self.safe_gui_update(error_display)
thread = threading.Thread(target=run)
thread.daemon = True
thread.start()
def run_ars30(self):
"""Führt ARS 3.0 aus"""
if not self.chains:
messagebox.showerror("Fehler", "Keine Daten geladen!")
return
self.ars30_progress.start()
self.status_var.set("Induziere Grammatik...")
def update_progress(iter_num, max_iter, sequence, new_nt):
def update():
self.status_var.set(f"Induktion: {new_nt} gefunden")
self.safe_gui_update(update)
def run():
try:
self.ars30.induce_grammar(progress_callback=update_progress)
def update_display():
self.text30.delete("1.0", tk.END)
self.text30.insert(tk.END, self.ars30.print_grammar())
self.ars30_progress.stop()
self.status_var.set("Grammatikinduktion abgeschlossen")
self.safe_gui_update(update_display)
except Exception as e:
def error_display():
messagebox.showerror("Fehler", f"Grammatikinduktion fehlgeschlagen:\n{str(e)}")
self.ars30_progress.stop()
self.safe_gui_update(error_display)
thread = threading.Thread(target=run)
thread.daemon = True
thread.start()
def build_basic_petri(self):
"""Erstellt einfaches Petri-Netz"""
if not self.module_status['networkx']:
messagebox.showerror("Fehler", "networkx nicht installiert!")
return
if not self.petri_builder:
messagebox.showerror("Fehler", "Keine Daten geladen!")
return
try:
self.petri_builder.build_basic_net()
self.text_petri.delete("1.0", tk.END)
self.text_petri.insert(tk.END, "Einfaches Petri-Netz erstellt:\n")
self.text_petri.insert(tk.END, f"Stellen: {len(self.petri_builder.petri_net.places)}\n")
self.text_petri.insert(tk.END, f"Transitionen: {len(self.petri_builder.petri_net.transitions)}\n")
self.text_petri.insert(tk.END, f"Kanten: {len(self.petri_builder.petri_net.arcs)}\n")
self.status_var.set("Petri-Netz erstellt")
except Exception as e:
messagebox.showerror("Fehler", f"Fehler beim Erstellen des Petri-Netzes:\n{str(e)}")
def build_resource_petri(self):
"""Erstellt Petri-Netz mit Ressourcen"""
if not self.module_status['networkx']:
messagebox.showerror("Fehler", "networkx nicht installiert!")
return
if not self.petri_builder:
messagebox.showerror("Fehler", "Keine Daten geladen!")
return
try:
self.petri_builder.build_resource_net()
self.text_petri.delete("1.0", tk.END)
self.text_petri.insert(tk.END, "Petri-Netz mit Ressourcen erstellt:\n")
self.text_petri.insert(tk.END, f"Stellen: {len(self.petri_builder.petri_net.places)}\n")
self.text_petri.insert(tk.END, f"Transitionen: {len(self.petri_builder.petri_net.transitions)}\n")
self.text_petri.insert(tk.END, f"Kanten: {len(self.petri_builder.petri_net.arcs)}\n")
self.text_petri.insert(tk.END, "\nRessourcen-Stellen:\n")
for p, data in self.petri_builder.petri_net.places.items():
if data['type'] == 'resource':
self.text_petri.insert(tk.END, f" {p}: {data['initial_tokens']} Token\n")
self.status_var.set("Petri-Netz mit Ressourcen erstellt")
except Exception as e:
messagebox.showerror("Fehler", f"Fehler beim Erstellen des Petri-Netzes:\n{str(e)}")
def simulate_petri(self):
"""Simuliert Transkript 1 im Petri-Netz"""
if not self.module_status['networkx']:
messagebox.showerror("Fehler", "networkx nicht installiert!")
return
if not self.petri_builder or not self.petri_builder.petri_net:
messagebox.showerror("Fehler", "Kein Petri-Netz vorhanden!")
return
if not self.chains:
return
try:
results, marking = self.petri_builder.simulate_chain(self.chains[0])
self.text_petri.insert(tk.END, "\n" + "="*50 + "\n")
self.text_petri.insert(tk.END, "Simulation Transkript 1:\n")
self.text_petri.insert(tk.END, "="*50 + "\n")
for sym, success, reason in results:
status = "✓" if success else "✗"
self.text_petri.insert(tk.END, f"{status} {sym}: {reason}\n")
self.text_petri.insert(tk.END, f"\nFinale Markierung:\n")
for p, tokens in marking.items():
if tokens > 0:
self.text_petri.insert(tk.END, f" {p}: {tokens}\n")
# Visualisierung
self.plot_thread.plot(plot_petri_net, self.petri_builder.petri_net, "petri_net.png")
except Exception as e:
messagebox.showerror("Fehler", f"Fehler bei der Simulation:\n{str(e)}")
def init_hmm(self):
"""Initialisiert HMM"""
if not self.module_status['hmmlearn']:
messagebox.showerror("Fehler", "hmmlearn nicht installiert!")
return
if not self.chains:
messagebox.showerror("Fehler", "Keine Daten geladen!")
return
try:
self.hmm_model = ARSHiddenMarkovModel(n_states=5)
result = self.hmm_model.initialize_from_ars(self.chains)
if result is None:
messagebox.showerror("Fehler", "HMM-Initialisierung fehlgeschlagen - keine Daten?")
return
self.text_bayes.delete("1.0", tk.END)
self.text_bayes.insert(tk.END, "HMM initialisiert:\n\n")
self.text_bayes.insert(tk.END, self.hmm_model.get_parameters_string())
self.status_var.set("HMM initialisiert")
except Exception as e:
messagebox.showerror("Fehler", f"Fehler bei HMM-Initialisierung:\n{str(e)}")
def train_hmm(self):
"""Trainiert HMM"""
if not self.module_status['hmmlearn']:
messagebox.showerror("Fehler", "hmmlearn nicht installiert!")
return
if not self.chains:
messagebox.showerror("Fehler", "Keine Daten geladen!")
return
if not self.hmm_model:
self.hmm_model = ARSHiddenMarkovModel(n_states=5)
self.hmm_model.initialize_from_ars(self.chains)
self.status_var.set("Trainiere HMM...")
self.progress_bar.start()
def run():
try:
self.hmm_model.fit(self.chains, n_iter=100)
def update_display():
self.text_bayes.insert(tk.END, "\n" + "="*50 + "\n")
self.text_bayes.insert(tk.END, "Nach Training:\n\n")
self.text_bayes.insert(tk.END, self.hmm_model.get_parameters_string())
self.status_var.set("HMM-Training abgeschlossen")
self.progress_bar.stop()
self.safe_gui_update(update_display)
except Exception as e:
def error_display():
messagebox.showerror("Fehler", f"HMM-Training fehlgeschlagen:\n{str(e)}")
self.progress_bar.stop()
self.safe_gui_update(error_display)
thread = threading.Thread(target=run)
thread.daemon = True
thread.start()
def decode_hmm(self):
"""Dekodiert Transkript 1 mit HMM"""
if not self.module_status['hmmlearn']:
messagebox.showerror("Fehler", "hmmlearn nicht installiert!")
return
if not self.hmm_model or not self.hmm_model.model:
messagebox.showerror("Fehler", "Kein HMM vorhanden!")
return
if not self.chains:
return
try:
states, prob = self.hmm_model.decode(self.chains[0])
if states is None:
messagebox.showerror("Fehler", "Dekodierung fehlgeschlagen")
return
self.text_bayes.insert(tk.END, "\n" + "="*50 + "\n")
self.text_bayes.insert(tk.END, f"Dekodierung Transkript 1 (p={prob:.4f}):\n")
self.text_bayes.insert(tk.END, "="*50 + "\n")
for i, (sym, state) in enumerate(zip(self.chains[0], states)):
state_name = self.hmm_model.state_names.get(state, f"State {state}")
self.text_bayes.insert(tk.END, f"{i+1:2d}: {sym} -> {state_name}\n")
except Exception as e:
messagebox.showerror("Fehler", f"Dekodierung fehlgeschlagen:\n{str(e)}")
def train_crf(self):
"""Trainiert CRF-Modell"""
if not self.module_status['crf']:
messagebox.showerror("Fehler", "sklearn-crfsuite nicht installiert!")
return
if not self.chains:
messagebox.showerror("Fehler", "Keine Daten geladen!")
return
try:
self.crf_model = ARSCRFModel()
self.crf_model.fit(self.chains)
self.text_hybrid.delete("1.0", tk.END)
self.text_hybrid.insert(tk.END, "CRF trainiert.\n\nTop-Features:\n")
for attr, label, weight in self.crf_model.get_top_features(10):
self.text_hybrid.insert(tk.END, f"{attr:30s} -> {label:4s} : {weight:+.4f}\n")
# Beispielvorhersage
if self.chains:
example = self.chains[0][:5]
pred = self.crf_model.predict(example)
self.text_hybrid.insert(tk.END, f"\nBeispiel: {example}\n")
self.text_hybrid.insert(tk.END, f"Vorhersage: {pred}\n")
self.status_var.set("CRF-Training abgeschlossen")
except Exception as e:
messagebox.showerror("Fehler", f"CRF-Training fehlgeschlagen:\n{str(e)}")
def run_semantic(self):
"""Führt semantische Validierung durch"""
if not self.module_status['transformer']:
messagebox.showerror("Fehler", "sentence-transformers nicht installiert!")
return
try:
self.semantic_validator = SemanticValidator()
self.text_hybrid.insert(tk.END, "\n" + "="*50 + "\n")
self.text_hybrid.insert(tk.END, "Semantische Validierung:\n")
self.text_hybrid.insert(tk.END, "="*50 + "\n")
if self.semantic_validator.load_model():
sims = self.semantic_validator.get_intra_similarities()
self.text_hybrid.insert(tk.END, "\nIntra-Kategorie-Ähnlichkeiten:\n")
for sym, sim in sims.items():
self.text_hybrid.insert(tk.END, f" {sym}: {sim:.3f}\n")
# Visualisierung
matrix, symbols = self.semantic_validator.similarity_matrix()
if matrix is not None:
self.plot_thread.plot(plot_similarity_matrix, matrix, symbols)
self.status_var.set("Semantische Validierung abgeschlossen")
else:
self.text_hybrid.insert(tk.END, "Fehler beim Laden des Modells\n")
except Exception as e:
messagebox.showerror("Fehler", f"Semantische Validierung fehlgeschlagen:\n{str(e)}")
def build_grammar_graph(self):
"""Erstellt Grammatik-Graph"""
if not self.module_status['networkx']:
messagebox.showerror("Fehler", "networkx nicht installiert!")
return
if not self.ars30.rules:
messagebox.showerror("Fehler", "Keine ARS-3.0-Grammatik vorhanden!")
return
try:
self.grammar_graph = GrammarGraph(self.ars30.rules)
self.text_hybrid.insert(tk.END, "\n" + "="*50 + "\n")
self.text_hybrid.insert(tk.END, "Grammatik-Graph:\n")
self.text_hybrid.insert(tk.END, "="*50 + "\n")
self.text_hybrid.insert(tk.END, f"Knoten: {self.grammar_graph.graph.number_of_nodes()}\n")
self.text_hybrid.insert(tk.END, f"Kanten: {self.grammar_graph.graph.number_of_edges()}\n")
cent = self.grammar_graph.centrality()
top = sorted(cent.items(), key=lambda x: x[1], reverse=True)[:5]
self.text_hybrid.insert(tk.END, "\nZentralste Knoten:\n")
for node, c in top:
self.text_hybrid.insert(tk.END, f" {node}: {c:.3f}\n")
# Visualisierung
self.plot_thread.plot(plot_grammar_graph, self.grammar_graph.graph)
self.status_var.set("Grammatik-Graph erstellt")
except Exception as e:
messagebox.showerror("Fehler", f"Fehler beim Erstellen des Grammatik-Graphen:\n{str(e)}")
def visualize_attention(self):
"""Visualisiert Attention für Transkript 1"""
if not self.chains:
return
try:
self.attention_viz = AttentionVisualizer(self.chains)
self.text_hybrid.insert(tk.END, "\n" + "="*50 + "\n")
self.text_hybrid.insert(tk.END, "Attention visualisiert (siehe plot)\n")
# Attention berechnen und plotten
attention = self.attention_viz.attention_weights(self.chains[0])
self.plot_thread.plot(plot_attention, attention, self.chains[0])
self.status_var.set("Attention visualisiert")
except Exception as e:
messagebox.showerror("Fehler", f"Fehler bei Attention-Visualisierung:\n{str(e)}")
def generate_chains(self):
"""Generiert neue Ketten"""
source = self.gen_source.get()
count = int(self.gen_count.get())
self.text_gen.delete("1.0", tk.END)
if source == "ars20":
probs = self.ars20.optimized_probabilities or self.ars20.probabilities
if not probs:
self.text_gen.insert(tk.END, "Keine ARS 2.0 Grammatik!\n")
return
self.text_gen.insert(tk.END, f"ARS 2.0 - {count} generierte Ketten:\n\n")
for i in range(count):
chain = self.ars20.generate_chain()
if chain:
self.text_gen.insert(tk.END, f"{i+1}: {' → '.join(chain)}\n")
else: # ars30
if not self.ars30.rules:
self.text_gen.insert(tk.END, "Keine ARS 3.0 Grammatik!\n")
return
self.text_gen.insert(tk.END, f"ARS 3.0 - {count} generierte Ketten:\n\n")
for i in range(count):
chain = self.ars30.generate_chain()
if chain:
self.text_gen.insert(tk.END, f"{i+1}: {' → '.join(chain)}\n")
def load_file(self):
"""Lädt Datei"""
filename = filedialog.askopenfilename(
title="Datei auswählen",
filetypes=[("Textdateien", "*.txt"), ("Alle Dateien", "*.*")]
)
if filename:
try:
with open(filename, 'r', encoding='utf-8') as f:
content = f.read()
self.text_input.delete("1.0", tk.END)
self.text_input.insert("1.0", content)
self.status_var.set(f"Geladen: {filename}")
except Exception as e:
messagebox.showerror("Fehler", f"Kann Datei nicht laden:\n{e}")
def load_example(self):
"""Lädt Beispieldaten"""
example = """KBG, VBG, KBBd, VBBd, KBA, VBA, KBBd, VBBd, KBA, VAA, KAA, VAV, KAV
VBG, KBBd, VBBd, VAA, KAA, VBG, KBBd, VAA, KAA
KBBd, VBBd, VAA, KAA
KBBd, VBBd, KBA, VBA, KBBd, VBA, KAE, VAE, KAA, VAV, KAV
KAV, KBBd, VBBd, KBBd, VAA, KAV
KBG, VBG, KBBd, VBBd, KAA
KBBd, VBBd, KBA, VAA, KAA
KBG, VBBd, KBBd, VBA, VAA, KAA, VAV, KAV"""
self.text_input.delete("1.0", tk.END)
self.text_input.insert("1.0", example)
self.parse_input()
def show_about(self):
"""Zeigt Info"""
about = """ARS 4.0 - Algorithmic Recursive Sequence Analysis
Funktionen:
- ARS 2.0: Basis-Grammatik mit Optimierung
- ARS 3.0: Hierarchische Grammatik mit Nonterminalen
- Petri-Netze: Nebenläufigkeit und Ressourcen
- Bayessche Netze: HMM für latente Zustände
- Hybride Integration: CRF, Embeddings, Attention
Das Programm prüft automatisch die Verfügbarkeit aller benötigten
Pakete und installiert fehlende Pakete bei Bedarf nach.
© 2026 Paul Koop"""
messagebox.showinfo("Über ARS", about)
# ============================================================================
# HAUPTFUNKTION
# ============================================================================
def main():
root = tk.Tk()
app = ARSGUI(root)
root.mainloop()
if __name__ == "__main__":
main()