# Contents
#
# Current folder: algorithmisch-rekursive-sequenzanalyse-3.0
# ⬅ Parent folder
#
# ars6_gui_app.py

import os
import json
import numpy as np
from sklearn.cluster import HDBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer
from sentence_transformers import SentenceTransformer
from collections import defaultdict
from scipy.stats import pearsonr
import tkinter as tk
from tkinter import filedialog, ttk, messagebox
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from nltk import ngrams
from langdetect import detect
from transformers import pipeline
import threading
from copy import deepcopy

class EnhancedDialogAnalyzer:
    """Tkinter GUI that turns dialog transcripts into a probabilistic grammar.

    The five buttons run the pipeline in order: load transcripts, assign a
    meaning label to every utterance (keyword rules first, LLM fallback),
    cluster the labels into terminal symbols and count their transitions,
    tune the transition probabilities against the observed chain, and draw
    the resulting grammar as a directed graph.

    Grammar layout (``self.pcfg``): every terminal ``X`` that has a successor
    owns the single rule ``X -> NT_X``; every non-terminal ``NT_X`` maps to
    the terminals observed directly after ``X``. The probabilities of each
    left-hand side sum to 1.
    """

    # Prefix that turns a terminal symbol into the non-terminal expanding it.
    _NT_PREFIX = "NT_"

    # (category, keywords) pairs, checked in this order; first hit wins.
    _KEYWORD_PATTERNS = (
        ('order', ('bitte', 'nehme', 'hätte', 'kaufen', 'gramm', 'kilo')),
        ('question', ('?', 'wie', 'was', 'wo ')),
        ('confirmation', ('ja', 'okay', 'genau', 'richtig')),
        ('greeting', ('guten tag', 'hallo', 'guten morgen')),
        ('thanks', ('danke', 'vielen dank', 'dankeschön')),
    )

    def __init__(self, root):
        """Load the models, reset the analysis state and build the widgets.

        Args:
            root: The Tk root window that hosts the application.
        """
        self.root = root
        self.root.title("LLM-enhanced Dialog Analyzer")

        # Initialise the models
        self.embedding_model = SentenceTransformer("paraphrase-multilingual-mpnet-base-v2")
        self.llm = pipeline("text2text-generation", model="google/flan-t5-base")

        # Data structures
        self.transcripts = []
        self.interacts = []
        self.pcfg = {}
        self.empirical_chain = []
        # Filled by the background preprocessing thread; None until then.
        self.embeddings = None

        # GUI
        self.setup_ui()
        self.setup_visualization()

    def setup_ui(self):
        """Create the button column and the log text widget."""
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky="nsew")

        buttons = (
            ("1. Load & Preprocess", self.load_and_preprocess),
            ("2. Analyze Meanings", self.analyze_meanings),
            ("3. Build Semantic PCFG", self.build_semantic_pcfg),
            ("4. Optimize", self.optimize_grammar),
            ("5. Visualize", self.visualize_grammar),
        )
        for row, (label, callback) in enumerate(buttons):
            ttk.Button(main_frame, text=label, command=callback).grid(
                row=row, column=0, pady=5
            )

        self.output_text = tk.Text(main_frame, height=20, width=80)
        self.output_text.grid(row=0, column=1, rowspan=5, padx=10)

    def setup_visualization(self):
        """Embed an empty matplotlib figure to the right of the log."""
        self.figure = plt.Figure(figsize=(8, 6), dpi=100)
        self.canvas = FigureCanvasTkAgg(self.figure, master=self.root)
        self.canvas.get_tk_widget().grid(row=0, column=2, rowspan=5, padx=10)

    def load_and_preprocess(self):
        """Ask for text files, load their non-empty lines and start preprocessing.

        Each non-blank line of every selected file becomes one utterance.
        Language detection and embedding run in a background thread.
        """
        files = filedialog.askopenfilenames(filetypes=[("Text files", "*.txt")])
        if not files:
            return

        # Collect into a local list first so a failing file does not leave
        # self.transcripts half replaced.
        transcripts = []
        for file in files:
            with open(file, 'r', encoding='utf-8') as f:
                transcripts.extend(line.strip() for line in f if line.strip())
        self.transcripts = transcripts
        self.embeddings = None  # embeddings of a previous load are stale now

        self.log(f"Loaded {len(self.transcripts)} utterances")
        # Daemon thread: closing the window must not leave the process
        # waiting for the embedding model to finish.
        threading.Thread(target=self._preprocess, daemon=True).start()

    def _preprocess(self):
        """Detect the languages of the loaded utterances and embed them.

        Runs in a worker thread; log output is handed to the Tk event loop
        through ``root.after``.
        NOTE(review): calling ``after`` from a non-main thread relies on a
        thread-enabled Tcl build - confirm for the target platform.
        """
        # Work on a snapshot: the main thread may replace self.transcripts.
        utterances = list(self.transcripts)
        if not utterances:
            return

        self.root.after(0, lambda: self.log("Detecting languages..."))
        languages = set()
        for utterance in utterances:
            try:
                languages.add(detect(utterance))
            except Exception:
                # Best effort: utterances the detector cannot classify are
                # skipped instead of aborting the whole run.
                continue
        # Sorted so the log line is reproducible (set order is not).
        detected = ', '.join(sorted(languages))
        self.root.after(0, lambda: self.log(f"Detected languages: {detected}"))

        self.root.after(0, lambda: self.log("Creating embeddings..."))
        self.embeddings = self.embedding_model.encode(utterances)

    def analyze_meanings(self):
        """Assign a meaning label to every loaded utterance.

        Keyword rules are tried first; only utterances they do not cover are
        sent to the LLM, together with up to three preceding utterances as
        context. Results are stored in ``self.interacts``.
        """
        if not self.transcripts:
            messagebox.showwarning("Warning", "Load transcripts first!")
            return

        self.interacts = []
        for i, utterance in enumerate(self.transcripts):
            # Try the rule-based classification first
            manual_meaning = self._preprocess_utterance(utterance)
            if manual_meaning:
                self.interacts.append({
                    "utterance": utterance,
                    "meanings": [manual_meaning],
                    "selected_meaning": manual_meaning
                })
                continue

            # Empty for the first utterance because nothing precedes it.
            context = " | ".join(u["utterance"] for u in self.interacts[-3:])
            meanings = self._generate_meanings(utterance, context)
            filtered_meanings = self._filter_meanings(meanings, i)
            selected = filtered_meanings[0] if filtered_meanings else "UNK"

            self.interacts.append({
                "utterance": utterance,
                "meanings": meanings,
                "selected_meaning": selected
            })

            self.log(f"Utterance {i+1}: {utterance[:50]}...")
            self.log(f"  Selected meaning: {selected}")

    def _preprocess_utterance(self, utterance):
        """Classify an utterance with the keyword rules.

        Only the first 100 characters are inspected, lower-cased and with
        newlines replaced by spaces. Matching is plain substring search, so
        a keyword also matches inside longer words.

        Args:
            utterance: The raw utterance text.

        Returns:
            The upper-cased category name of the first matching rule, or
            ``None`` when no keyword occurs.
        """
        # Truncate and normalise
        normalized = utterance[:100].lower().replace('\n', ' ')

        for category, keywords in self._KEYWORD_PATTERNS:
            if any(keyword in normalized for keyword in keywords):
                return category.upper()

        return None

    def _generate_meanings(self, utterance, context):
        """Return candidate meanings for an utterance.

        Args:
            utterance: The utterance to interpret.
            context: Preceding utterances joined into one string (may be empty).

        Returns:
            A one-element list: the keyword category if a rule matches,
            otherwise the LLM output, or ``["UNK"]`` if the LLM call fails.
        """
        manual_meaning = self._preprocess_utterance(utterance)
        if manual_meaning:
            return [manual_meaning]

        prompt = f"""Generate a SINGLE, concise interpretation for this dialog utterance in German:
        Context: '{context}'
        Utterance: '{utterance}'
        Interpretation: The speaker"""
        try:
            output = self.llm(prompt, max_length=50, num_return_sequences=1)
            return [output[0]["generated_text"].strip()]
        except Exception:
            # Deliberate fallback: a failing model call must not stop the
            # analysis of the remaining utterances.
            return ["UNK"]

    def _filter_meanings(self, meanings, index):
        """Reduce the candidate meanings to the one that is accepted.

        Args:
            meanings: Candidate meanings for one utterance.
            index: Position of the utterance in the transcript.

        Returns:
            ``meanings`` unchanged for the first utterance or an empty list,
            otherwise a list holding only the first candidate.
        """
        if index == 0 or not meanings:
            return meanings
        return [meanings[0]]  # Always accept the first meaning

    def build_semantic_pcfg(self):
        """Cluster the selected meanings and derive the transition grammar.

        Meanings are embedded and clustered with HDBSCAN. Clustered items
        become terminal ``C_<cluster>``, noise items get their own terminal
        ``T_<position>``. The terminal sequence is stored as
        ``self.empirical_chain`` and its weighted transitions as ``self.pcfg``.
        """
        if not self.interacts:
            messagebox.showwarning("Warning", "Analyze meanings first!")
            return

        selected = [entry["selected_meaning"] for entry in self.interacts]
        min_cluster_size = 3
        if len(selected) < min_cluster_size:
            # HDBSCAN refuses inputs with fewer samples than min_samples
            # (which defaults to min_cluster_size); no cluster could form
            # anyway, so every item is treated as noise.
            clusters = [-1] * len(selected)
        else:
            meaning_embeddings = self.embedding_model.encode(selected)
            clusterer = HDBSCAN(min_cluster_size=min_cluster_size, metric='cosine')
            clusters = clusterer.fit_predict(meaning_embeddings)

        terminal_symbols = [
            f"T_{i}" if cluster_id == -1 else f"C_{cluster_id}"
            for i, cluster_id in enumerate(clusters)
        ]
        self.empirical_chain = terminal_symbols

        pcfg = {}
        for i, (src, dst) in enumerate(zip(terminal_symbols, terminal_symbols[1:])):
            src_nt = f"{self._NT_PREFIX}{src}"
            # The weight decays with the position i in the chain, so
            # transitions early in the transcript count more.
            # NOTE(review): confirm position-based decay is what is wanted.
            weight = np.exp(-0.1 * i)
            pcfg[src] = {src_nt: 1.0}
            row = pcfg.setdefault(src_nt, {})
            row[dst] = row.get(dst, 0) + weight

        # Normalise the weights of every left-hand side to probabilities
        normalized = {}
        for src, row in pcfg.items():
            total = sum(row.values())
            normalized[src] = {dst: weight / total for dst, weight in row.items()}
        self.pcfg = normalized

        self.log("\nGenerated Semantic PCFG:")
        for src in list(self.pcfg.keys())[:5]:
            for dst, prob in list(self.pcfg[src].items())[:3]:
                self.log(f"  {src.ljust(10)} → {dst.ljust(15)} [{prob:.2f}]")

    @staticmethod
    def _count_transitions(chains):
        """Count adjacent symbol pairs over all chains.

        Returns:
            A nested dict ``{src: {dst: count}}`` in first-seen order.
        """
        counts = {}
        for chain in chains:
            for src, dst in zip(chain, chain[1:]):
                row = counts.setdefault(src, {})
                row[dst] = row.get(dst, 0) + 1
        return counts

    @classmethod
    def _terminals_only(cls, chain):
        """Return ``chain`` without its non-terminal (``NT_``) symbols."""
        return [symbol for symbol in chain if not symbol.startswith(cls._NT_PREFIX)]

    def _calculate_frequencies(self, chains, transitions=None):
        """Return transition counts of ``chains`` as a NumPy array.

        Args:
            chains: Iterable of symbol sequences.
            transitions: Optional list of ``(src, dst)`` pairs. When given,
                the result holds one count per pair in exactly that order
                (0 for pairs that never occur), which makes the vectors of
                different chain sets comparable element by element. When
                omitted, counts are returned grouped by source symbol in
                first-seen order.

        Returns:
            ``numpy.ndarray`` of counts.
        """
        counts = self._count_transitions(chains)
        if transitions is None:
            return np.array([n for row in counts.values() for n in row.values()])
        return np.array([counts.get(src, {}).get(dst, 0) for src, dst in transitions])

    def _simulate_chain(self, max_length):
        """Sample a symbol chain from the current grammar.

        The chain starts at a uniformly chosen left-hand side and follows
        the rule probabilities until ``max_length`` symbols are reached or a
        symbol without rules is hit.

        Args:
            max_length: Upper bound on the number of symbols.

        Returns:
            The sampled list of symbols (empty when there is no grammar).
        """
        if not self.pcfg:
            return []

        current = str(np.random.choice(list(self.pcfg.keys())))
        chain = [current]

        while len(chain) < max_length:
            rules = self.pcfg.get(current)
            if not rules:
                # Unknown symbol or no expansions: the chain ends here.
                break

            next_symbols = list(rules.keys())
            probs = list(rules.values())
            current = str(np.random.choice(next_symbols, p=probs))
            chain.append(current)

        return chain

    def _adjust_probabilities(self, empirical, generated):
        """Pull the rule probabilities toward the observed transition frequencies.

        Every rule probability is blended with the additively smoothed
        relative frequency of the same transition in ``self.empirical_chain``;
        afterwards each left-hand side is re-normalised.

        Args:
            empirical: Unused; kept for interface compatibility.
            generated: Unused; kept for interface compatibility.
        """
        # Learning rate grows with the amount of data, floor at 0.01
        adjustment_factor = max(0.01, 0.2 * (1 - np.exp(-len(self.empirical_chain)/100)))

        observed = self._count_transitions([self.empirical_chain])
        prefix = self._NT_PREFIX

        for src, rules in self.pcfg.items():
            if src.startswith(prefix):
                # Rules of NT_x describe what follows terminal x, so the
                # observed counts live under x, not under NT_x.
                row = observed.get(src[len(prefix):], {})
            else:
                # Terminal -> non-terminal rules are structural; there is
                # nothing observed to compare them with.
                row = {}
            denominator = sum(row.values()) + 0.1 * len(rules)
            for dst in rules:
                smoothed_prob = (row.get(dst, 0) + 0.1) / denominator
                rules[dst] = (1 - adjustment_factor) * rules[dst] + \
                             adjustment_factor * smoothed_prob

        for rules in self.pcfg.values():
            total = sum(rules.values())
            if total > 0:
                for dst in rules:
                    rules[dst] /= total

    def optimize_grammar(self, iterations=20):
        """Iteratively tune the grammar against the empirical chain.

        In every iteration five chains are sampled, reduced to their
        terminals and counted over the same transition list as the empirical
        chain. The Pearson correlation of both count vectors is logged; on a
        sufficiently strong and significant correlation the probabilities
        are adjusted. The grammar with the best correlation is kept.

        Args:
            iterations: Maximum number of optimisation rounds.
        """
        if not self.pcfg:
            messagebox.showwarning("Warning", "Build PCFG first!")
            return

        # One shared, ordered transition list keeps both count vectors
        # aligned element by element.
        observed = self._count_transitions([self.empirical_chain])
        transitions = [(src, dst) for src, row in observed.items() for dst in row]
        empirical = self._calculate_frequencies([self.empirical_chain], transitions)

        best_corr = -1
        best_pcfg = deepcopy(self.pcfg)
        # Sampled chains alternate terminal / non-terminal, so twice the
        # length yields about as many terminals as the empirical chain.
        sim_length = 2 * len(self.empirical_chain)

        for i in range(iterations):
            generated_chains = [
                self._terminals_only(self._simulate_chain(max_length=sim_length))
                for _ in range(5)
            ]
            generated = self._calculate_frequencies(generated_chains, transitions)

            try:
                if len(empirical) > 1:
                    corr, p_value = pearsonr(empirical, generated)
                    self.log(f"Iteration {i+1}: r = {corr:.3f}, p = {p_value:.3f}")

                    if abs(corr) > 0.3 and p_value < 0.1:  # Significant results only
                        if corr > best_corr:
                            # Snapshot before adjusting: corr was measured
                            # on the grammar as it is right now.
                            best_corr = corr
                            best_pcfg = deepcopy(self.pcfg)

                        self._adjust_probabilities(empirical, generated)

                    if corr > 0.9:
                        break
                else:
                    self.log(f"Iteration {i+1}: Not enough data for correlation")
                    break

            except Exception as e:
                self.log(f"Error in iteration {i+1}: {str(e)}")
                break

        self.pcfg = best_pcfg  # Restore best version
        self.log(f"Optimization finished. Best r = {best_corr:.3f}")
        self.evaluate_grammar()

    def evaluate_grammar(self):
        """Log the mean coherence of ten sampled chains of up to ten symbols."""
        test_chains = [self._simulate_chain(10) for _ in range(10)]
        coherence_scores = [self._calculate_coherence(chain) for chain in test_chains]
        self.log(f"Average coherence: {np.mean(coherence_scores):.2f}")

    def _calculate_coherence(self, chain):
        """Return the share of adjacent pairs in ``chain`` that the grammar allows.

        A chain with fewer than two symbols scores 0.
        """
        valid_transitions = sum(
            1
            for src, dst in zip(chain, chain[1:])
            if src in self.pcfg and dst in self.pcfg[src]
        )
        return valid_transitions / max(1, len(chain)-1)

    def visualize_grammar(self):
        """Draw the grammar as a directed graph with probability-labelled edges."""
        if not self.pcfg:
            messagebox.showwarning("Warning", "Build PCFG first!")
            return

        self.figure.clf()
        G = nx.DiGraph()

        for src, transitions in self.pcfg.items():
            for dst, prob in transitions.items():
                G.add_edge(src, dst, weight=prob)

        # Fixed seed keeps the layout stable between redraws.
        pos = nx.spring_layout(G, k=0.8, iterations=50, seed=42)

        node_size = 1200
        font_size = 8
        edge_width_scale = 2.5

        ax = self.figure.add_subplot(111)

        nx.draw_networkx_nodes(
            G, pos,
            node_size=node_size,
            node_color='skyblue',
            alpha=0.9,
            ax=ax
        )

        nx.draw_networkx_edges(
            G, pos,
            width=[d['weight']*edge_width_scale for (_, _, d) in G.edges(data=True)],
            edge_color='gray',
            alpha=0.7,
            arrowstyle='->',
            arrowsize=15,
            ax=ax
        )

        nx.draw_networkx_labels(
            G, pos,
            font_size=font_size,
            font_family='sans-serif',
            ax=ax
        )

        edge_labels = {
            (u, v): f"{d['weight']:.2f}"
            for u, v, d in G.edges(data=True)
        }
        nx.draw_networkx_edge_labels(
            G, pos,
            edge_labels=edge_labels,
            font_size=font_size-1,
            label_pos=0.5,
            ax=ax
        )

        ax.set_title("Probabilistic Context-Free Grammar (PCFG)")
        ax.axis('off')
        # self.figure is created with plt.Figure and is not managed by
        # pyplot, so the layout must be applied to the figure itself.
        self.figure.tight_layout()
        self.canvas.draw()

    def log(self, message):
        """Append ``message`` as a new line to the log widget and scroll to it."""
        self.output_text.insert(tk.END, message + "\n")
        self.output_text.see(tk.END)

def main():
    """Create the Tk root window, attach the analyzer and run the event loop."""
    window = tk.Tk()
    # Hold a reference to the analyzer for as long as the event loop runs.
    analyzer = EnhancedDialogAnalyzer(window)  # noqa: F841
    window.mainloop()


if __name__ == "__main__":
    main()