""" ARSXAI10.py - Algorithmic Recursive Sequence Analysis mit Explainable AI ======================================================================== ERWEITERUNG von ARSXAI9.py um: - Depth-Bounded PCFG (Tiefenbeschränkung) - MDL-Optimierung (Kompression als Gütekriterium) - PrefixSpan für große Korpora (optional) - SemInfo-Maximierung für semantische Namen (optional) - Automatische Paketinstallation Version: 10.0 (Evolutionäre Erweiterung mit automatischer PrefixSpan-Installation) """ # ============================================================================ # UMWELTVARIABLEN FÜR WARNUNGEN # ============================================================================ import os os.environ["PYTHONWARNINGS"] = "ignore::UserWarning,ignore::DeprecationWarning,ignore::FutureWarning" # ============================================================================ # STANDARD BIBLIOTHEKEN # ============================================================================ import sys import queue import threading import re import json import subprocess import importlib import warnings import logging import shutil import glob from datetime import datetime from collections import defaultdict, Counter # ============================================================================ # WARNUNGEN VOLLSTÄNDIG UNTERDRÜCKEN # ============================================================================ warnings.filterwarnings("ignore", category=DeprecationWarning) warnings.filterwarnings("ignore", category=UserWarning) warnings.filterwarnings("ignore", category=FutureWarning) warnings.filterwarnings("ignore", module="hmmlearn") warnings.filterwarnings("ignore", message="MultinomialHMM has undergone major changes") logging.getLogger('hmmlearn').setLevel(logging.ERROR) # ============================================================================ # PAKETVERWALTUNG (erweitert um prefixspan) # ============================================================================ REQUIRED_PACKAGES = [ 'numpy', 'scipy', 'matplotlib', 'hmmlearn', 'sklearn-crfsuite', 'sentence-transformers', 'networkx', 'torch', 'seaborn', 'tabulate', 'graphviz' ] OPTIONAL_PACKAGES = [ 'prefixspan' # Für große Datenmengen ] def check_and_install_packages(): """Prüft und installiert fehlende Python-Pakete""" print("=" * 70) print("ARSXAI10 - PAKETPRÜFUNG") print("=" * 70) # Pflichtpakete missing_packages = [] for package in REQUIRED_PACKAGES: import_name = package.replace('-', '_') special_imports = { 'sklearn-crfsuite': 'sklearn_crfsuite', 'sentence-transformers': 'sentence_transformers' } import_name = special_imports.get(package, import_name) try: importlib.import_module(import_name) print(f"✓ {package} bereits installiert") except ImportError: print(f"✗ {package} fehlt") missing_packages.append(package) if missing_packages: print("\nInstalliere fehlende Pflichtpakete...") for package in missing_packages: try: subprocess.check_call([sys.executable, "-m", "pip", "install", package]) print(f" ✓ {package} erfolgreich installiert") except subprocess.CalledProcessError as e: print(f" ✗ Fehler bei Installation von {package}: {e}") # Optionale Pakete (nur Hinweis, keine automatische Installation) print("\n" + "-" * 70) print("OPTIONALE PAKETE (für erweiterte Funktionen):") print("-" * 70) for package in OPTIONAL_PACKAGES: try: importlib.import_module(package) print(f"✓ {package} verfügbar") except ImportError: print(f"ℹ️ {package} nicht installiert (optional)") if package == 'prefixspan': print(" Für große Datenmengen (>1000 Ketten) empfohlen.") print(" Installation: pip install prefixspan") print("\n" + "=" * 70 + "\n") # Pakete prüfen check_and_install_packages() # ============================================================================ # GRAPHVIZ KONFIGURATION # ============================================================================ def setup_graphviz(): """Konfiguriert Graphviz und prüft Installation""" GRAPHVIZ_AVAILABLE = False if sys.platform == 'win32': possible_paths = [ r'C:\Program Files\Graphviz\bin', r'C:\Program Files (x86)\Graphviz\bin', r'C:\Graphviz\bin', ] for base_path in [r'C:\Program Files\Graphviz*', r'C:\Program Files (x86)\Graphviz*']: for path in glob.glob(base_path): bin_path = os.path.join(path, 'bin') if os.path.exists(bin_path): possible_paths.append(bin_path) dot_path = shutil.which('dot') if dot_path: print(f"✓ Graphviz (dot) gefunden in: {dot_path}") GRAPHVIZ_AVAILABLE = True else: for path in possible_paths: if os.path.exists(path): dot_exe = os.path.join(path, 'dot.exe') if os.path.exists(dot_exe): os.environ['PATH'] += os.pathsep + path print(f"✓ Graphviz gefunden in: {path}") GRAPHVIZ_AVAILABLE = True break else: if shutil.which('dot'): print("✓ Graphviz (dot) gefunden") GRAPHVIZ_AVAILABLE = True if not GRAPHVIZ_AVAILABLE: print("\n⚠️ Graphviz nicht gefunden. Installieren für Automaten-Visualisierung:") print(" Windows: https://graphviz.org/download/") print(" Linux: sudo apt-get install graphviz") print(" Mac: brew install graphviz") return GRAPHVIZ_AVAILABLE GRAPHVIZ_AVAILABLE = setup_graphviz() # ============================================================================ # IMPORTS AUS ARSXAI9 (wiederverwendung) # ============================================================================ # Hinweis: Für die Ausführung muss ARSXAI9.py im gleichen Verzeichnis sein! try: from ARSXAI9 import ( PlotThread, XAIModel, ARS20, GrammarInducer, MethodologicalReflection, ARSHiddenMarkovModel, ARSCRFModel, ARSPetriNet, ChainGenerator, XAIModelManager, DataValidator, DerivationVisualizer, NaturalLanguageExplainer, MultiFormatExporter, ARSXAI9GUI, MODULE_STATUS, GRAPHVIZ_AVAILABLE as BASE_GRAPHVIZ ) print("✓ ARSXAI9.py erfolgreich importiert") except ImportError as e: print(f"✗ Fehler beim Import von ARSXAI9.py: {e}") print(" Stellen Sie sicher, dass ARSXAI9.py im gleichen Verzeichnis ist!") sys.exit(1) # ============================================================================ # GLOBALE VARIABLEN FÜR OPTIONALE PAKETE # ============================================================================ # Diese müssen VOR ihrer ersten Verwendung deklariert werden! PREFIXSPAN_AVAILABLE = False SEMINFO_AVAILABLE = False # ============================================================================ # NEUE IMPORTS FÜR ARSXAI10 (mit erweiterter Installation) # ============================================================================ # PrefixSpan für große Daten (optional, mit Installationshilfe) try: from prefixspan import PrefixSpan PREFIXSPAN_AVAILABLE = True print("✓ PrefixSpan verfügbar - für große Datenmengen optimiert") except ImportError: PREFIXSPAN_AVAILABLE = False print("ℹ️ PrefixSpan nicht installiert (optional) - nutze Standard-Mustererkennung") print(" Für große Datenmengen (>1000 Ketten) Installation empfohlen:") print(" pip install prefixspan") # Biete automatische Installation an (optional) try: response = input(" PrefixSpan jetzt installieren? (j/n): ").lower() if response == 'j' or response == 'ja' or response == 'y' or response == 'yes': print(" Installiere PrefixSpan...") subprocess.check_call([sys.executable, "-m", "pip", "install", "prefixspan"]) from prefixspan import PrefixSpan PREFIXSPAN_AVAILABLE = True print(" ✓ PrefixSpan erfolgreich installiert!") except: print(" Installation übersprungen.") # Sentence-Transformers für semantische Namen (optional) try: from sentence_transformers import SentenceTransformer import numpy as np SEMINFO_AVAILABLE = True print("✓ Sentence-Transformers verfügbar - semantische Namen aktivierbar") except ImportError: SEMINFO_AVAILABLE = False print("ℹ️ Sentence-Transformers nicht installiert (optional)") print(" Für semantische Namen: pip install sentence-transformers") # ============================================================================ # TKINTER IMPORTS # ============================================================================ import tkinter as tk from tkinter import ttk, filedialog, messagebox, scrolledtext # ============================================================================ # WISSENSCHAFTLICHE BIBLIOTHEKEN # ============================================================================ import numpy as np from scipy.stats import pearsonr import matplotlib matplotlib.use('TkAgg') import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure # ============================================================================ # MDL-OPTIMIZER (Hilfsklasse für Minimum Description Length) # ============================================================================ class MDLOptimizer: """ Implementiert Minimum Description Length Prinzip. Bewertet Grammatiken nach Kompressionsrate. """ def __init__(self): self.compression_history = [] def calculate_compression_ratio(self, original_chains, grammar): """ Berechnet Kompressionsrate: 1 - (komprimierte_Länge / originale_Länge) Args: original_chains: Liste der ursprünglichen Ketten grammar: GrammarInducer-Objekt mit Regeln Returns: float: Kompressionsrate (0-1), höher ist besser """ if not grammar.rules: return 0.0 # Originale Länge (Anzahl der Terminalsymbole) original_length = sum(len(chain) for chain in original_chains) # Komprimierte Länge (Nonterminale zählen als 1) compressed_length = 0 for chain in original_chains: compressed = self._compress_chain(chain, grammar) compressed_length += len(compressed) ratio = 1 - (compressed_length / original_length) if original_length > 0 else 0 self.compression_history.append(ratio) return round(ratio, 3) def _compress_chain(self, chain, grammar, max_iter=10): """Wendet Grammatikregeln iterativ an, um Kette zu komprimieren""" current = list(chain) for _ in range(max_iter): changed = False # Suche nach anwendbaren Regeln (rückwärts, längste zuerst) for nt, productions in sorted(grammar.rules.items(), key=lambda x: -len(x[1][0][0] if x[1] else 0)): for prod, _ in productions: prod_len = len(prod) i = 0 while i <= len(current) - prod_len: if current[i:i+prod_len] == prod: current[i:i+prod_len] = [nt] changed = True break i += 1 if not changed: break return current def compare_grammars(self, grammar1, grammar2, chains): """ Vergleicht zwei Grammatiken nach MDL-Prinzip. Returns: dict: Vergleichsergebnisse mit Scores """ ratio1 = self.calculate_compression_ratio(chains, grammar1) ratio2 = self.calculate_compression_ratio(chains, grammar2) # Grammatikkomplexität (Anzahl der Regeln) complexity1 = len(grammar1.rules) if hasattr(grammar1, 'rules') else 0 complexity2 = len(grammar2.rules) if hasattr(grammar2, 'rules') else 0 # MDL-Score: Kompression - Komplexitätsstrafe mdl1 = ratio1 - (complexity1 * 0.01) # Einfache Strafe mdl2 = ratio2 - (complexity2 * 0.01) return { 'grammar1': { 'compression_ratio': ratio1, 'complexity': complexity1, 'mdl_score': mdl1 }, 'grammar2': { 'compression_ratio': ratio2, 'complexity': complexity2, 'mdl_score': mdl2 }, 'better': 'grammar1' if mdl1 > mdl2 else 'grammar2' if mdl2 > mdl1 else 'equal' } def optimal_cutoff(self, compression_gains): """ Findet natürliche Grenze für Iterationen (Elbow-Methode). Args: compression_gains: Liste der Kompressionsgewinne pro Iteration Returns: int: Optimale Anzahl von Iterationen """ if len(compression_gains) < 3: return len(compression_gains) # Einfache Elbow-Erkennung: Wo ist der Knick? gains = np.array(compression_gains) if len(gains) < 2: return len(gains) # Berechne Differenzen diffs = np.diff(gains) if len(diffs) < 1: return len(gains) # Finde ersten Punkt mit stark abnehmendem Gewinn threshold = np.mean(diffs) * 0.5 for i, diff in enumerate(diffs): if diff < threshold: return i + 1 return len(gains) def get_statistics_string(self): """Gibt Statistik der Kompression als String zurück""" if not self.compression_history: return "Keine Kompressionsdaten vorhanden." lines = [] lines.append("📊 **MDL-Kompressionsstatistik**") lines.append("=" * 50) for i, ratio in enumerate(self.compression_history): lines.append(f"Iteration {i+1}: {ratio:.1%} Kompression") optimal = self.optimal_cutoff(self.compression_history) lines.append(f"\n✅ Optimale Iterationen: {optimal}") return "\n".join(lines) # ============================================================================ # SEMINFO-MAXIMIZER (optional, für semantische Namen) # ============================================================================ class SemInfoMaximizer: """ Maximiert semantische Information der Nonterminale. Benötigt sentence-transformers. """ def __init__(self, model_name='paraphrase-multilingual-MiniLM-L12-v2'): self.model_name = model_name self.model = None self.embeddings = {} self.semantic_cache = {} if SEMINFO_AVAILABLE: try: self.model = SentenceTransformer(model_name) print(f"✓ SemInfo-Modell geladen: {model_name}") except Exception as e: print(f"✗ SemInfo-Modell konnte nicht geladen werden: {e}") def compute_embeddings(self, symbols): """ Erstellt Embeddings für eine Liste von Symbolen. Args: symbols: Liste von Strings (Symbole) Returns: dict: Symbol -> Embedding-Vektor """ if self.model is None: return {} # Filtere Symbole, die noch nicht im Cache sind to_compute = [s for s in symbols if s not in self.embeddings] if to_compute: try: # Generiere Embeddings für alle neuen Symbole new_embeddings = self.model.encode(to_compute) for sym, emb in zip(to_compute, new_embeddings): self.embeddings[sym] = emb except Exception as e: print(f"Fehler bei Embedding-Berechnung: {e}") return self.embeddings def semantic_coherence(self, sequence): """ Misst semantische Kohärenz einer Sequenz. Args: sequence: Liste von Symbolen Returns: float: Kohärenz-Score (0-1), höher = zusammenhängender """ if self.model is None or len(sequence) < 2: return 0.5 # Stelle sicher, dass Embeddings vorhanden sind self.compute_embeddings(sequence) # Berechne paarweise Ähnlichkeiten similarities = [] for i in range(len(sequence) - 1): sym1 = sequence[i] sym2 = sequence[i + 1] if sym1 in self.embeddings and sym2 in self.embeddings: emb1 = self.embeddings[sym1] emb2 = self.embeddings[sym2] # Kosinus-Ähnlichkeit sim = np.dot(emb1, emb2) / (np.linalg.norm(emb1) * np.linalg.norm(emb2) + 1e-8) similarities.append(max(0, sim)) # Auf 0-1 beschränken if not similarities: return 0.5 return float(np.mean(similarities)) def suggest_name(self, sequence): """ Generiert einen semantischen Namen für eine Sequenz. Args: sequence: Liste von Symbolen Returns: str: Vorgeschlagener Name oder None """ if self.model is None or len(sequence) < 2: return None # Prüfe Cache cache_key = tuple(sequence) if cache_key in self.semantic_cache: return self.semantic_cache[cache_key] # Berechne durchschnittliches Embedding self.compute_embeddings(sequence) valid_embs = [self.embeddings[s] for s in sequence if s in self.embeddings] if not valid_embs: return None mean_emb = np.mean(valid_embs, axis=0) # Hier könnte man eine Suche nach ähnlichen Konzepten implementieren # Für jetzt: Generische Namen basierend auf Kohärenz coherence = self.semantic_coherence(sequence) if coherence > 0.8: name = "KOHÄRENT" elif coherence > 0.6: name = "ZUSAMMENHÄNGEND" elif coherence > 0.4: name = "GEMISCHT" else: name = "DIVERGENT" result = f"{name}_{len(sequence)}" self.semantic_cache[cache_key] = result return result def get_status_string(self): """Gibt Status des SemInfo-Maximizers zurück""" lines = [] lines.append("🧠 **SemInfo-Maximizer Status**") lines.append("=" * 50) lines.append(f"Modell: {self.model_name}") lines.append(f"Verfügbar: {'✓' if self.model else '✗'}") lines.append(f"Gecachte Embeddings: {len(self.embeddings)}") lines.append(f"Semantische Namen im Cache: {len(self.semantic_cache)}") return "\n".join(lines) # ============================================================================ # DEPTH-BOUNDED GRAMMAR INDUCER (KERN DER ERWEITERUNG) # ============================================================================ class DepthBoundedGrammarInducer(GrammarInducer): """ Erweitert GrammarInducer um Tiefenbeschränkung und MDL-Optimierung. Attribute: max_depth: Maximale Hierarchietiefe (default 5) use_mdl: MDL-Optimierung aktivieren (default True) use_prefixspan: PrefixSpan für große Daten (default False) depth_map: nonterminal -> Tiefe mdl_optimizer: MDLOptimizer-Instanz seminfo: SemInfoMaximizer-Instanz (optional) """ def __init__(self, max_depth=5, use_mdl=True, use_prefixspan=False, use_seminfo=False): super().__init__() self.max_depth = max_depth self.use_mdl = use_mdl self.use_prefixspan = use_prefixspan and PREFIXSPAN_AVAILABLE self.use_seminfo = use_seminfo and SEMINFO_AVAILABLE self.depth_map = {} # nonterminal -> Tiefe self.mdl_scores = {} # für Optimierung self.mdl_optimizer = MDLOptimizer() self.seminfo = SemInfoMaximizer() if use_seminfo and SEMINFO_AVAILABLE else None self.compression_gains = [] # Für Cutoff-Erkennung def train(self, chains, max_iterations=20): """Überschriebene Train-Methode mit Tiefenbeschränkung""" self.chains = [list(chain) for chain in chains] # Sammle alle Terminale all_symbols = set() for chain in chains: for symbol in chain: all_symbols.add(symbol) self.terminals = all_symbols current_chains = [list(chain) for chain in chains] iteration = 0 rule_counter = 1 self.rules = {} self.nonterminals = set() self.symbol_to_nonterminals = defaultdict(set) self.compression_history = [] self.hierarchy_levels = {} self.depth_map = {} self.mdl_scores = {} self.compression_gains = [] while iteration < max_iterations: best_seq = self._find_best_repetition(current_chains) if best_seq is None: break # Prüfe Tiefenbeschränkung depth = self._estimate_depth(best_seq) if depth > self.max_depth: # Überspringe zu tiefe Muster self._mark_as_skipped(best_seq) continue new_nonterminal = self._generate_nonterminal_name(best_seq, depth) base_name = new_nonterminal while new_nonterminal in self.nonterminals: new_nonterminal = f"{base_name}_{rule_counter}" rule_counter += 1 rationale = self._generate_rationale(best_seq) self.reflection.log_interpretation(best_seq, new_nonterminal, rationale) self.rules[new_nonterminal] = [(list(best_seq), 1.0)] self.nonterminals.add(new_nonterminal) self.hierarchy_levels[new_nonterminal] = iteration self.depth_map[new_nonterminal] = depth # Aktualisiere symbol_to_nonterminals for symbol in best_seq: self.symbol_to_nonterminals[symbol].add(new_nonterminal) # Zähle Vorkommen occurrences = 0 for chain in current_chains: for i in range(len(chain) - len(best_seq) + 1): if tuple(chain[i:i+len(best_seq)]) == best_seq: occurrences += 1 self.compression_history.append({ 'iteration': iteration, 'sequence': best_seq, 'new_symbol': new_nonterminal, 'occurrences': occurrences, 'depth': depth }) # MDL-Gain berechnen if self.use_mdl: gain = self._mdl_gain(best_seq, occurrences) self.mdl_scores[new_nonterminal] = gain self.compression_gains.append(gain) current_chains = self._compress_sequences(current_chains, best_seq, new_nonterminal) iteration += 1 self.iteration_count = iteration if self._all_chains_identical(current_chains): if current_chains and current_chains[0]: unique_symbol = current_chains[0][0] if unique_symbol in self.rules: self.start_symbol = unique_symbol else: self.start_symbol = self._find_top_level_nonterminal() break if self.start_symbol is None: self.start_symbol = self._find_top_level_nonterminal() # Terminale aktualisieren all_symbols = set() for chain in self.chains: for sym in chain: all_symbols.add(sym) self.terminals = all_symbols - self.nonterminals # Berechne Wahrscheinlichkeiten self._calculate_probabilities() self.induction_done = True self.trained = True self.confidence = self._calculate_confidence() return current_chains def _find_best_repetition(self, chains): """MDL-optimierte Musterauswahl""" # PrefixSpan für große Daten if self.use_prefixspan and len(chains) > 1000: return self._find_with_prefixspan(chains) # Standard-Mustererkennung (von GrammarInducer) sequence_counter = Counter() for chain in chains: max_len = min(5, len(chain)) # Begrenze Länge für Performance for length in range(2, max_len + 1): for i in range(len(chain) - length + 1): seq = tuple(chain[i:i+length]) sequence_counter[seq] += 1 repeated = {seq: count for seq, count in sequence_counter.items() if count >= 2} if not repeated: return None if self.use_mdl: # MDL-basierte Bewertung best_score = -float('inf') best_seq = None for seq, count in repeated.items(): gain = self._mdl_gain(seq, count) if gain > best_score: best_score = gain best_seq = seq return best_seq else: # Standard-Bewertung (wie GrammarInducer) return max(repeated.items(), key=lambda x: x[1] * len(x[0]) / max(1, len(set(x[0]))))[0] def _mdl_gain(self, sequence, count): """ Berechnet MDL-Ersparnis (Kompression). Args: sequence: Tuple von Symbolen count: Anzahl der Vorkommen Returns: float: Ersparnis (positiv = lohnend) """ # Kosten ohne Kompression: jedes Vorkommen zählt als Länge original_cost = len(sequence) * count # Kosten mit Kompression: 1 (Nonterminal) pro Vorkommen + Definition compressed_cost = count + len(sequence) gain = original_cost - compressed_cost return gain / (original_cost + 1) # Normalisiert def _find_with_prefixspan(self, chains, min_support=2): """Findet Muster mit PrefixSpan (für große Daten)""" if not PREFIXSPAN_AVAILABLE: return None try: ps = PrefixSpan(chains) patterns = ps.frequent(min_support) # Filtere Muster mit Länge >= 2 valid_patterns = [(seq, support) for seq, support in patterns if len(seq) >= 2] if not valid_patterns: return None # Bestes Muster nach MDL oder Länge if self.use_mdl: best = max(valid_patterns, key=lambda x: self._mdl_gain(tuple(x[0]), x[1])) return tuple(best[0]) else: best = max(valid_patterns, key=lambda x: len(x[0]) * x[1]) return tuple(best[0]) except Exception as e: print(f"PrefixSpan-Fehler: {e}") return None def _generate_nonterminal_name(self, sequence, depth): """Generiert Namen mit Tiefeninformation und optional semantisch""" first = sequence[0] if sequence else "X" last = sequence[-1] if sequence else "X" # Semantischer Name (optional) if self.use_seminfo and self.seminfo: semantic = self.seminfo.suggest_name(sequence) if semantic: return f"{semantic}_d{depth}" return f"P_{first}_{last}_{len(sequence)}_d{depth}" def _estimate_depth(self, sequence): """ Schätzt benötigte Tiefe für eine Sequenz. Berücksichtigt, ob Symbole bereits Nonterminale sind. """ max_depth = 0 for sym in sequence: if sym in self.depth_map: max_depth = max(max_depth, self.depth_map[sym] + 1) return max_depth def _mark_as_skipped(self, sequence): """Markiert ein Muster als übersprungen (zu tief)""" if not hasattr(self, 'skipped_patterns'): self.skipped_patterns = [] self.skipped_patterns.append({ 'sequence': sequence, 'depth': self._estimate_depth(sequence) }) def get_depth_statistics(self): """Gibt Tiefenstatistik aus""" lines = [] lines.append("📊 **TIEFENSTATISTIK**") lines.append("=" * 60) if not self.depth_map: lines.append("Keine Tiefeninformationen verfügbar.") return "\n".join(lines) # Verteilung der Tiefen depth_counts = Counter(self.depth_map.values()) lines.append("\nTiefenverteilung:") for depth in sorted(depth_counts.keys()): count = depth_counts[depth] percentage = (count / len(self.depth_map)) * 100 lines.append(f" Tiefe {depth}: {count} Nonterminale ({percentage:.1f}%)") # Nonterminale nach Tiefe lines.append("\nNonterminale nach Tiefe:") for depth in sorted(set(self.depth_map.values())): nts = [nt for nt, d in self.depth_map.items() if d == depth] lines.append(f" Tiefe {depth}: {', '.join(nts[:5])}" + (f" ... und {len(nts)-5} weitere" if len(nts) > 5 else "")) # Übersprungene Muster if hasattr(self, 'skipped_patterns') and self.skipped_patterns: lines.append(f"\n⚠️ Übersprungene Muster (Tiefe > {self.max_depth}):") for pattern in self.skipped_patterns[:5]: seq_str = ' → '.join(pattern['sequence']) lines.append(f" • {seq_str} (Tiefe {pattern['depth']})") return "\n".join(lines) def get_mdl_statistics(self): """Gibt MDL-Statistik aus""" return self.mdl_optimizer.get_statistics_string() def compare_with_standard(self, standard_grammar, chains): """Vergleicht diese Grammatik mit der Standard-Grammatik""" return self.mdl_optimizer.compare_grammars(self, standard_grammar, chains) # ============================================================================ # ERWEITERTE GUI (erbt von ARSXAI9GUI) # ============================================================================ class ARSXAI10GUI(ARSXAI9GUI): """ Erweiterte GUI mit neuen Optionen für Depth-Bounded PCFG und MDL. Alle bestehenden Tabs und Funktionen bleiben erhalten! """ def __init__(self, root): # Initialisiere erweiterte Parameter self.use_depth_bounded = tk.BooleanVar(value=True) self.max_depth = tk.IntVar(value=5) self.use_mdl = tk.BooleanVar(value=True) self.use_prefixspan = tk.BooleanVar(value=False) self.use_seminfo = tk.BooleanVar(value=False) # Rufe Elternkonstruktor auf super().__init__(root) # Füge erweiterte Tabs und Parameter hinzu self._add_advanced_induction_tab() def _register_models(self): """Erweiterte Modell-Registrierung""" super()._register_models() # Behalte alle alten Modelle # Füge Depth-Bounded Modell hinzu depth_model = DepthBoundedGrammarInducer( max_depth=self.max_depth.get(), use_mdl=self.use_mdl.get(), use_prefixspan=self.use_prefixspan.get() and PREFIXSPAN_AVAILABLE, use_seminfo=self.use_seminfo.get() ) self.model_manager.register_model('DepthBoundedPCFG', depth_model) self.model_manager.activate_model('DepthBoundedPCFG') def _add_advanced_induction_tab(self): """Fügt neuen Tab für erweiterte Induktion hinzu""" self.tab_advanced = ttk.Frame(self.notebook) self.notebook.add(self.tab_advanced, text="Erweiterte Induktion") # Parameter-Frame param_frame = ttk.LabelFrame(self.tab_advanced, text="Parameter") param_frame.pack(fill=tk.X, padx=10, pady=10) # Tiefen-Regler depth_frame = ttk.Frame(param_frame) depth_frame.pack(fill=tk.X, pady=5) ttk.Label(depth_frame, text="Maximale Tiefe:").pack(side=tk.LEFT) self.depth_slider = ttk.Scale(depth_frame, from_=1, to=10, orient=tk.HORIZONTAL, variable=self.max_depth) self.depth_slider.pack(side=tk.LEFT, padx=10, fill=tk.X, expand=True) ttk.Label(depth_frame, textvariable=self.max_depth).pack(side=tk.LEFT, padx=5) # Checkboxen ttk.Checkbutton(param_frame, text="MDL-Optimierung", variable=self.use_mdl).pack(anchor=tk.W, pady=2) prefix_frame = ttk.Frame(param_frame) prefix_frame.pack(fill=tk.X, pady=2) self.prefix_check = ttk.Checkbutton(prefix_frame, text="PrefixSpan verwenden", variable=self.use_prefixspan) self.prefix_check.pack(side=tk.LEFT) if not PREFIXSPAN_AVAILABLE: ttk.Label(prefix_frame, text="(nicht verfügbar - pip install prefixspan)", foreground="orange").pack(side=tk.LEFT, padx=5) self.use_prefixspan.set(False) self.prefix_check.config(state='disabled') seminfo_frame = ttk.Frame(param_frame) seminfo_frame.pack(fill=tk.X, pady=2) self.seminfo_check = ttk.Checkbutton(seminfo_frame, text="Semantische Namen (SemInfo)", variable=self.use_seminfo) self.seminfo_check.pack(side=tk.LEFT) if not SEMINFO_AVAILABLE: ttk.Label(seminfo_frame, text="(sentence-transformers fehlt)", foreground="orange").pack(side=tk.LEFT, padx=5) self.use_seminfo.set(False) self.seminfo_check.config(state='disabled') # Aktions-Buttons action_frame = ttk.Frame(self.tab_advanced) action_frame.pack(fill=tk.X, padx=10, pady=10) ttk.Button(action_frame, text="Tiefenstatistik anzeigen", command=self.show_depth_statistics).pack(side=tk.LEFT, padx=5) ttk.Button(action_frame, text="MDL-Statistik anzeigen", command=self.show_mdl_statistics).pack(side=tk.LEFT, padx=5) ttk.Button(action_frame, text="Mit Standard vergleichen", command=self.compare_with_standard).pack(side=tk.LEFT, padx=5) ttk.Button(action_frame, text="PrefixSpan installieren", command=self.install_prefixspan).pack(side=tk.LEFT, padx=5) # Ausgabebereich output_frame = ttk.LabelFrame(self.tab_advanced, text="Ausgabe") output_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) self.text_advanced = scrolledtext.ScrolledText(output_frame, font=('Courier', 10)) self.text_advanced.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) def install_prefixspan(self): """Installiert PrefixSpan on-demand""" # Hier muss PREFIXSPAN_AVAILABLE als global deklariert werden, # weil wir es ändern wollen! global PREFIXSPAN_AVAILABLE if PREFIXSPAN_AVAILABLE: messagebox.showinfo("Info", "PrefixSpan ist bereits installiert!") return result = messagebox.askyesno("Installation", "PrefixSpan wird für große Datenmengen empfohlen.\n\n" "Jetzt installieren?") if result: try: self.text_advanced.insert(tk.END, "Installiere PrefixSpan...\n") self.root.update() subprocess.check_call([sys.executable, "-m", "pip", "install", "prefixspan"]) # Versuche erneut zu importieren from prefixspan import PrefixSpan PREFIXSPAN_AVAILABLE = True self.text_advanced.insert(tk.END, "✓ PrefixSpan erfolgreich installiert!\n") self.prefix_check.config(state='normal') messagebox.showinfo("Erfolg", "PrefixSpan wurde erfolgreich installiert!") except Exception as e: self.text_advanced.insert(tk.END, f"✗ Fehler bei Installation: {e}\n") messagebox.showerror("Fehler", f"Installation fehlgeschlagen:\n{e}") def run_grammar_induction(self): """Führt Grammatikinduktion durch (mit neuen Optionen)""" if not self.chains: messagebox.showerror("Fehler", "Keine Daten geladen!") return self.status_var.set("Induziere Grammatik...") self.progress_bar.start() def run(): try: # Wähle Inducer basierend auf Einstellungen if self.use_depth_bounded.get(): inducer = DepthBoundedGrammarInducer( max_depth=self.max_depth.get(), use_mdl=self.use_mdl.get(), use_prefixspan=self.use_prefixspan.get() and len(self.chains) > 1000, use_seminfo=self.use_seminfo.get() ) else: inducer = GrammarInducer() inducer.train(self.chains) self.grammar = inducer self.explainer = NaturalLanguageExplainer(self.grammar) def update(): self.show_grammar() self.show_patterns() self.status_var.set(f"Grammatik induziert: {len(self.grammar.nonterminals)} Muster gefunden") self.progress_bar.stop() # Zeige Erfolgsmeldung im Advanced-Tab if hasattr(self, 'text_advanced'): self.text_advanced.delete("1.0", tk.END) self.text_advanced.insert(tk.END, "✅ Grammatikinduktion abgeschlossen.\n\n") if isinstance(inducer, DepthBoundedGrammarInducer): self.text_advanced.insert(tk.END, f"Max. Tiefe: {self.max_depth.get()}\n") self.text_advanced.insert(tk.END, f"MDL-Optimierung: {'aktiv' if self.use_mdl.get() else 'inaktiv'}\n") self.text_advanced.insert(tk.END, f"PrefixSpan: {'aktiv' if self.use_prefixspan.get() and PREFIXSPAN_AVAILABLE else 'inaktiv'}\n") if self.use_seminfo.get() and SEMINFO_AVAILABLE: self.text_advanced.insert(tk.END, inducer.seminfo.get_status_string() + "\n") self.safe_gui_update(update) except Exception as e: def error(): messagebox.showerror("Fehler", f"Grammatikinduktion fehlgeschlagen:\n{str(e)}") self.progress_bar.stop() self.safe_gui_update(error) thread = threading.Thread(target=run) thread.daemon = True thread.start() def show_depth_statistics(self): """Zeigt Tiefenstatistik im Advanced-Tab an""" if not hasattr(self, 'grammar') or not isinstance(self.grammar, DepthBoundedGrammarInducer): messagebox.showerror("Fehler", "Keine Depth-Bounded Grammatik vorhanden!") return stats = self.grammar.get_depth_statistics() self.text_advanced.delete("1.0", tk.END) self.text_advanced.insert(tk.END, stats) def show_mdl_statistics(self): """Zeigt MDL-Statistik im Advanced-Tab an""" if not hasattr(self, 'grammar') or not isinstance(self.grammar, DepthBoundedGrammarInducer): messagebox.showerror("Fehler", "Keine Depth-Bounded Grammatik vorhanden!") return stats = self.grammar.get_mdl_statistics() self.text_advanced.delete("1.0", tk.END) self.text_advanced.insert(tk.END, stats) def compare_with_standard(self): """Vergleicht Depth-Bounded mit Standard-Grammatik""" if not hasattr(self, 'grammar') or not isinstance(self.grammar, DepthBoundedGrammarInducer): messagebox.showerror("Fehler", "Keine Depth-Bounded Grammatik vorhanden!") return # Erstelle Standard-Grammatik zum Vergleich standard = GrammarInducer() standard.train(self.chains) comparison = self.grammar.compare_with_standard(standard, self.chains) self.text_advanced.delete("1.0", tk.END) self.text_advanced.insert(tk.END, "📊 **MDL-VERGLEICH**\n") self.text_advanced.insert(tk.END, "=" * 60 + "\n\n") self.text_advanced.insert(tk.END, "**Depth-Bounded Grammatik:**\n") self.text_advanced.insert(tk.END, f" Kompression: {comparison['grammar1']['compression_ratio']:.1%}\n") self.text_advanced.insert(tk.END, f" Komplexität: {comparison['grammar1']['complexity']} Regeln\n") self.text_advanced.insert(tk.END, f" MDL-Score: {comparison['grammar1']['mdl_score']:.3f}\n\n") self.text_advanced.insert(tk.END, "**Standard Grammatik:**\n") self.text_advanced.insert(tk.END, f" Kompression: {comparison['grammar2']['compression_ratio']:.1%}\n") self.text_advanced.insert(tk.END, f" Komplexität: {comparison['grammar2']['complexity']} Regeln\n") self.text_advanced.insert(tk.END, f" MDL-Score: {comparison['grammar2']['mdl_score']:.3f}\n\n") winner = "Depth-Bounded" if comparison['better'] == 'grammar1' else "Standard" if comparison['better'] == 'grammar2' else "beide gleich" self.text_advanced.insert(tk.END, f"🏆 **Besser: {winner}**\n") # ============================================================================ # EXPORT-ERWEITERUNGEN # ============================================================================ class ExtendedExporter(MultiFormatExporter): """Erweiterter Exporter mit Tiefenstatistik und MDL-Scores""" def to_json(self, data, filename=None): """Erweiterter JSON-Export mit Tiefeninfo""" if 'grammar' in data and hasattr(data['grammar'], 'get'): # Füge Tiefenstatistik hinzu, wenn vorhanden if 'depth_bounded' in data and data['depth_bounded']: data['depth_statistics'] = { 'max_depth': data.get('max_depth', 5), 'depth_distribution': dict(Counter(data['depth_map'].values())) if 'depth_map' in data else {} } return super().to_json(data, filename) def to_html(self, data, filename=None): """Erweiterter HTML-Export mit MDL-Scores""" filepath = super().to_html(data, filename) # Hier könnte man nachträglich MDL-Info einfügen # Für jetzt: Standard-HTML reicht return filepath # ============================================================================ # MODULSTATUS AKTUALISIEREN # ============================================================================ MODULE_STATUS['prefixspan'] = PREFIXSPAN_AVAILABLE MODULE_STATUS['seminfo'] = SEMINFO_AVAILABLE # ============================================================================ # HAUPTFUNKTION # ============================================================================ def main(): """Hauptfunktion""" print("\n" + "=" * 70) print("ARSXAI10 - ERWEITERTE GRAMMATIKINDUKTION GESTARTET") print("=" * 70) print(f"PrefixSpan verfügbar: {'✓' if PREFIXSPAN_AVAILABLE else '✗'}") print(f"SemInfo verfügbar: {'✓' if SEMINFO_AVAILABLE else '✗'}") print("=" * 70 + "\n") root = tk.Tk() app = ARSXAI10GUI(root) root.mainloop() if __name__ == "__main__": main()