""" ARS GUI - Algorithmic Recursive Sequence Analysis with Graphical User Interface Erweiterte Version mit formalem Entscheidungsautomaten, 5-Bit-Kodierung und statistischer Analyse empirischer Abweichungen Neue Funktionen: - 5-Bit-Kodierung der Terminalzeichen - Deterministischer endlicher Automat zur Wohlgeformtheitsprüfung - Statistische Analyse von Übergangswahrscheinlichkeiten (Terminal- und Phasenebene) - Erkennung von Schleifen und Wiederholungen - Dokumentation fehlender Elemente (Begrüßung, Verabschiedung) - Explizite Trennung von struktureller Validierung und statistischer Analyse """ import sys import subprocess import importlib import warnings warnings.filterwarnings("ignore", category=DeprecationWarning) # ============================================================================ # PAKETVERWALTUNG # ============================================================================ def check_and_install_packages(): """Prüft und installiert fehlende Python-Pakete""" REQUIRED_PACKAGES = [ 'numpy', 'scipy', 'matplotlib', 'hmmlearn', 'sklearn-crfsuite', 'sentence-transformers', 'networkx', 'torch', 'seaborn', 'tabulate' ] print("=" * 70) print("ARS 4.0 - PAKETPRÜFUNG") print("=" * 70) missing_packages = [] for package in REQUIRED_PACKAGES: import_name = package.replace('-', '_') if package == 'sklearn-crfsuite': import_name = 'sklearn_crfsuite' elif package == 'sentence-transformers': import_name = 'sentence_transformers' try: importlib.import_module(import_name) print(f"✓ {package} bereits installiert") except ImportError: print(f"✗ {package} fehlt") missing_packages.append(package) if missing_packages: print("\nInstalliere fehlende Pakete...") for package in missing_packages: try: subprocess.check_call([sys.executable, "-m", "pip", "install", package]) print(f" ✓ {package} erfolgreich installiert") except subprocess.CalledProcessError as e: print(f" ✗ Fehler bei Installation von {package}: {e}") print(f" Bitte manuell installieren: pip install {package}") print("\n" + "=" * 70 + "\n") check_and_install_packages() # ============================================================================ # IMPORTS # ============================================================================ import tkinter as tk from tkinter import ttk, filedialog, messagebox, scrolledtext import numpy as np from scipy.stats import pearsonr import matplotlib matplotlib.use('TkAgg') import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from collections import Counter, defaultdict import threading import re import queue import json from datetime import datetime # Optionale Imports mit Fallbacks try: import networkx as nx NETWORKX_AVAILABLE = True except ImportError: NETWORKX_AVAILABLE = False try: from hmmlearn import hmm HMM_AVAILABLE = True except ImportError: HMM_AVAILABLE = False try: from sklearn_crfsuite import CRF CRF_AVAILABLE = True except ImportError: CRF_AVAILABLE = False try: from sentence_transformers import SentenceTransformer TRANSFORMER_AVAILABLE = True except ImportError: TRANSFORMER_AVAILABLE = False try: import seaborn as sns SEABORN_AVAILABLE = True except ImportError: SEABORN_AVAILABLE = False # ============================================================================ # THREAD-SICHERE PLOT-FUNKTIONEN # ============================================================================ class PlotThread: """Thread-sichere Plot-Ausführung""" def __init__(self, root): self.root = root self.plot_queue = queue.Queue() self.process() def process(self): try: while True: func, args, kwargs = self.plot_queue.get_nowait() self.root.after(0, lambda: func(*args, **kwargs)) except queue.Empty: pass finally: self.root.after(100, self.process) def plot(self, func, *args, **kwargs): self.plot_queue.put((func, args, kwargs)) # ============================================================================ # 5-BIT-KODIERUNG DER TERMINALZEICHEN # ============================================================================ class TerminalCoding: """ 5-Bit-Kodierung der Terminalzeichen nach dem Schema: [S][P1P2][U1U2] S: 0 = Kunde, 1 = Verkäufer P1P2: 00 = BG, 01 = B, 10 = A, 11 = AV U1U2: 00 = Basis, 01 = Folge """ # Mapping von Symbolen auf 5-Bit-Codes SYMBOL_TO_CODE = { 'KBG': '00000', 'VBG': '10000', 'KBBd': '00100', 'VBBd': '10100', 'KBA': '00101', 'VBA': '10101', 'KAE': '01000', 'VAE': '11000', 'KAA': '01001', 'VAA': '11001', 'KAV': '01100', 'VAV': '11100' } # Rückwärts-Mapping für Anzeige CODE_TO_SYMBOL = {v: k for k, v in SYMBOL_TO_CODE.items()} # Phasennamen für Ausgabe PHASE_NAMES = { '00': 'BG (Begrüßung)', '01': 'B (Bedarf)', '10': 'A (Abschluss)', '11': 'AV (Verabschiedung)' } @classmethod def encode(cls, symbol): """Wandelt ein Symbol in seinen 5-Bit-Code um""" return cls.SYMBOL_TO_CODE.get(symbol, None) @classmethod def decode(cls, code): """Wandelt einen 5-Bit-Code zurück in das Symbol""" return cls.CODE_TO_SYMBOL.get(code, code) @classmethod def encode_chain(cls, chain): """Wandelt eine ganze Kette in 5-Bit-Codes um""" encoded = [] for sym in chain: code = cls.encode(sym) if code: encoded.append(code) else: encoded.append(sym) # Fallback für unbekannte Symbole return encoded @classmethod def decode_chain(cls, coded_chain): """Wandelt eine kodierte Kette zurück in Symbole""" decoded = [] for code in coded_chain: if len(code) == 5 and all(c in '01' for c in code): sym = cls.decode(code) decoded.append(sym) else: decoded.append(code) return decoded @classmethod def get_phase(cls, code): """Extrahiert die Phase (Bits 2-3) aus einem Code""" if len(code) >= 3: return code[1:3] return None @classmethod def get_speaker(cls, code): """Extrahiert den Sprecher (Bit 1) aus einem Code""" if len(code) >= 1: return 'Kunde' if code[0] == '0' else 'Verkäufer' return None @classmethod def get_phase_name(cls, code): """Gibt den lesbaren Phasennamen zurück""" phase = cls.get_phase(code) return cls.PHASE_NAMES.get(phase, phase) # ============================================================================ # FORMALER ENTSCHEIDUNGSAUTOMAT # ============================================================================ class DialogueAutomaton: """ Deterministischer endlicher Automat zur Prüfung der Wohlgeformtheit von Dialogsequenzen basierend auf der 5-Bit-Kodierung. """ # Zustände Q0 = 'q0' # Start Q_BG = 'q_BG' # Begrüßung Q_B = 'q_B' # Bedarf Q_A = 'q_A' # Abschluss Q_AV = 'q_AV' # Verabschiedung Q_ERR = 'q_err' # Fehler # Akzeptierende Zustände ACCEPTING = {Q_AV} # Zustandsnamen für Ausgabe STATE_NAMES = { Q0: 'Start', Q_BG: 'Begrüßung', Q_B: 'Bedarf', Q_A: 'Abschluss', Q_AV: 'Verabschiedung', Q_ERR: 'Fehler' } def __init__(self): self.current_state = self.Q0 self.history = [] self.reset() def reset(self): """Setzt den Automaten in den Startzustand zurück""" self.current_state = self.Q0 self.history = [(self.Q0, None, 'Initialisierung')] def get_state_name(self, state): """Gibt den lesbaren Namen eines Zustands zurück""" return self.STATE_NAMES.get(state, state) def transition(self, code): """ Führt einen Übergang basierend auf dem 5-Bit-Code durch. Gibt (neuer_zustand, akzeptiert, erklärung) zurück. """ state = self.current_state # Prüfe, ob Code gültig ist if len(code) != 5 or not all(c in '01' for c in code): self.current_state = self.Q_ERR explanation = f"Ungültiger Code: {code}" self.history.append((self.current_state, code, explanation)) return self.current_state, False, explanation # Sprecherbit extrahieren speaker = 'Kunde' if code[0] == '0' else 'Verkäufer' phase_bits = code[1:3] sub_bits = code[3:5] # Phasenbestimmung phase_map = {'00': 'BG', '01': 'B', '10': 'A', '11': 'AV'} phase = phase_map.get(phase_bits, 'UNBEKANNT') # Übergangstabelle if state == self.Q0: if code == '00000': # KBG self.current_state = self.Q_BG explanation = f"Start → Begrüßung: {speaker} eröffnet Gespräch" else: self.current_state = self.Q_ERR explanation = f"Start: Erwarte KBG (00000), erhielt {code}" elif state == self.Q_BG: if code == '10000': # VBG self.current_state = self.Q_BG explanation = f"Begrüßung fortgesetzt: {speaker} erwidert Gruß" elif code == '00100': # KBBd self.current_state = self.Q_B explanation = f"Begrüßung → Bedarf: {speaker} äußert Bedarf" else: self.current_state = self.Q_ERR explanation = f"Begrüßung: Unerwartetes Symbol {code}" elif state == self.Q_B: if code in ['00100', '10100', '00101', '10101']: # KBBd, VBBd, KBA, VBA self.current_state = self.Q_B explanation = f"Bedarf fortgesetzt: {speaker} in Phase {phase}" elif code == '01000': # KAE self.current_state = self.Q_A explanation = f"Bedarf → Abschluss: {speaker} leitet Abschluss ein" else: self.current_state = self.Q_ERR explanation = f"Bedarf: Unerwartetes Symbol {code}" elif state == self.Q_A: if code in ['01000', '11000']: # KAE, VAE self.current_state = self.Q_A explanation = f"Abschluss fortgesetzt: {speaker} in Phase {phase}" elif code == '01001': # KAA self.current_state = self.Q_AV explanation = f"Abschluss → Verabschiedung: {speaker} schließt ab" else: self.current_state = self.Q_ERR explanation = f"Abschluss: Unerwartetes Symbol {code}" elif state == self.Q_AV: if code in ['01100', '11100', '11001']: # KAV, VAV, VAA self.current_state = self.Q_AV explanation = f"Verabschiedung: {speaker} in Phase {phase}" else: self.current_state = self.Q_ERR explanation = f"Verabschiedung: Unerwartetes Symbol {code}" else: # Fehlerzustand self.current_state = self.Q_ERR explanation = f"Bereits im Fehlerzustand" self.history.append((self.current_state, code, explanation)) is_accepting = self.current_state in self.ACCEPTING return self.current_state, is_accepting, explanation def validate_chain(self, coded_chain): """ Validiert eine ganze kodierte Kette. Gibt (gültig, letzter_zustand, protokoll) zurück. """ self.reset() protocol = [] for i, code in enumerate(coded_chain): state, accepting, explanation = self.transition(code) protocol.append({ 'position': i + 1, 'code': code, 'symbol': TerminalCoding.decode(code), 'phase': TerminalCoding.get_phase_name(code), 'speaker': TerminalCoding.get_speaker(code), 'state': self.get_state_name(state), 'explanation': explanation, 'is_accepting': accepting }) valid = self.current_state in self.ACCEPTING return valid, self.get_state_name(self.current_state), protocol def get_history_string(self): """Gibt den gesamten Entscheidungspfad als String zurück""" lines = [] for i, (state, code, explanation) in enumerate(self.history): if i == 0: lines.append(f"Start: {self.get_state_name(state)}") else: sym = TerminalCoding.decode(code) if code else "-" phase = TerminalCoding.get_phase_name(code) if code else "-" lines.append(f" {i}. {code} ({sym}, {phase}) → {self.get_state_name(state)}") lines.append(f" {explanation}") return "\n".join(lines) # ============================================================================ # STATISTISCHE ERWEITERUNG # ============================================================================ class StatisticalExtension: """ Statistische Erweiterung des formalen Automaten. Trennt strikt zwischen struktureller Validierung und statistischer Analyse. """ def __init__(self, automaton): self.automaton = automaton self.terminal_transitions = defaultdict(Counter) self.phase_transitions = defaultdict(Counter) self.terminal_counts = defaultdict(int) self.phase_counts = defaultdict(int) self.loops = [] self.missing_elements = defaultdict(int) self.sequence_stats = [] self.transcript_results = [] def analyze_coded_chain(self, coded_chain, transcript_id): """ Analysiert eine kodierte Kette statistisch, ohne die strukturelle Validierung zu beeinflussen. """ # 1. Strukturelle Validierung (unabhängig) valid, state, protocol = self.automaton.validate_chain(coded_chain) # 2. Statistische Analyse (nachgelagert) self._count_transitions(coded_chain) self._count_phases(coded_chain) self._detect_loops(coded_chain, transcript_id) self._check_missing_elements(coded_chain) result = { 'transcript_id': transcript_id, 'valid': valid, 'state': state, 'length': len(coded_chain), 'protocol': protocol } self.transcript_results.append(result) return result def _count_transitions(self, coded_chain): """Zählt Übergänge zwischen Terminalzeichen""" for i in range(len(coded_chain) - 1): curr = coded_chain[i] next_sym = coded_chain[i + 1] self.terminal_transitions[curr][next_sym] += 1 self.terminal_counts[curr] += 1 def _count_phases(self, coded_chain): """Zählt Übergänge zwischen Phasen (Bits 2-3)""" phases = [code[1:3] for code in coded_chain] for i in range(len(phases) - 1): curr_phase = phases[i] next_phase = phases[i + 1] self.phase_transitions[curr_phase][next_phase] += 1 self.phase_counts[curr_phase] += 1 def _detect_loops(self, coded_chain, transcript_id): """Erkennt Schleifen in der Sequenz""" # Einfache Schleifenerkennung: sich wiederholende Muster for length in range(2, min(5, len(coded_chain) // 2 + 1)): for i in range(len(coded_chain) - 2 * length + 1): pattern = coded_chain[i:i+length] if coded_chain[i+length:i+2*length] == pattern: # Prüfe, ob dieses Muster bereits erfasst wurde pattern_tuple = tuple(pattern) existing = False for loop in self.loops: if loop['pattern'] == pattern_tuple: existing = True loop['occurrences'] += 1 if transcript_id not in loop['transcripts']: loop['transcripts'].append(transcript_id) break if not existing: self.loops.append({ 'position': i, 'length': length, 'pattern': pattern_tuple, 'transcripts': [transcript_id], 'occurrences': 1 }) def _check_missing_elements(self, coded_chain): """Prüft auf fehlende Elemente""" if not coded_chain: return # Fehlende Begrüßung? first = coded_chain[0] if first not in ['00000', '10000']: # weder KBG noch VBG self.missing_elements['greeting'] += 1 # Fehlende Verabschiedung? last = coded_chain[-1] if last not in ['01100', '11100']: # weder KAV noch VAV self.missing_elements['farewell'] += 1 # Ungewöhnliche Phasenfolgen phases = [code[1:3] for code in coded_chain] for i in range(len(phases) - 1): curr = phases[i] next_phase = phases[i + 1] # Prüfe auf Phasenrücksprünge (z.B. A → B) phase_order = {'00': 0, '01': 1, '10': 2, '11': 3} if curr in phase_order and next_phase in phase_order: if phase_order[next_phase] < phase_order[curr]: self.missing_elements['phase_regression'] += 1 def get_terminal_probabilities(self): """Berechnet Übergangswahrscheinlichkeiten auf Terminalebene""" probs = {} for curr, targets in self.terminal_transitions.items(): total = self.terminal_counts[curr] if total > 0: probs[curr] = { next_sym: count / total for next_sym, count in targets.items() } return probs def get_phase_probabilities(self): """Berechnet Übergangswahrscheinlichkeiten auf Phasenebene""" probs = {} for curr, targets in self.phase_transitions.items(): total = self.phase_counts[curr] if total > 0: probs[curr] = { next_phase: count / total for next_phase, count in targets.items() } return probs def get_loop_statistics(self): """Gibt Statistiken über Schleifen zurück""" return sorted(self.loops, key=lambda x: -x['occurrences']) def get_statistics(self): """Gibt alle statistischen Kennzahlen zurück""" return { 'terminal_probabilities': self.get_terminal_probabilities(), 'phase_probabilities': self.get_phase_probabilities(), 'loops': self.get_loop_statistics(), 'missing_elements': dict(self.missing_elements), 'total_sequences': len(self.terminal_counts), 'transcript_results': self.transcript_results } def reset(self): """Setzt alle statistischen Daten zurück""" self.terminal_transitions = defaultdict(Counter) self.phase_transitions = defaultdict(Counter) self.terminal_counts = defaultdict(int) self.phase_counts = defaultdict(int) self.loops = [] self.missing_elements = defaultdict(int) self.transcript_results = [] def print_report(self): """Gibt einen statistischen Bericht aus""" lines = [] lines.append("\n" + "=" * 70) lines.append("STATISTISCHE ANALYSE") lines.append("=" * 70) # 1. Validierungsergebnisse lines.append("\n1. VALIDIERUNGSERGEBNISSE:") valid_count = sum(1 for r in self.transcript_results if r['valid']) total = len(self.transcript_results) lines.append(f" Gültige Ketten: {valid_count}/{total} ({valid_count/total*100:.1f}%)") for result in self.transcript_results: status = "✓" if result['valid'] else "✗" lines.append(f" Transkript {result['transcript_id']}: {status} " f"(Länge: {result['length']}, Endzustand: {result['state']})") # 2. Fehlende Elemente lines.append("\n2. FEHLENDE ELEMENTE:") if self.missing_elements: for elem, count in self.missing_elements.items(): if elem == 'greeting': lines.append(f" Fehlende Begrüßung: {count}x") elif elem == 'farewell': lines.append(f" Fehlende Verabschiedung: {count}x") elif elem == 'phase_regression': lines.append(f" Phasenrücksprünge: {count}x") else: lines.append(" Keine fehlenden Elemente") # 3. Schleifen lines.append("\n3. ERKANNTE SCHLEIFEN:") loops = self.get_loop_statistics() if loops: for i, loop in enumerate(loops[:5]): # Top 5 anzeigen pattern_str = ' → '.join([TerminalCoding.decode(c) for c in loop['pattern']]) codes_str = ','.join(loop['pattern']) lines.append(f" {i+1}. Muster: {pattern_str}") lines.append(f" Codes: {codes_str}") lines.append(f" Länge: {loop['length']}, Vorkommen: {loop['occurrences']}x") lines.append(f" in Transkripten: {', '.join(map(str, loop['transcripts']))}") else: lines.append(" Keine Schleifen erkannt") # 4. Phasen-Übergangswahrscheinlichkeiten lines.append("\n4. PHASEN-ÜBERGANGSWAHRSCHEINLICHKEITEN:") phase_probs = self.get_phase_probabilities() phase_names = {'00': 'BG (Begrüßung)', '01': 'B (Bedarf)', '10': 'A (Abschluss)', '11': 'AV (Verabschiedung)'} for curr, targets in phase_probs.items(): curr_name = phase_names.get(curr, curr) transitions = [] for next_phase, prob in sorted(targets.items(), key=lambda x: -x[1]): next_name = phase_names.get(next_phase, next_phase) transitions.append(f"{next_name}: {prob:.3f}") lines.append(f" {curr_name} → {', '.join(transitions)}") # 5. Terminal-Übergangswahrscheinlichkeiten (Top 10) lines.append("\n5. TERMINAL-ÜBERGANGSWAHRSCHEINLICHKEITEN (Top 10):") term_probs = self.get_terminal_probabilities() all_transitions = [] for curr, targets in term_probs.items(): for next_sym, prob in targets.items(): all_transitions.append((curr, next_sym, prob)) all_transitions.sort(key=lambda x: -x[2]) for i, (curr, next_sym, prob) in enumerate(all_transitions[:10]): curr_sym = TerminalCoding.decode(curr) next_sym_dec = TerminalCoding.decode(next_sym) lines.append(f" {i+1}. {curr_sym} ({curr}) → {next_sym_dec} ({next_sym}): {prob:.3f}") return "\n".join(lines) def export_json(self, filename="statistik_export.json"): """Exportiert die Statistiken als JSON""" stats = self.get_statistics() # Konvertiere nicht-serialisierbare Objekte stats['loops'] = [ { 'position': l['position'], 'length': l['length'], 'pattern': list(l['pattern']), 'transcripts': l['transcripts'], 'occurrences': l['occurrences'] } for l in stats['loops'] ] stats['missing_elements'] = dict(stats['missing_elements']) with open(filename, 'w', encoding='utf-8') as f: json.dump(stats, f, indent=2, ensure_ascii=False) return filename # ============================================================================ # ARS 2.0 - BASIS-GRAMMATIK # ============================================================================ class ARS20: """ARS 2.0 - Übergangswahrscheinlichkeiten ohne Nonterminale""" def __init__(self): self.chains = [] self.terminals = [] self.start_symbol = None self.transitions = {} self.probabilities = {} self.optimized_probabilities = {} self.history = [] def load_chains(self, chains, start_symbol=None): self.chains = chains all_terminals = set() for chain in chains: for symbol in chain: all_terminals.add(symbol) self.terminals = sorted(list(all_terminals)) self.start_symbol = start_symbol if start_symbol else (chains[0][0] if chains else None) self.transitions = self.count_transitions(chains) self.probabilities = self.calculate_probabilities(self.transitions) return True def count_transitions(self, chains): transitions = {} for chain in chains: for i in range(len(chain) - 1): start, end = chain[i], chain[i + 1] if start not in transitions: transitions[start] = {} if end not in transitions[start]: transitions[start][end] = 0 transitions[start][end] += 1 return transitions def calculate_probabilities(self, transitions): probabilities = {} for start in transitions: total = sum(transitions[start].values()) if total > 0: probabilities[start] = {end: count / total for end, count in transitions[start].items()} return probabilities def print_grammar(self): lines = [] lines.append("=" * 70) lines.append("ARS 2.0 - ÜBERGANGSWAHRSCHEINLICHKEITEN") lines.append("=" * 70) lines.append("") if self.probabilities: for start in sorted(self.probabilities.keys()): trans = self.probabilities[start] trans_str = ", ".join([f"{end}: {prob:.3f}" for end, prob in sorted(trans.items())]) lines.append(f"{start} -> {trans_str}") else: lines.append("Keine Übergänge gefunden.") lines.append(f"\nTerminalzeichen ({len(self.terminals)}): {self.terminals}") lines.append(f"Startzeichen: {self.start_symbol}") return "\n".join(lines) def generate_chain(self, start_symbol=None, max_length=20): if not self.optimized_probabilities: probs = self.probabilities else: probs = self.optimized_probabilities start = start_symbol if start_symbol else self.start_symbol if not start or start not in probs: return [] chain = [start] current = start for _ in range(max_length - 1): if current not in probs: break next_symbols = list(probs[current].keys()) if not next_symbols: break probs_list = list(probs[current].values()) if not probs_list: break try: next_symbol = np.random.choice(next_symbols, p=probs_list) chain.append(next_symbol) current = next_symbol except: break if current not in probs: break return chain def optimize(self, max_iterations=500, tolerance=0.005, target_correlation=0.9, progress_callback=None): probs = {} for start, p in self.probabilities.items(): probs[start] = p.copy() empirical_freqs = self.compute_frequencies(self.chains) best_correlation = 0 best_probabilities = None history = [] for iteration in range(max_iterations): generated = [self.generate_chain(max_length=20) for _ in range(8)] generated = [g for g in generated if g] if not generated: continue gen_freqs = self.compute_frequencies(generated) try: if len(empirical_freqs) == len(gen_freqs) and len(empirical_freqs) > 1: corr, p_val = pearsonr(empirical_freqs, gen_freqs) else: corr, p_val = 0, 1 except: corr, p_val = 0, 1 history.append((iteration, corr, p_val)) if progress_callback and iteration % 10 == 0: progress_callback(iteration, max_iterations, corr, p_val) if corr >= target_correlation and p_val < 0.05: best_correlation = corr best_probabilities = {s: p.copy() for s, p in probs.items()} break for start in probs: for end in list(probs[start].keys()): if end in self.terminals: idx = self.terminals.index(end) if idx < len(empirical_freqs) and idx < len(gen_freqs): emp_prob = empirical_freqs[idx] gen_prob = gen_freqs[idx] error = emp_prob - gen_prob probs[start][end] += error * tolerance probs[start][end] = max(0.01, min(0.99, probs[start][end])) for start in probs: total = sum(probs[start].values()) if total > 0: probs[start] = {end: p/total for end, p in probs[start].items()} if best_probabilities is None and history: best_idx = max(range(len(history)), key=lambda i: history[i][1]) best_correlation = history[best_idx][1] best_probabilities = self.probabilities self.optimized_probabilities = best_probabilities self.history = history return best_probabilities, best_correlation, history def compute_frequencies(self, chains): if not self.terminals: return np.array([]) freq_array = np.zeros(len(self.terminals)) term_index = {term: i for i, term in enumerate(self.terminals)} for chain in chains: for symbol in chain: if symbol in term_index: freq_array[term_index[symbol]] += 1 total = freq_array.sum() if total > 0: freq_array /= total return freq_array # ============================================================================ # ARS 3.0 - GRAMMATIK MIT NONTERMINALEN (gekürzt, da nicht im Fokus) # ============================================================================ class MethodologicalReflection: def __init__(self): self.interpretation_log = [] self.sequence_meaning_mapping = {} def log_interpretation(self, sequence, new_nonterminal, rationale): self.interpretation_log.append({ 'sequence': sequence, 'new_nonterminal': new_nonterminal, 'rationale': rationale, 'timestamp': len(self.interpretation_log) }) aktionen = [self._interpretiere_symbol(s) for s in sequence if isinstance(s, str)] self.sequence_meaning_mapping[tuple(sequence)] = { 'bedeutung': ' → '.join(aktionen), 'typ': self._klassifiziere_sequenz(sequence) } def _interpretiere_symbol(self, symbol): bedeutungen = { 'KBG': 'Kunden-Gruß', 'VBG': 'Verkäufer-Gruß', 'KBBd': 'Kunden-Bedarf (konkret)', 'VBBd': 'Verkäufer-Nachfrage', 'KBA': 'Kunden-Antwort', 'VBA': 'Verkäufer-Reaktion', 'KAE': 'Kunden-Erkundigung', 'VAE': 'Verkäufer-Auskunft', 'KAA': 'Kunden-Abschluss', 'VAA': 'Verkäufer-Abschluss', 'KAV': 'Kunden-Verabschiedung', 'VAV': 'Verkäufer-Verabschiedung' } return bedeutungen.get(symbol, str(symbol)) def _klassifiziere_sequenz(self, sequence): seq_str = ' '.join([str(s) for s in sequence]) if 'KBBd' in seq_str and 'VBBd' in seq_str: return 'Bedarfsaushandlung' elif 'KAE' in seq_str or 'VAE' in seq_str: return 'Informationsaustausch' elif 'KAA' in seq_str and 'VAA' in seq_str: return 'Transaktionsabschluss' else: return 'Interaktionssequenz' def print_summary(self): print("\n" + "=" * 70) print("METHODOLOGISCHE REFLEXION") print("=" * 70) print("\nDokumentierte Interpretationsentscheidungen:") for log in self.interpretation_log: print(f"\n[Interpretation {log['timestamp']+1}]") seq_str = ' → '.join([str(s) for s in log['sequence']]) print(f" Sequenz: {seq_str}") print(f" → Nonterminal: {log['new_nonterminal']}") print(f" Begründung: {log['rationale']}") if tuple(log['sequence']) in self.sequence_meaning_mapping: mapping = self.sequence_meaning_mapping[tuple(log['sequence'])] print(f" Bedeutung: {mapping['bedeutung']}") print(f" Sequenztyp: {mapping['typ']}") class GrammarInducer: def __init__(self): self.rules = {} self.terminals = set() self.nonterminals = set() self.start_symbol = None self.user_start_symbol = None self.compression_history = [] self.reflection = MethodologicalReflection() self.chains = [] self.iteration_count = 0 self.hierarchy_levels = {} def load_chains(self, chains, user_start_symbol=None): self.chains = [list(chain) for chain in chains] self.user_start_symbol = user_start_symbol all_symbols = set() for chain in chains: for symbol in chain: all_symbols.add(symbol) self.terminals = all_symbols return True def find_best_repetition(self, chains, min_length=2, max_length=5): sequence_counter = Counter() for chain in chains: max_len = min(max_length, len(chain)) for length in range(min_length, max_len + 1): for i in range(len(chain) - length + 1): seq = tuple(chain[i:i+length]) sequence_counter[seq] += 1 repeated = {seq: count for seq, count in sequence_counter.items() if count >= 2} if not repeated: return None best_seq = max(repeated.items(), key=lambda x: x[1] * len(x[0]) / max(1, len(set(x[0])))) return best_seq[0] def generate_nonterminal_name(self, sequence): if all(isinstance(s, str) and s.startswith(('K', 'V')) for s in sequence): first = sequence[0] last = sequence[-1] seq_str = ' '.join([str(s) for s in sequence]) if 'KBBd' in seq_str and 'VBBd' in seq_str: typ = "BEDARFSKLAERUNG" elif ('VAA' in seq_str and 'KAA' in seq_str) or ('VAA' in seq_str and 'KAV' in seq_str): typ = "ZAHLUNGSVORGANG" elif 'KAE' in seq_str or 'VAE' in seq_str: typ = "INFORMATIONSAUSTAUSCH" elif 'KBG' in seq_str and 'VBG' in seq_str: typ = "BEGRUESSUNG" elif 'VAV' in seq_str and 'KAV' in seq_str: typ = "VERABSCHIEDUNG" else: typ = "SEQUENZ" return f"NT_{typ}_{first}_{last}" else: return f"NT_{'_'.join(str(s) for s in sequence)}" def _describe_sequence(self, sequence): if len(sequence) == 2: if all(isinstance(s, str) and len(s) <= 4 for s in sequence): return f"{self.reflection._interpretiere_symbol(sequence[0])} → {self.reflection._interpretiere_symbol(sequence[1])}" else: return f"{sequence[0]} → {sequence[1]}" else: return f"Sequenz mit {len(sequence)} Schritten" def compress_sequences(self, chains, sequence, new_nonterminal): compressed = [] seq_tuple = tuple(sequence) seq_len = len(sequence) for chain in chains: new_chain = [] i = 0 while i < len(chain): if i <= len(chain) - seq_len and tuple(chain[i:i+seq_len]) == seq_tuple: new_chain.append(new_nonterminal) i += seq_len else: new_chain.append(chain[i]) i += 1 compressed.append(new_chain) return compressed def all_chains_identical(self, chains): if not chains: return False first = chains[0] return all(len(chain) == 1 and chain[0] == first[0] for chain in chains) def find_top_level_nonterminal(self): if not self.rules: return None symbols_in_productions = set() for nt, productions in self.rules.items(): for prod, _ in productions: for sym in prod: symbols_in_productions.add(sym) top_level = [nt for nt in self.rules if nt not in symbols_in_productions] if top_level: if len(top_level) > 1: top_level.sort(key=lambda nt: self.hierarchy_levels.get(nt, 0), reverse=True) return top_level[0] if self.hierarchy_levels: return max(self.hierarchy_levels.items(), key=lambda x: x[1])[0] return list(self.rules.keys())[0] if self.rules else None def induce_grammar(self, max_iterations=50, progress_callback=None): current_chains = [list(chain) for chain in self.chains] iteration = 0 rule_counter = 1 self.rules = {} self.nonterminals = set() self.compression_history = [] self.iteration_count = 0 self.hierarchy_levels = {} print("\n" + "=" * 70) print("HIERARCHISCHE GRAMMATIKINDUKTION") print("=" * 70) while iteration < max_iterations: best_seq = self.find_best_repetition(current_chains) if best_seq is None: print(f"\nKeine weiteren Wiederholungen nach {iteration} Iterationen.") break new_nonterminal = self.generate_nonterminal_name(best_seq) beschreibung = self._describe_sequence(best_seq) base_name = new_nonterminal while new_nonterminal in self.nonterminals: new_nonterminal = f"{base_name}_{rule_counter}" rule_counter += 1 rationale = f"Erkanntes Dialogmuster: {beschreibung}" self.reflection.log_interpretation(best_seq, new_nonterminal, rationale) seq_str = ' → '.join([str(s) for s in best_seq]) print(f"\nIteration {iteration + 1}:") print(f" Erkanntes Muster: {seq_str}") print(f" → Neue Kategorie: {new_nonterminal}") self.rules[new_nonterminal] = [(list(best_seq), 1.0)] self.nonterminals.add(new_nonterminal) self.hierarchy_levels[new_nonterminal] = iteration occurrences = 0 for chain in current_chains: for i in range(len(chain) - len(best_seq) + 1): if tuple(chain[i:i+len(best_seq)]) == best_seq: occurrences += 1 self.compression_history.append({ 'iteration': iteration, 'sequence': best_seq, 'new_symbol': new_nonterminal, 'occurrences': occurrences }) current_chains = self.compress_sequences(current_chains, best_seq, new_nonterminal) if current_chains and current_chains[0]: example = ' → '.join([str(s) for s in current_chains[0][:10]]) print(f" Beispiel: {example}...") iteration += 1 self.iteration_count = iteration if self.all_chains_identical(current_chains): if current_chains and current_chains[0]: unique_symbol = current_chains[0][0] if self.user_start_symbol and self.user_start_symbol in self.rules: self.start_symbol = self.user_start_symbol elif unique_symbol in self.rules: self.start_symbol = unique_symbol else: self.start_symbol = self.find_top_level_nonterminal() break if self.start_symbol is None: if self.user_start_symbol and self.user_start_symbol in self.rules: self.start_symbol = self.user_start_symbol elif self.rules: self.start_symbol = self.find_top_level_nonterminal() all_symbols = set() for chain in self.chains: for sym in chain: all_symbols.add(sym) self.terminals = all_symbols - self.nonterminals self._calculate_probabilities() return current_chains def _calculate_probabilities(self): expansion_counts = defaultdict(Counter) for chain in self.chains: self._count_expansions(chain, expansion_counts) for nonterminal in self.rules: if nonterminal in expansion_counts: total = sum(expansion_counts[nonterminal].values()) if total > 0: productions = [] for expansion, count in expansion_counts[nonterminal].items(): productions.append((list(expansion), count / total)) productions.sort(key=lambda x: x[1], reverse=True) self.rules[nonterminal] = productions def _count_expansions(self, sequence, expansion_counts): i = 0 while i < len(sequence): symbol = sequence[i] if symbol in self.rules: found = False for expansion, _ in self.rules[symbol]: exp_len = len(expansion) if i + exp_len <= len(sequence) and sequence[i:i+exp_len] == expansion: expansion_counts[symbol][tuple(expansion)] += 1 self._count_expansions(expansion, expansion_counts) i += exp_len found = True break if not found: i += 1 else: i += 1 def print_grammar(self): lines = [] lines.append("\n" + "=" * 70) lines.append("INDUZIERTE GRAMMATIK") lines.append("=" * 70) lines.append(f"\nTerminale ({len(self.terminals)}): {sorted(self.terminals)}") lines.append(f"Nonterminale ({len(self.nonterminals)}): {sorted(self.nonterminals)}") lines.append(f"Startsymbol: {self.start_symbol}") lines.append(f"Iterationen: {self.iteration_count}") lines.append("\nPRODUKTIONSREGELN (mit Wahrscheinlichkeiten):") for nonterminal in sorted(self.rules.keys()): productions = self.rules[nonterminal] if productions: prod_str = " | ".join([f"{' → '.join(prod)} [{prob:.3f}]" for prod, prob in productions]) lines.append(f"\n{nonterminal} → {prod_str}") return "\n".join(lines) def generate_chain(self, start_symbol=None, max_depth=20): if not start_symbol: start_symbol = self.start_symbol if not start_symbol: return [] if start_symbol not in self.rules: if self.rules: start_symbol = self.find_top_level_nonterminal() else: return [] prod_probs = {} for nt, prods in self.rules.items(): symbols = [p for p, _ in prods] probs = [prob for _, prob in prods] if symbols and probs: total = sum(probs) if total > 0: probs = [p/total for p in probs] prod_probs[nt] = (symbols, probs) def expand(symbol, depth=0): if depth >= max_depth: return [str(symbol)] if symbol in self.terminals: return [str(symbol)] if symbol not in prod_probs: return [str(symbol)] symbols, probs = prod_probs[symbol] if not symbols: return [str(symbol)] try: chosen_idx = np.random.choice(len(symbols), p=probs) chosen = symbols[chosen_idx] except Exception: chosen = symbols[0] if symbols else [] result = [] for sym in chosen: result.extend(expand(sym, depth + 1)) return result return expand(start_symbol) # ============================================================================ # PETRI-NETZE (gekürzt, da nicht im Fokus) # ============================================================================ if NETWORKX_AVAILABLE: class ARSPetriNet: def __init__(self, name="ARS_PetriNet"): self.name = name self.places = {} self.transitions = {} self.arcs = [] self.tokens = {} self.hierarchy = {} self.firing_history = [] self.reached_markings = set() def add_place(self, name, initial_tokens=0, place_type="normal"): self.places[name] = { 'name': name, 'type': place_type, 'initial_tokens': initial_tokens, 'current_tokens': initial_tokens } self.tokens[name] = initial_tokens def add_transition(self, name, transition_type="speech_act", guard=None, subnet=None): self.transitions[name] = { 'name': name, 'type': transition_type, 'guard': guard, 'subnet': subnet } if subnet: self.hierarchy[name] = subnet def add_arc(self, source, target, weight=1): self.arcs.append({'source': source, 'target': target, 'weight': weight}) def get_preset(self, transition): preset = {} for arc in self.arcs: if arc['target'] == transition and arc['source'] in self.places: preset[arc['source']] = arc['weight'] return preset def get_postset(self, transition): postset = {} for arc in self.arcs: if arc['source'] == transition and arc['target'] in self.places: postset[arc['target']] = arc['weight'] return postset def is_enabled(self, transition): if transition not in self.transitions: return False preset = self.get_preset(transition) for place, weight in preset.items(): if self.tokens.get(place, 0) < weight: return False trans_data = self.transitions[transition] if trans_data['guard'] and not trans_data['guard'](self): return False return True def fire(self, transition): if not self.is_enabled(transition): return False preset = self.get_preset(transition) for place, weight in preset.items(): self.tokens[place] -= weight postset = self.get_postset(transition) for place, weight in postset.items(): self.tokens[place] = self.tokens.get(place, 0) + weight self.firing_history.append({'transition': transition, 'marking': self.get_marking_copy()}) self.reached_markings.add(self.get_marking_tuple()) return True def get_marking_copy(self): return self.tokens.copy() def get_marking_tuple(self): return tuple(sorted([(p, self.tokens[p]) for p in self.places])) def reset(self): for place_name, place_data in self.places.items(): self.tokens[place_name] = place_data['initial_tokens'] self.firing_history = [] def simulate(self, transition_sequence): self.reset() successful = [] for t in transition_sequence: if self.is_enabled(t): self.fire(t) successful.append(t) else: break return successful, self.get_marking_copy() class PetriNetBuilder: def __init__(self, terminal_chains, grammar_rules=None): self.chains = terminal_chains self.grammar = grammar_rules self.petri_net = None def build_basic_net(self): self.petri_net = ARSPetriNet("ARS_PetriNet_Basic") all_symbols = set() for chain in self.chains: for sym in chain: all_symbols.add(sym) self.petri_net.add_place("p_start", initial_tokens=1) self.petri_net.add_place("p_end", initial_tokens=0) for i, sym in enumerate(sorted(all_symbols)): self.petri_net.add_place(f"p_{sym}_ready", initial_tokens=0) self.petri_net.add_transition(f"t_{sym}") if i == 0: self.petri_net.add_arc("p_start", f"t_{sym}") self.petri_net.add_arc(f"t_{sym}", f"p_{sym}_ready") return self.petri_net def build_resource_net(self): self.petri_net = ARSPetriNet("ARS_PetriNet_Resource") self.petri_net.add_place("p_customer_present", initial_tokens=1, place_type="customer") self.petri_net.add_place("p_customer_ready", initial_tokens=1, place_type="customer") self.petri_net.add_place("p_seller_ready", initial_tokens=1, place_type="seller") self.petri_net.add_place("p_goods_available", initial_tokens=10, place_type="resource") self.petri_net.add_place("p_goods_selected", initial_tokens=0, place_type="resource") self.petri_net.add_place("p_money_customer", initial_tokens=20, place_type="resource") self.petri_net.add_place("p_money_register", initial_tokens=0, place_type="resource") phases = ["Greeting", "Need", "Consult", "Completion", "Farewell"] for phase in phases: self.petri_net.add_place(f"p_phase_{phase}", initial_tokens=0, place_type="phase") self.petri_net.add_place("p_phase_start", initial_tokens=1, place_type="phase") all_symbols = set() for chain in self.chains: for sym in chain: all_symbols.add(sym) for sym in sorted(all_symbols): self.petri_net.add_transition(f"t_{sym}") if sym.startswith('K'): self.petri_net.add_arc("p_customer_ready", f"t_{sym}") self.petri_net.add_arc(f"t_{sym}", "p_customer_ready") else: self.petri_net.add_arc("p_seller_ready", f"t_{sym}") self.petri_net.add_arc(f"t_{sym}", "p_seller_ready") if sym.endswith('A'): self.petri_net.add_arc("p_goods_selected", f"t_{sym}") self.petri_net.add_arc("p_money_customer", f"t_{sym}") self.petri_net.add_arc(f"t_{sym}", "p_goods_available") self.petri_net.add_arc(f"t_{sym}", "p_money_register") return self.petri_net def simulate_chain(self, chain): if not self.petri_net: self.build_basic_net() self.petri_net.reset() results = [] for sym in chain: trans_name = f"t_{sym}" if trans_name in self.petri_net.transitions: enabled = self.petri_net.is_enabled(trans_name) if enabled: self.petri_net.fire(trans_name) results.append((sym, True, "enabled")) else: results.append((sym, False, "not enabled")) else: results.append((sym, False, "no transition")) return results, self.petri_net.get_marking_copy() else: class ARSPetriNet: def __init__(self, *args, **kwargs): raise ImportError("networkx nicht installiert") class PetriNetBuilder: def __init__(self, *args, **kwargs): raise ImportError("networkx nicht installiert") # ============================================================================ # GUI - HAUPTFENSTER # ============================================================================ class ARSGUI: def __init__(self, root): self.root = root self.root.title("ARS 4.0 - Algorithmic Recursive Sequence Analysis") self.root.geometry("1400x900") self.plot_thread = PlotThread(root) self.update_queue = queue.Queue() self.process_updates() # Daten self.chains = [] self.terminals = [] self.delimiter = tk.StringVar(value=",") self.start_symbol = tk.StringVar(value="") # Kodierte Ketten self.coded_chains = [] # ARS-Objekte self.ars20 = ARS20() self.ars30 = GrammarInducer() self.petri_builder = None self.automaton = DialogueAutomaton() self.stats_extension = StatisticalExtension(self.automaton) # Verfügbarkeit der optionalen Module self.module_status = { 'networkx': NETWORKX_AVAILABLE, 'hmmlearn': HMM_AVAILABLE, 'crf': CRF_AVAILABLE, 'transformer': TRANSFORMER_AVAILABLE, 'seaborn': SEABORN_AVAILABLE } self.create_menu() self.create_main_panels() self.status_var = tk.StringVar(value="Bereit") self.create_statusbar() self.show_module_status() def process_updates(self): try: while True: update_func = self.update_queue.get_nowait() update_func() except queue.Empty: pass finally: self.root.after(100, self.process_updates) def safe_gui_update(self, func): self.update_queue.put(func) def create_menu(self): menubar = tk.Menu(self.root) self.root.config(menu=menubar) file_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="Datei", menu=file_menu) file_menu.add_command(label="Öffnen", command=self.load_file) file_menu.add_command(label="Beispiel laden", command=self.load_example) file_menu.add_separator() file_menu.add_command(label="Beenden", command=self.root.quit) help_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="Hilfe", menu=help_menu) help_menu.add_command(label="Modulstatus", command=self.show_module_status) help_menu.add_command(label="Über", command=self.show_about) def create_main_panels(self): main_paned = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL) main_paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) left_frame = ttk.Frame(main_paned) main_paned.add(left_frame, weight=1) self.create_input_panel(left_frame) right_frame = ttk.Frame(main_paned) main_paned.add(right_frame, weight=3) self.create_output_panel(right_frame) def create_input_panel(self, parent): ttk.Label(parent, text="Eingabe", font=('Arial', 12, 'bold')).pack(anchor=tk.W, pady=5) delim_frame = ttk.Frame(parent) delim_frame.pack(fill=tk.X, pady=5) ttk.Label(delim_frame, text="Trennzeichen:").pack(side=tk.LEFT) ttk.Radiobutton(delim_frame, text="Komma (,)", variable=self.delimiter, value=",").pack(side=tk.LEFT, padx=2) ttk.Radiobutton(delim_frame, text="Semikolon (;)", variable=self.delimiter, value=";").pack(side=tk.LEFT, padx=2) ttk.Radiobutton(delim_frame, text="Leerzeichen", variable=self.delimiter, value=" ").pack(side=tk.LEFT, padx=2) ttk.Radiobutton(delim_frame, text="Benutzer", variable=self.delimiter, value="custom").pack(side=tk.LEFT, padx=2) self.custom_delimiter = ttk.Entry(delim_frame, width=5) self.custom_delimiter.pack(side=tk.LEFT, padx=2) self.custom_delimiter.insert(0, "|") ttk.Label(parent, text="Terminalzeichenketten (eine pro Zeile):").pack(anchor=tk.W, pady=5) self.text_input = scrolledtext.ScrolledText(parent, height=12, font=('Courier', 10)) self.text_input.pack(fill=tk.BOTH, expand=True, pady=5) btn_frame = ttk.Frame(parent) btn_frame.pack(fill=tk.X, pady=5) ttk.Button(btn_frame, text="Datei laden", command=self.load_file).pack(side=tk.LEFT, padx=2) ttk.Button(btn_frame, text="Parsen", command=self.parse_input).pack(side=tk.LEFT, padx=2) ttk.Button(btn_frame, text="Beispiel", command=self.load_example).pack(side=tk.LEFT, padx=2) start_frame = ttk.Frame(parent) start_frame.pack(fill=tk.X, pady=5) ttk.Label(start_frame, text="Startzeichen:").pack(side=tk.LEFT) self.start_entry = ttk.Entry(start_frame, textvariable=self.start_symbol, width=10) self.start_entry.pack(side=tk.LEFT, padx=5) self.info_var = tk.StringVar(value="Keine Daten geladen") ttk.Label(parent, textvariable=self.info_var, foreground="blue").pack(anchor=tk.W, pady=5) def create_output_panel(self, parent): self.notebook = ttk.Notebook(parent) self.notebook.pack(fill=tk.BOTH, expand=True) self.tab20 = ttk.Frame(self.notebook) self.notebook.add(self.tab20, text="ARS 2.0 (Basis)") self.create_ars20_tab() self.tab30 = ttk.Frame(self.notebook) self.notebook.add(self.tab30, text="ARS 3.0 (Nonterminale)") self.create_ars30_tab() self.tab_code = ttk.Frame(self.notebook) self.notebook.add(self.tab_code, text="5-Bit-Kodierung") self.create_code_tab() self.tab_auto = ttk.Frame(self.notebook) self.notebook.add(self.tab_auto, text="Entscheidungsautomat") self.create_automaton_tab() self.tab_stats = ttk.Frame(self.notebook) self.notebook.add(self.tab_stats, text="Statistische Analyse") self.create_statistics_tab() self.tab_petri = ttk.Frame(self.notebook) self.notebook.add(self.tab_petri, text="Petri-Netze") self.create_petri_tab() self.tab_gen = ttk.Frame(self.notebook) self.notebook.add(self.tab_gen, text="Generierung") self.create_generation_tab() def create_ars20_tab(self): control = ttk.Frame(self.tab20) control.pack(fill=tk.X, pady=5) ttk.Button(control, text="ARS 2.0 berechnen", command=self.run_ars20).pack(side=tk.LEFT, padx=5) self.text20 = scrolledtext.ScrolledText(self.tab20, font=('Courier', 10)) self.text20.pack(fill=tk.BOTH, expand=True, pady=5) def create_ars30_tab(self): control = ttk.Frame(self.tab30) control.pack(fill=tk.X, pady=5) ttk.Button(control, text="Grammatik induzieren", command=self.run_ars30).pack(side=tk.LEFT, padx=5) self.ars30_progress = ttk.Progressbar(control, length=200, mode='indeterminate') self.ars30_progress.pack(side=tk.LEFT, padx=10) self.text30 = scrolledtext.ScrolledText(self.tab30, font=('Courier', 10)) self.text30.pack(fill=tk.BOTH, expand=True, pady=5) def create_code_tab(self): control = ttk.Frame(self.tab_code) control.pack(fill=tk.X, pady=5) ttk.Button(control, text="Kodieren", command=self.encode_chains).pack(side=tk.LEFT, padx=5) ttk.Button(control, text="Dekodieren", command=self.decode_chains).pack(side=tk.LEFT, padx=5) self.text_code = scrolledtext.ScrolledText(self.tab_code, font=('Courier', 10)) self.text_code.pack(fill=tk.BOTH, expand=True, pady=5) def create_automaton_tab(self): control = ttk.Frame(self.tab_auto) control.pack(fill=tk.X, pady=5) ttk.Button(control, text="Validiere Transkript 1", command=self.validate_transcript_1).pack(side=tk.LEFT, padx=5) ttk.Button(control, text="Validiere alle", command=self.validate_all).pack(side=tk.LEFT, padx=5) ttk.Button(control, text="Automaten zurücksetzen", command=self.reset_automaton).pack(side=tk.LEFT, padx=5) self.text_auto = scrolledtext.ScrolledText(self.tab_auto, font=('Courier', 10)) self.text_auto.pack(fill=tk.BOTH, expand=True, pady=5) def create_statistics_tab(self): """Neuer Tab für statistische Analyse""" control = ttk.Frame(self.tab_stats) control.pack(fill=tk.X, pady=5) ttk.Button(control, text="Statistische Analyse starten", command=self.run_statistical_analysis).pack(side=tk.LEFT, padx=5) ttk.Button(control, text="Bericht exportieren (JSON)", command=self.export_statistics).pack(side=tk.LEFT, padx=5) ttk.Button(control, text="Statistik zurücksetzen", command=self.reset_statistics).pack(side=tk.LEFT, padx=5) self.text_stats = scrolledtext.ScrolledText(self.tab_stats, font=('Courier', 10)) self.text_stats.pack(fill=tk.BOTH, expand=True, pady=5) def create_petri_tab(self): control = ttk.Frame(self.tab_petri) control.pack(fill=tk.X, pady=5) if self.module_status['networkx']: ttk.Button(control, text="Einfaches Netz", command=self.build_basic_petri).pack(side=tk.LEFT, padx=5) ttk.Button(control, text="Netz mit Ressourcen", command=self.build_resource_petri).pack(side=tk.LEFT, padx=5) ttk.Button(control, text="Simuliere Transkript 1", command=self.simulate_petri).pack(side=tk.LEFT, padx=5) else: ttk.Label(control, text="networkx nicht verfügbar", foreground="red").pack(side=tk.LEFT, padx=5) self.text_petri = scrolledtext.ScrolledText(self.tab_petri, font=('Courier', 10)) self.text_petri.pack(fill=tk.BOTH, expand=True, pady=5) def create_generation_tab(self): control = ttk.Frame(self.tab_gen) control.pack(fill=tk.X, pady=5) ttk.Label(control, text="Grammatik:").pack(side=tk.LEFT) self.gen_source = tk.StringVar(value="ars20") ttk.Radiobutton(control, text="ARS 2.0", variable=self.gen_source, value="ars20").pack(side=tk.LEFT, padx=5) ttk.Radiobutton(control, text="ARS 3.0", variable=self.gen_source, value="ars30").pack(side=tk.LEFT, padx=5) ttk.Label(control, text="Anzahl:").pack(side=tk.LEFT, padx=(20,5)) self.gen_count = ttk.Spinbox(control, from_=1, to=50, width=5) self.gen_count.set(5) self.gen_count.pack(side=tk.LEFT) ttk.Button(control, text="Generieren", command=self.generate_chains).pack(side=tk.LEFT, padx=20) self.text_gen = scrolledtext.ScrolledText(self.tab_gen, font=('Courier', 10)) self.text_gen.pack(fill=tk.BOTH, expand=True, pady=5) def create_statusbar(self): status = ttk.Frame(self.root) status.pack(side=tk.BOTTOM, fill=tk.X) ttk.Label(status, textvariable=self.status_var).pack(side=tk.LEFT, padx=5) self.progress_bar = ttk.Progressbar(status, length=100, mode='indeterminate') self.progress_bar.pack(side=tk.RIGHT, padx=5) def show_module_status(self): status_text = "Modulstatus:\n" status_text += f"✓ networkx: {'verfügbar' if self.module_status['networkx'] else 'nicht verfügbar'}\n" status_text += f"✓ hmmlearn: {'verfügbar' if self.module_status['hmmlearn'] else 'nicht verfügbar'}\n" status_text += f"✓ sklearn-crfsuite: {'verfügbar' if self.module_status['crf'] else 'nicht verfügbar'}\n" status_text += f"✓ sentence-transformers: {'verfügbar' if self.module_status['transformer'] else 'nicht verfügbar'}\n" status_text += f"✓ seaborn: {'verfügbar' if self.module_status['seaborn'] else 'nicht verfügbar'}" messagebox.showinfo("Modulstatus", status_text) def get_actual_delimiter(self): delim = self.delimiter.get() if delim == "custom": return self.custom_delimiter.get() return delim def parse_line(self, line): line = line.strip() if not line: return [] delim = self.get_actual_delimiter() if delim == " ": parts = re.split(r'\s+', line) else: parts = line.split(delim) return [p.strip() for p in parts if p.strip()] def parse_input(self): self.text_input.update() text = self.text_input.get("1.0", tk.END) lines = text.strip().split('\n') self.chains = [] for line in lines: chain = self.parse_line(line) if chain: self.chains.append(chain) if self.chains: all_symbols = set() for chain in self.chains: for symbol in chain: all_symbols.add(symbol) self.terminals = sorted(all_symbols) self.info_var.set(f"{len(self.chains)} Ketten, {len(self.terminals)} Terminale") self.status_var.set(f"{len(self.chains)} Ketten geladen") self.ars20.load_chains(self.chains, self.start_symbol.get() or None) self.ars30.load_chains(self.chains, self.start_symbol.get() or None) if self.module_status['networkx']: self.petri_builder = PetriNetBuilder(self.chains, self.ars30.rules) # Kodierte Ketten berechnen self.encode_chains() self.show_ars20_preview() else: messagebox.showwarning("Warnung", "Keine gültigen Ketten gefunden!") def encode_chains(self): """Wandelt alle Ketten in 5-Bit-Kodierung um""" if not self.chains: return self.coded_chains = [] self.text_code.delete("1.0", tk.END) self.text_code.insert(tk.END, "5-BIT-KODIERUNG DER TERMINALZEICHEN\n") self.text_code.insert(tk.END, "=" * 70 + "\n\n") self.text_code.insert(tk.END, "Schema: [Sprecher][Phase][Unterphase]\n") self.text_code.insert(tk.END, "S: 0=Kunde, 1=Verkäufer\n") self.text_code.insert(tk.END, "Phase: 00=BG, 01=B, 10=A, 11=AV\n") self.text_code.insert(tk.END, "Unterphase: 00=Basis, 01=Folge\n\n") for i, chain in enumerate(self.chains, 1): coded = TerminalCoding.encode_chain(chain) self.coded_chains.append(coded) self.text_code.insert(tk.END, f"Transkript {i}:\n") self.text_code.insert(tk.END, f" Original: {', '.join(chain)}\n") self.text_code.insert(tk.END, f" Kodiert: {', '.join(coded)}\n\n") self.status_var.set(f"{len(self.chains)} Ketten kodiert") def decode_chains(self): """Zeigt die dekodierten Ketten an (nur zur Bestätigung)""" if not self.coded_chains: messagebox.showinfo("Info", "Keine kodierten Ketten vorhanden") return self.text_code.insert(tk.END, "\n" + "=" * 70 + "\n") self.text_code.insert(tk.END, "DEKODIERTE KETTEN (Kontrolle)\n") self.text_code.insert(tk.END, "=" * 70 + "\n\n") for i, coded in enumerate(self.coded_chains, 1): decoded = TerminalCoding.decode_chain(coded) self.text_code.insert(tk.END, f"Transkript {i} (dekodiert): {', '.join(decoded)}\n") def reset_automaton(self): """Setzt den Automaten zurück""" self.automaton.reset() self.text_auto.delete("1.0", tk.END) self.text_auto.insert(tk.END, "Automaten zurückgesetzt.\n") self.text_auto.insert(tk.END, self.automaton.get_history_string()) self.status_var.set("Automaten zurückgesetzt") def validate_transcript_1(self): """Validiert Transkript 1 mit dem Automaten""" if not self.coded_chains or len(self.coded_chains) < 1: messagebox.showerror("Fehler", "Keine kodierten Ketten vorhanden!") return self.validate_chain(0, "Transkript 1") def validate_all(self): """Validiert alle kodierten Ketten""" if not self.coded_chains: messagebox.showerror("Fehler", "Keine kodierten Ketten vorhanden!") return self.text_auto.delete("1.0", tk.END) valid_count = 0 for i, coded in enumerate(self.coded_chains): valid, state, protocol = self.automaton.validate_chain(coded) self.text_auto.insert(tk.END, f"\n{'='*50}\n") self.text_auto.insert(tk.END, f"VALIDIERUNG TRANSKRIPT {i+1}\n") self.text_auto.insert(tk.END, f"{'='*50}\n") self.text_auto.insert(tk.END, f"Ergebnis: {'✓ GÜLTIG' if valid else '✗ UNGÜLTIG'}\n") self.text_auto.insert(tk.END, f"Endzustand: {state}\n\n") self.text_auto.insert(tk.END, "ENTSCHEIDUNGSPFAD:\n") for step in protocol: self.text_auto.insert(tk.END, f" Schritt {step['position']}: {step['code']} ({step['symbol']}, {step['phase']})\n") self.text_auto.insert(tk.END, f" → {step['state']}\n") self.text_auto.insert(tk.END, f" {step['explanation']}\n") if valid: valid_count += 1 self.text_auto.insert(tk.END, f"\n{'='*50}\n") self.text_auto.insert(tk.END, f"GESAMTERGEBNIS: {valid_count}/{len(self.coded_chains)} gültig\n") self.status_var.set(f"{valid_count}/{len(self.coded_chains)} Ketten gültig") def validate_chain(self, index, name): """Validiert eine einzelne Kette""" coded = self.coded_chains[index] valid, state, protocol = self.automaton.validate_chain(coded) self.text_auto.delete("1.0", tk.END) self.text_auto.insert(tk.END, f"{'='*50}\n") self.text_auto.insert(tk.END, f"VALIDIERUNG {name}\n") self.text_auto.insert(tk.END, f"{'='*50}\n") self.text_auto.insert(tk.END, f"Ergebnis: {'✓ GÜLTIG' if valid else '✗ UNGÜLTIG'}\n") self.text_auto.insert(tk.END, f"Endzustand: {state}\n\n") self.text_auto.insert(tk.END, "ENTSCHEIDUNGSPFAD:\n") for step in protocol: self.text_auto.insert(tk.END, f" Schritt {step['position']}: {step['code']} ({step['symbol']}, {step['phase']})\n") self.text_auto.insert(tk.END, f" → {step['state']}\n") self.text_auto.insert(tk.END, f" {step['explanation']}\n") self.status_var.set(f"{name}: {'gültig' if valid else 'ungültig'}") def run_statistical_analysis(self): """Führt die statistische Analyse durch""" if not hasattr(self, 'coded_chains') or not self.coded_chains: messagebox.showerror("Fehler", "Keine kodierten Ketten vorhanden!") return self.stats_extension.reset() self.text_stats.delete("1.0", tk.END) self.text_stats.insert(tk.END, "STATISTISCHE ANALYSE LÄUFT...\n") self.root.update() for i, coded in enumerate(self.coded_chains): result = self.stats_extension.analyze_coded_chain(coded, i+1) self.text_stats.insert(tk.END, f"✓ Transkript {i+1} analysiert\n") self.root.update() self.text_stats.delete("1.0", tk.END) self.text_stats.insert(tk.END, self.stats_extension.print_report()) self.status_var.set("Statistische Analyse abgeschlossen") def export_statistics(self): """Exportiert die Statistiken als JSON""" if not self.stats_extension.transcript_results: messagebox.showerror("Fehler", "Keine statistischen Daten vorhanden!") return timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"statistik_{timestamp}.json" try: exported_file = self.stats_extension.export_json(filename) self.text_stats.insert(tk.END, f"\n\nStatistik exportiert als: {exported_file}\n") messagebox.showinfo("Export erfolgreich", f"Statistik gespeichert als:\n{exported_file}") except Exception as e: messagebox.showerror("Fehler", f"Export fehlgeschlagen:\n{str(e)}") def reset_statistics(self): """Setzt die statistische Analyse zurück""" self.stats_extension.reset() self.text_stats.delete("1.0", tk.END) self.text_stats.insert(tk.END, "Statistik zurückgesetzt.\n") self.status_var.set("Statistik zurückgesetzt") def show_ars20_preview(self): self.text20.delete("1.0", tk.END) self.text20.insert(tk.END, self.ars20.print_grammar()) def run_ars20(self): if not self.chains: messagebox.showerror("Fehler", "Keine Daten geladen!") return self.show_ars20_preview() self.status_var.set("ARS 2.0 abgeschlossen") def run_ars30(self): if not self.chains: messagebox.showerror("Fehler", "Keine Daten geladen!") return self.ars30_progress.start() self.status_var.set("Induziere Grammatik...") def update_progress(iter_num, max_iter, sequence, new_nt): def update(): self.status_var.set(f"Induktion: {new_nt} gefunden") self.safe_gui_update(update) def run(): try: self.ars30.induce_grammar(progress_callback=update_progress) def update_display(): self.text30.delete("1.0", tk.END) self.text30.insert(tk.END, self.ars30.print_grammar()) self.ars30_progress.stop() self.status_var.set("Grammatikinduktion abgeschlossen") self.safe_gui_update(update_display) except Exception as e: def error_display(): messagebox.showerror("Fehler", f"Grammatikinduktion fehlgeschlagen:\n{str(e)}") self.ars30_progress.stop() self.safe_gui_update(error_display) thread = threading.Thread(target=run) thread.daemon = True thread.start() def build_basic_petri(self): if not self.module_status['networkx']: messagebox.showerror("Fehler", "networkx nicht installiert!") return if not self.petri_builder: messagebox.showerror("Fehler", "Keine Daten geladen!") return try: self.petri_builder.build_basic_net() self.text_petri.delete("1.0", tk.END) self.text_petri.insert(tk.END, "Einfaches Petri-Netz erstellt:\n") self.text_petri.insert(tk.END, f"Stellen: {len(self.petri_builder.petri_net.places)}\n") self.text_petri.insert(tk.END, f"Transitionen: {len(self.petri_builder.petri_net.transitions)}\n") self.text_petri.insert(tk.END, f"Kanten: {len(self.petri_builder.petri_net.arcs)}\n") self.status_var.set("Petri-Netz erstellt") except Exception as e: messagebox.showerror("Fehler", f"Fehler beim Erstellen des Petri-Netzes:\n{str(e)}") def build_resource_petri(self): if not self.module_status['networkx']: messagebox.showerror("Fehler", "networkx nicht installiert!") return if not self.petri_builder: messagebox.showerror("Fehler", "Keine Daten geladen!") return try: self.petri_builder.build_resource_net() self.text_petri.delete("1.0", tk.END) self.text_petri.insert(tk.END, "Petri-Netz mit Ressourcen erstellt:\n") self.text_petri.insert(tk.END, f"Stellen: {len(self.petri_builder.petri_net.places)}\n") self.text_petri.insert(tk.END, f"Transitionen: {len(self.petri_builder.petri_net.transitions)}\n") self.text_petri.insert(tk.END, f"Kanten: {len(self.petri_builder.petri_net.arcs)}\n") self.text_petri.insert(tk.END, "\nRessourcen-Stellen:\n") for p, data in self.petri_builder.petri_net.places.items(): if data['type'] == 'resource': self.text_petri.insert(tk.END, f" {p}: {data['initial_tokens']} Token\n") self.status_var.set("Petri-Netz mit Ressourcen erstellt") except Exception as e: messagebox.showerror("Fehler", f"Fehler beim Erstellen des Petri-Netzes:\n{str(e)}") def simulate_petri(self): if not self.module_status['networkx']: messagebox.showerror("Fehler", "networkx nicht installiert!") return if not self.petri_builder or not self.petri_builder.petri_net: messagebox.showerror("Fehler", "Kein Petri-Netz vorhanden!") return if not self.chains: return try: results, marking = self.petri_builder.simulate_chain(self.chains[0]) self.text_petri.insert(tk.END, "\n" + "="*50 + "\n") self.text_petri.insert(tk.END, "Simulation Transkript 1:\n") self.text_petri.insert(tk.END, "="*50 + "\n") for sym, success, reason in results: status = "✓" if success else "✗" self.text_petri.insert(tk.END, f"{status} {sym}: {reason}\n") self.text_petri.insert(tk.END, f"\nFinale Markierung:\n") for p, tokens in marking.items(): if tokens > 0: self.text_petri.insert(tk.END, f" {p}: {tokens}\n") self.status_var.set("Simulation abgeschlossen") except Exception as e: messagebox.showerror("Fehler", f"Fehler bei der Simulation:\n{str(e)}") def generate_chains(self): source = self.gen_source.get() count = int(self.gen_count.get()) self.text_gen.delete("1.0", tk.END) if source == "ars20": probs = self.ars20.optimized_probabilities or self.ars20.probabilities if not probs: self.text_gen.insert(tk.END, "Keine ARS 2.0 Grammatik!\n") return self.text_gen.insert(tk.END, f"ARS 2.0 - {count} generierte Ketten:\n\n") for i in range(count): chain = self.ars20.generate_chain() if chain: self.text_gen.insert(tk.END, f"{i+1}: {' → '.join(chain)}\n") else: # ars30 if not self.ars30.rules: self.text_gen.insert(tk.END, "Keine ARS 3.0 Grammatik!\n") return self.text_gen.insert(tk.END, f"ARS 3.0 - {count} generierte Ketten:\n\n") for i in range(count): chain = self.ars30.generate_chain() if chain: self.text_gen.insert(tk.END, f"{i+1}: {' → '.join(chain)}\n") def load_file(self): filename = filedialog.askopenfilename( title="Datei auswählen", filetypes=[("Textdateien", "*.txt"), ("Alle Dateien", "*.*")] ) if filename: try: with open(filename, 'r', encoding='utf-8') as f: content = f.read() self.text_input.delete("1.0", tk.END) self.text_input.insert("1.0", content) self.status_var.set(f"Geladen: {filename}") except Exception as e: messagebox.showerror("Fehler", f"Kann Datei nicht laden:\n{e}") def load_example(self): example = """KBG, VBG, KBBd, VBBd, KBA, VBA, KBBd, VBBd, KBA, VAA, KAA, VAV, KAV VBG, KBBd, VBBd, VAA, KAA, VBG, KBBd, VAA, KAA KBBd, VBBd, VAA, KAA KBBd, VBBd, KBA, VBA, KBBd, VBA, KAE, VAE, KAA, VAV, KAV KBG, VBG, KBBd, VBBd, KAA KBBd, VBBd, KBA, VAA, KAA KBG, VBBd, KBBd, VBA, VAA, KAA, VAV, KAV""" self.text_input.delete("1.0", tk.END) self.text_input.insert("1.0", example) self.parse_input() def show_about(self): about = """ARS 4.0 - Algorithmic Recursive Sequence Analysis Erweiterte Version mit: - 5-Bit-Kodierung der Terminalzeichen - Formalem Entscheidungsautomaten - Statistischer Analyse empirischer Abweichungen - Erkennung von Schleifen und Wiederholungen - Expliziter Trennung von Struktur und Statistik © 2026 Paul Koop""" messagebox.showinfo("Über ARS", about) # ============================================================================ # HAUPTFUNKTION # ============================================================================ def main(): root = tk.Tk() app = ARSGUI(root) root.mainloop() if __name__ == "__main__": main()