#!/usr/bin/env python3
"""
BMAD Memory Synchronization Integration

Establishes seamless integration between orchestrator state and the OpenMemory MCP system.
Provides real-time memory monitoring, pattern-recognition sync, decision archaeology,
user-preference persistence, and proactive intelligence hooks.

Usage:
    python .ai/memory-sync-integration.py [--sync-now] [--monitor] [--diagnose]
"""

import json
import logging
import re
import sys
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

import yaml

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class MemoryProviderStatus(Enum):
    """Memory provider status."""
    CONNECTED = "connected"
    DEGRADED = "degraded"
    OFFLINE = "offline"


class SyncMode(Enum):
    """Memory synchronization modes."""
    REAL_TIME = "real-time"
    BATCH = "batch"
    ON_DEMAND = "on-demand"
    FALLBACK = "fallback"


@dataclass
class MemoryMetrics:
    """Memory system performance metrics."""
    connection_latency: float = 0.0
    sync_success_rate: float = 0.0
    pattern_recognition_accuracy: float = 0.0
    proactive_insights_generated: int = 0
    total_memories_created: int = 0
    last_sync_time: Optional[datetime] = None
    errors_count: int = 0


@dataclass
class MemoryPattern:
    """Represents a recognized memory pattern."""
    pattern_id: str
    pattern_type: str
    confidence: float
    frequency: int
    success_rate: float
    last_occurrence: datetime
    context_tags: List[str] = field(default_factory=list)
    effectiveness_score: float = 0.0


class MemorySyncIntegration:
    """Main memory synchronization integration system."""

    def __init__(self, state_file: str = ".ai/orchestrator-state.md", sync_interval: int = 30):
        self.state_file = Path(state_file)
        self.sync_interval = sync_interval
        self.memory_available = False
        self.metrics = MemoryMetrics()
        self.patterns = {}
        self.user_preferences = {}
        self.decision_context = {}
        self.proactive_insights = []
        self.sync_mode = SyncMode.REAL_TIME
        self.running = False

        # Callback functions for memory operations
        self.memory_functions = {
            'add_memories': None,
            'search_memory': None,
            'list_memories': None
        }

        # Initialize connection status
        self._check_memory_provider_status()

    def initialize_memory_functions(self, add_memories_func: Callable,
                                    search_memory_func: Callable,
                                    list_memories_func: Callable):
        """Initialize memory function callbacks."""
        self.memory_functions['add_memories'] = add_memories_func
        self.memory_functions['search_memory'] = search_memory_func
        self.memory_functions['list_memories'] = list_memories_func
        self.memory_available = True
        logger.info("Memory functions initialized successfully")

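    # A minimal wiring sketch for these callbacks. The `openmemory` client and
    # its method names below are assumptions for illustration only; substitute
    # whatever OpenMemory MCP bindings exist in your environment.
    #
    #   from openmemory import Client                    # hypothetical import
    #   client = Client()
    #   sync = MemorySyncIntegration()
    #   sync.initialize_memory_functions(
    #       add_memories_func=client.add_memories,       # hypothetical API
    #       search_memory_func=client.search_memory,     # hypothetical API
    #       list_memories_func=client.list_memories,     # hypothetical API
    #   )
    #   sync.sync_orchestrator_state_with_memory()
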
    def _check_memory_provider_status(self) -> MemoryProviderStatus:
        """Check current memory provider connection status."""
        try:
            # Attempt to verify memory system connectivity
            if not self.memory_available:
                return MemoryProviderStatus.OFFLINE

            # Test basic connectivity
            start_time = time.time()
            if self.memory_functions['list_memories']:
                try:
                    # Quick connectivity test
                    self.memory_functions['list_memories'](limit=1)
                    self.metrics.connection_latency = time.time() - start_time

                    if self.metrics.connection_latency < 1.0:
                        return MemoryProviderStatus.CONNECTED
                    else:
                        return MemoryProviderStatus.DEGRADED
                except Exception as e:
                    logger.warning(f"Memory connectivity test failed: {e}")
                    return MemoryProviderStatus.OFFLINE
            else:
                return MemoryProviderStatus.OFFLINE

        except Exception as e:
            logger.error(f"Memory provider status check failed: {e}")
            return MemoryProviderStatus.OFFLINE

    def sync_orchestrator_state_with_memory(self) -> Dict[str, Any]:
        """Synchronize current orchestrator state with the memory system."""
        sync_results = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": "success",
            "operations": [],
            "insights_generated": 0,
            "patterns_updated": 0,
            "errors": []
        }

        try:
            # Load current orchestrator state
            state_data = self._load_orchestrator_state()
            if not state_data:
                sync_results["status"] = "error"
                sync_results["errors"].append("Could not load orchestrator state")
                return sync_results

            # 1. Update memory provider status in state
            provider_status = self._check_memory_provider_status()
            self._update_memory_status_in_state(state_data, provider_status)
            sync_results["operations"].append(f"Updated memory status: {provider_status.value}")

            # 2. Create sample memories if none exist and we have bootstrap data
            sample_memories_created = self._create_sample_memories_from_bootstrap(state_data)
            if sample_memories_created > 0:
                sync_results["operations"].append(
                    f"Created {sample_memories_created} sample memories from bootstrap data")

            # 3. Sync decision archaeology (also works with fallback storage)
            decisions_synced = self._sync_decision_archaeology_enhanced(state_data)
            sync_results["operations"].append(f"Synced {decisions_synced} decisions to memory")

            # 4. Update pattern recognition
            patterns_updated = self._update_pattern_recognition_enhanced(state_data)
            sync_results["patterns_updated"] = patterns_updated
            sync_results["operations"].append(f"Updated {patterns_updated} patterns")

            # 5. Sync user preferences
            prefs_synced = self._sync_user_preferences_enhanced(state_data)
            sync_results["operations"].append(f"Synced {prefs_synced} user preferences")

            # 6. Generate proactive insights (also works with fallback storage)
            insights = self._generate_proactive_insights_enhanced(state_data)
            sync_results["insights_generated"] = len(insights)
            sync_results["operations"].append(f"Generated {len(insights)} proactive insights")

            # 7. Update orchestrator state with memory intelligence
            self._update_state_with_memory_intelligence(state_data, insights)

            # 8. Save updated state
            self._save_orchestrator_state(state_data)
            sync_results["operations"].append("Saved updated orchestrator state")

            # Update metrics
            self.metrics.last_sync_time = datetime.now(timezone.utc)
            self.metrics.total_memories_created += decisions_synced + prefs_synced + sample_memories_created

            logger.info(f"Memory sync completed: {len(sync_results['operations'])} operations")

        except Exception as e:
            sync_results["status"] = "error"
            sync_results["errors"].append(str(e))
            self.metrics.errors_count += 1
            logger.error(f"Memory sync failed: {e}")

        return sync_results

    def _load_orchestrator_state(self) -> Optional[Dict[str, Any]]:
        """Load orchestrator state from file."""
        try:
            if not self.state_file.exists():
                logger.warning(f"Orchestrator state file not found: {self.state_file}")
                return None

            with open(self.state_file, 'r', encoding='utf-8') as f:
                content = f.read()

            # Extract the YAML block embedded in the markdown file
            yaml_match = re.search(r'```yaml\n(.*?)\n```', content, re.MULTILINE | re.DOTALL)
            if yaml_match:
                yaml_content = yaml_match.group(1)
                return yaml.safe_load(yaml_content)
            else:
                logger.error("No YAML content found in orchestrator state file")
                return None

        except Exception as e:
            logger.error(f"Failed to load orchestrator state: {e}")
            return None

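    # For reference, the loader above expects a state file shaped like the one
    # `_save_orchestrator_state` writes. A minimal sketch (the YAML keys shown
    # are examples drawn from this module, not a fixed schema):
    #
    #   # BMAD Orchestrator State (Memory-Enhanced)
    #
    #   ```yaml
    #   session_metadata:
    #     project_name: my-project
    #   memory_intelligence_state:
    #     memory_status: offline
    #   ```
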
    def _save_orchestrator_state(self, state_data: Dict[str, Any]):
        """Save orchestrator state to file."""
        try:
            yaml_content = yaml.dump(state_data, default_flow_style=False, sort_keys=False, allow_unicode=True)

            content = f"""# BMAD Orchestrator State (Memory-Enhanced)

```yaml
{yaml_content}```

---
**Auto-Generated**: This state is automatically maintained by the BMAD Memory System
**Last Memory Sync**: {datetime.now(timezone.utc).isoformat()}
**Next Diagnostic**: {(datetime.now(timezone.utc) + timedelta(minutes=20)).isoformat()}
**Context Restoration Ready**: true
"""

            # Create a timestamped backup before overwriting
            if self.state_file.exists():
                backup_path = self.state_file.with_suffix(f'.backup.{int(time.time())}')
                self.state_file.rename(backup_path)
                logger.debug(f"Created backup: {backup_path}")

            with open(self.state_file, 'w', encoding='utf-8') as f:
                f.write(content)

        except Exception as e:
            logger.error(f"Failed to save orchestrator state: {e}")
            raise

    def _update_memory_status_in_state(self, state_data: Dict[str, Any], status: MemoryProviderStatus):
        """Update memory provider status in orchestrator state."""
        if "memory_intelligence_state" not in state_data:
            state_data["memory_intelligence_state"] = {}

        memory_state = state_data["memory_intelligence_state"]
        memory_state["memory_status"] = status.value
        memory_state["last_memory_sync"] = datetime.now(timezone.utc).isoformat()

        # Update connection metrics
        if "connection_metrics" not in memory_state:
            memory_state["connection_metrics"] = {}

        memory_state["connection_metrics"].update({
            "latency_ms": round(self.metrics.connection_latency * 1000, 2),
            "success_rate": self.metrics.sync_success_rate,
            "total_errors": self.metrics.errors_count,
            "last_check": datetime.now(timezone.utc).isoformat()
        })

    def _create_sample_memories_from_bootstrap(self, state_data: Dict[str, Any]) -> int:
        """Create sample memories from bootstrap analysis data if none exist."""
        try:
            # With a live memory provider, defer to it (a fuller implementation
            # would query the provider for its actual memory count)
            if self.memory_available:
                return 0

            # Check fallback storage for existing memories
            fallback_file = Path('.ai/memory-fallback.json')
            if fallback_file.exists():
                with open(fallback_file, 'r') as f:
                    fallback_data = json.load(f)
                if fallback_data.get("memories", []):
                    return 0  # Already have memories

            memories_created = 0
            bootstrap = state_data.get("bootstrap_analysis_results", {})
            project_name = state_data.get("session_metadata", {}).get("project_name", "unknown")

            # Create memories from bootstrap successful approaches
            successful_approaches = bootstrap.get("discovered_patterns", {}).get("successful_approaches", [])
            for approach in successful_approaches:
                memory_entry = {
                    "type": "pattern",
                    "pattern_name": approach.lower().replace(" ", "-"),
                    "description": approach,
                    "project": project_name,
                    "source": "bootstrap-analysis",
                    "effectiveness": 0.9,
                    "confidence": 0.8,
                    "timestamp": datetime.now(timezone.utc).isoformat()
                }

                if self._add_to_fallback_memory(memory_entry, ["pattern", "successful", "bootstrap"]):
                    memories_created += 1

            # Create a decision memory if project archaeology extracted any decisions
            patterns = bootstrap.get("project_archaeology", {})
            if patterns.get("decisions_extracted", 0) > 0:
                decision_memory = {
                    "type": "decision",
                    "decision": "orchestrator-state-enhancement-approach",
                    "rationale": "Memory-enhanced orchestrator provides better context continuity",
                    "project": project_name,
                    "persona": "architect",
                    "outcome": "successful",
                    "confidence_level": 90,
                    "timestamp": datetime.now(timezone.utc).isoformat()
                }

                if self._add_to_fallback_memory(decision_memory, ["decision", "architect", "orchestrator"]):
                    memories_created += 1

            return memories_created

        except Exception as e:
            logger.warning(f"Failed to create sample memories: {e}")
            return 0

    def _add_to_fallback_memory(self, memory_content: Dict[str, Any], tags: List[str]) -> bool:
        """Add a memory entry to fallback storage."""
        try:
            # Initialize fallback storage if it does not exist
            fallback_file = Path('.ai/memory-fallback.json')

            if fallback_file.exists():
                with open(fallback_file, 'r') as f:
                    data = json.load(f)
            else:
                data = {
                    "memories": [],
                    "patterns": [],
                    "preferences": {},
                    "decisions": [],
                    "insights": [],
                    "created": datetime.now(timezone.utc).isoformat()
                }

            # Add memory entry
            memory_entry = {
                "id": f"mem_{len(data['memories'])}_{int(datetime.now().timestamp())}",
                "content": json.dumps(memory_content),
                "tags": tags,
                "metadata": {
                    "type": memory_content.get("type", "unknown"),
                    "confidence": memory_content.get("confidence", 0.8)
                },
                "created": datetime.now(timezone.utc).isoformat()
            }

            data["memories"].append(memory_entry)
            data["last_updated"] = datetime.now(timezone.utc).isoformat()

            # Save to file
            with open(fallback_file, 'w') as f:
                json.dump(data, f, indent=2)

            return True

        except Exception as e:
            logger.error(f"Failed to add to fallback memory: {e}")
            return False

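    # An illustrative fallback-store entry as written by the method above (the
    # id counter and timestamps shown are example values only):
    #
    #   {
    #     "id": "mem_0_1700000000",
    #     "content": "{\"type\": \"decision\", ...}",
    #     "tags": ["decision", "architect", "sample"],
    #     "metadata": {"type": "decision", "confidence": 0.8},
    #     "created": "2024-01-01T00:00:00+00:00"
    #   }
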
    def _sync_decision_archaeology_enhanced(self, state_data: Dict[str, Any]) -> int:
        """Enhanced decision archaeology sync that works with fallback storage."""
        decisions_synced = 0
        decision_archaeology = state_data.get("decision_archaeology", {})

        # Sync existing decisions from state
        for decision in decision_archaeology.get("major_decisions", []):
            try:
                memory_content = {
                    "type": "decision",
                    "project": state_data.get("session_metadata", {}).get("project_name", "unknown"),
                    "decision_id": decision.get("decision_id"),
                    "persona": decision.get("persona"),
                    "decision": decision.get("decision"),
                    "rationale": decision.get("rationale"),
                    "alternatives_considered": decision.get("alternatives_considered", []),
                    "constraints": decision.get("constraints", []),
                    "outcome": decision.get("outcome", "pending"),
                    "confidence_level": decision.get("confidence_level", 50),
                    "timestamp": decision.get("timestamp")
                }

                if self._add_to_fallback_memory(memory_content,
                                                ["decision", decision.get("persona", "unknown"), "bmad-archaeology"]):
                    decisions_synced += 1

            except Exception as e:
                logger.warning(f"Failed to sync decision {decision.get('decision_id')}: {e}")

        # Create a sample decision if none exist
        if decisions_synced == 0:
            sample_decision = {
                "type": "decision",
                "project": state_data.get("session_metadata", {}).get("project_name", "unknown"),
                "decision_id": "sample-memory-integration",
                "persona": "architect",
                "decision": "Implement memory-enhanced orchestrator state",
                "rationale": "Provides better context continuity and learning across sessions",
                "alternatives_considered": ["Simple state storage", "No persistence"],
                "constraints": ["Memory system availability", "Performance requirements"],
                "outcome": "successful",
                "confidence_level": 85,
                "timestamp": datetime.now(timezone.utc).isoformat()
            }

            if self._add_to_fallback_memory(sample_decision, ["decision", "architect", "sample"]):
                decisions_synced += 1

        return decisions_synced

    def _update_pattern_recognition_enhanced(self, state_data: Dict[str, Any]) -> int:
        """Enhanced pattern recognition that works with fallback storage."""
        patterns_updated = 0
        memory_state = state_data.setdefault("memory_intelligence_state", {})

        try:
            # Search fallback storage for patterns
            fallback_file = Path('.ai/memory-fallback.json')
            if fallback_file.exists():
                with open(fallback_file, 'r') as f:
                    fallback_data = json.load(f)

                # Extract patterns from memories
                workflow_patterns = []
                decision_patterns = []

                for memory in fallback_data.get("memories", []):
                    try:
                        content = json.loads(memory["content"])
                        if content.get("type") == "pattern":
                            pattern = {
                                "pattern_name": content.get("pattern_name", "unknown-pattern"),
                                "confidence": int(content.get("confidence", 0.8) * 100),
                                "usage_frequency": 1,
                                "success_rate": content.get("effectiveness", 0.9) * 100,
                                "source": "memory-intelligence"
                            }
                            workflow_patterns.append(pattern)
                            patterns_updated += 1

                        elif content.get("type") == "decision":
                            pattern = {
                                "pattern_type": "process",
                                "pattern_description": f"Decision pattern: {content.get('decision', 'unknown')}",
                                "effectiveness_score": content.get("confidence_level", 80),
                                "source": "memory-analysis"
                            }
                            decision_patterns.append(pattern)
                            patterns_updated += 1

                    except Exception as e:
                        logger.debug(f"Error processing memory for patterns: {e}")

                # Update pattern recognition in state (keep only the top five of each)
                if "pattern_recognition" not in memory_state:
                    memory_state["pattern_recognition"] = {
                        "workflow_patterns": [],
                        "decision_patterns": [],
                        "anti_patterns_detected": []
                    }

                memory_state["pattern_recognition"]["workflow_patterns"] = workflow_patterns[:5]
                memory_state["pattern_recognition"]["decision_patterns"] = decision_patterns[:5]

        except Exception as e:
            logger.warning(f"Pattern recognition update failed: {e}")

        return patterns_updated

    def _sync_user_preferences_enhanced(self, state_data: Dict[str, Any]) -> int:
        """Enhanced user-preferences sync that works with fallback storage."""
        prefs_synced = 0
        memory_state = state_data.get("memory_intelligence_state", {})
        user_prefs = memory_state.get("user_preferences", {})

        if user_prefs:
            try:
                preference_memory = {
                    "type": "user-preference",
                    "communication_style": user_prefs.get("communication_style"),
                    "workflow_style": user_prefs.get("workflow_style"),
                    "documentation_preference": user_prefs.get("documentation_preference"),
                    "feedback_style": user_prefs.get("feedback_style"),
                    "confidence": user_prefs.get("confidence", 80),
                    "timestamp": datetime.now(timezone.utc).isoformat()
                }

                if self._add_to_fallback_memory(preference_memory,
                                                ["user-preference", "workflow-style", "bmad-intelligence"]):
                    prefs_synced = 1

            except Exception as e:
                logger.warning(f"Failed to sync user preferences: {e}")

        return prefs_synced

    def _generate_proactive_insights_enhanced(self, state_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Enhanced insight generation that works with fallback storage."""
        insights = []

        try:
            # Get current context
            current_workflow = state_data.get("active_workflow_context", {})
            current_persona = current_workflow.get("current_state", {}).get("active_persona")
            current_phase = current_workflow.get("current_state", {}).get("current_phase")

            # Search fallback storage for relevant insights
            fallback_file = Path('.ai/memory-fallback.json')
            if fallback_file.exists():
                with open(fallback_file, 'r') as f:
                    fallback_data = json.load(f)

                # Generate insights from stored memories
                for memory in fallback_data.get("memories", []):
                    try:
                        content = json.loads(memory["content"])

                        if content.get("type") == "decision" and content.get("outcome") == "successful":
                            insight = {
                                "type": "pattern",
                                "insight": f"✅ Success Pattern: {content.get('decision', 'Unknown decision')} worked well in similar context",
                                "confidence": content.get("confidence_level", 80),
                                "source": "memory-intelligence",
                                "timestamp": datetime.now(timezone.utc).isoformat(),
                                "context": f"{current_persona}-{current_phase}"
                            }
                            insights.append(insight)

                        elif content.get("type") == "pattern":
                            insight = {
                                "type": "optimization",
                                "insight": f"🚀 Optimization: Apply {content.get('description', 'proven pattern')} for better results",
                                "confidence": int(content.get("confidence", 0.8) * 100),
                                "source": "pattern-recognition",
                                "timestamp": datetime.now(timezone.utc).isoformat(),
                                "context": f"pattern-{current_phase}"
                            }
                            insights.append(insight)

                    except Exception as e:
                        logger.debug(f"Error generating insight from memory: {e}")

            # Fall back to context-specific insights if none were found
            if not insights:
                insights.extend([
                    {
                        "type": "warning",
                        "insight": "💡 Memory Insight: Consider validating memory sync functionality with sample data",
                        "confidence": 75,
                        "source": "system-intelligence",
                        "timestamp": datetime.now(timezone.utc).isoformat(),
                        "context": f"{current_persona}-{current_phase}"
                    },
                    {
                        "type": "optimization",
                        "insight": "🚀 Optimization: Memory-enhanced state provides better context continuity",
                        "confidence": 85,
                        "source": "system-analysis",
                        "timestamp": datetime.now(timezone.utc).isoformat(),
                        "context": f"optimization-{current_phase}"
                    }
                ])

        except Exception as e:
            logger.warning(f"Failed to generate enhanced insights: {e}")

        return insights[:8]  # Limit to the top 8 insights

    def _update_state_with_memory_intelligence(self, state_data: Dict[str, Any], insights: List[Dict[str, Any]]):
        """Update orchestrator state with memory intelligence."""
        memory_state = state_data.setdefault("memory_intelligence_state", {})

        # Update the proactive intelligence section
        if "proactive_intelligence" not in memory_state:
            memory_state["proactive_intelligence"] = {}

        proactive = memory_state["proactive_intelligence"]
        proactive["insights_generated"] = len(insights)
        proactive["recommendations_active"] = len([i for i in insights if i["type"] == "optimization"])
        proactive["warnings_issued"] = len([i for i in insights if i["type"] == "warning"])
        proactive["optimization_opportunities"] = len([i for i in insights if "optimization" in i["type"]])
        proactive["last_update"] = datetime.now(timezone.utc).isoformat()

        # Store insights in the recent activity log
        activity_log = state_data.setdefault("recent_activity_log", {})
        if "insight_generation" not in activity_log:
            activity_log["insight_generation"] = []

        for insight in insights:
            activity_entry = {
                "timestamp": insight["timestamp"],
                "insight_type": insight["type"],
                "insight": insight["insight"],
                "confidence": insight["confidence"],
                "applied": False,
                "effectiveness": 0
            }
            activity_log["insight_generation"].append(activity_entry)

        # Keep only the 10 most recent insights
        activity_log["insight_generation"] = activity_log["insight_generation"][-10:]

    def start_real_time_monitoring(self):
        """Start real-time memory synchronization monitoring."""
        self.running = True

        def monitor_loop():
            logger.info(f"Starting real-time memory monitoring (interval: {self.sync_interval}s)")

            while self.running:
                try:
                    sync_results = self.sync_orchestrator_state_with_memory()

                    if sync_results["status"] == "success":
                        # Placeholder rate; a fuller implementation would track a rolling average
                        self.metrics.sync_success_rate = 0.9
                        logger.debug(f"Memory sync completed: {len(sync_results['operations'])} operations")
                    else:
                        logger.warning(f"Memory sync failed: {sync_results['errors']}")

                except Exception as e:
                    logger.error(f"Memory monitoring error: {e}")
                    self.metrics.errors_count += 1

                time.sleep(self.sync_interval)

        # Start monitoring in a background daemon thread
        monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        monitor_thread.start()

        return monitor_thread

    def stop_monitoring(self):
        """Stop real-time memory monitoring."""
        self.running = False
        logger.info("Memory monitoring stopped")

    def diagnose_memory_integration(self) -> Dict[str, Any]:
        """Diagnose memory integration health and performance."""
        diagnosis = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "memory_provider_status": self._check_memory_provider_status().value,
            "metrics": {
                "connection_latency": self.metrics.connection_latency,
                "sync_success_rate": self.metrics.sync_success_rate,
                "total_memories_created": self.metrics.total_memories_created,
                "errors_count": self.metrics.errors_count,
                "last_sync": self.metrics.last_sync_time.isoformat() if self.metrics.last_sync_time else None
            },
            "capabilities": {
                "memory_available": self.memory_available,
                "real_time_sync": self.sync_mode == SyncMode.REAL_TIME,
                "pattern_recognition": len(self.patterns),
                "proactive_insights": len(self.proactive_insights)
            },
            "recommendations": []
        }

        # Add recommendations based on the diagnosis
        if not self.memory_available:
            diagnosis["recommendations"].append("Memory system not available - check OpenMemory MCP configuration")

        if self.metrics.errors_count > 5:
            diagnosis["recommendations"].append("High error count detected - review memory integration logs")

        if self.metrics.connection_latency > 2.0:
            diagnosis["recommendations"].append("High connection latency - consider optimizing memory queries")

        return diagnosis


def main():
    """Main entry point for memory synchronization integration."""
    import argparse

    parser = argparse.ArgumentParser(description='BMAD Memory Synchronization Integration')
    parser.add_argument('--sync-now', action='store_true',
                        help='Run memory synchronization immediately')
    parser.add_argument('--monitor', action='store_true',
                        help='Start real-time monitoring mode')
    parser.add_argument('--diagnose', action='store_true',
                        help='Run memory integration diagnostics')
    parser.add_argument('--interval', type=int, default=30,
                        help='Sync interval in seconds (default: 30)')
    parser.add_argument('--state-file', default='.ai/orchestrator-state.md',
                        help='Path to orchestrator state file')

    args = parser.parse_args()

    # Initialize memory sync integration
    memory_sync = MemorySyncIntegration(
        state_file=args.state_file,
        sync_interval=args.interval
    )

    try:
        # Check whether OpenMemory MCP functions are available
        print("🔍 Checking OpenMemory MCP availability...")

        # Simulated availability check; replace with the actual OpenMemory MCP
        # imports when they are wired in, e.g.:
        #   from openmemory_mcp import add_memories, search_memory, list_memories
        #   memory_sync.initialize_memory_functions(add_memories, search_memory, list_memories)
        #   memory_available = True
        memory_available = False
        if not memory_available:
            print("⚠️ OpenMemory MCP not available - running in fallback mode")

        if args.diagnose:
            print("\n🏥 Memory Integration Diagnostics")
            diagnosis = memory_sync.diagnose_memory_integration()
            print(f"Memory Provider Status: {diagnosis['memory_provider_status']}")
            print(f"Memory Available: {diagnosis['capabilities']['memory_available']}")
            print(f"Connection Latency: {diagnosis['metrics']['connection_latency']:.3f}s")
            print(f"Total Errors: {diagnosis['metrics']['errors_count']}")

            if diagnosis['recommendations']:
                print("\nRecommendations:")
                for rec in diagnosis['recommendations']:
                    print(f"  • {rec}")

        elif args.sync_now:
            print("\n🔄 Running Memory Synchronization...")
            sync_results = memory_sync.sync_orchestrator_state_with_memory()

            print(f"Sync Status: {sync_results['status']}")
            print(f"Operations: {len(sync_results['operations'])}")
            print(f"Insights Generated: {sync_results['insights_generated']}")
            print(f"Patterns Updated: {sync_results['patterns_updated']}")

            if sync_results['errors']:
                print(f"Errors: {sync_results['errors']}")

        elif args.monitor:
            print(f"\n👁️ Starting Real-Time Memory Monitoring (interval: {args.interval}s)")
            print("Press Ctrl+C to stop monitoring")

            try:
                memory_sync.start_real_time_monitoring()

                # Keep the main thread alive while the daemon thread syncs
                while memory_sync.running:
                    time.sleep(1)

            except KeyboardInterrupt:
                print("\n⏹️ Stopping memory monitoring...")
                memory_sync.stop_monitoring()

        else:
            print("✅ Memory Synchronization Integration Ready")
            print("Use --sync-now, --monitor, or --diagnose to run operations")

    except Exception as e:
        print(f"❌ Memory integration failed: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()