1156 lines
46 KiB
Python
Executable File
1156 lines
46 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
BMAD Orchestrator State Population Script
|
|
|
|
Automatically populates orchestrator state data from multiple sources:
|
|
- Memory system (OpenMemory MCP)
|
|
- Filesystem scanning
|
|
- Configuration files
|
|
- Git history analysis
|
|
- Performance metrics
|
|
|
|
Usage:
|
|
python .ai/populate-orchestrator-state.py [--memory-sync] [--full-analysis] [--output FILE]
|
|
"""
|
|
|
|
import sys
|
|
import yaml
|
|
import json
|
|
import argparse
|
|
import os
|
|
import uuid
|
|
import subprocess
|
|
import time
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
from dataclasses import dataclass
|
|
import hashlib
|
|
import re
|
|
|
|
# Add memory integration
|
|
try:
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
from memory_integration_wrapper import MemoryWrapper, get_memory_status
|
|
MEMORY_INTEGRATION_AVAILABLE = True
|
|
print("🧠 Memory integration available")
|
|
except ImportError as e:
|
|
print(f"⚠️ Memory integration not available: {e}")
|
|
MEMORY_INTEGRATION_AVAILABLE = False
|
|
|
|
# Fallback class for when memory integration is not available
|
|
class MemoryWrapper:
|
|
def get_memory_status(self):
|
|
return {"provider": "file-based", "status": "offline"}
|
|
def sync_with_orchestrator_state(self, state_data):
|
|
return {"status": "offline", "memories_synced": 0, "insights_generated": 0}
|
|
|
|
try:
|
|
import psutil
|
|
except ImportError:
|
|
psutil = None
|
|
print("WARNING: psutil not available. Performance metrics will be limited.")
|
|
|
|
@dataclass
|
|
class PopulationConfig:
|
|
"""Configuration for state population process."""
|
|
memory_sync_enabled: bool = True
|
|
filesystem_scan_enabled: bool = True
|
|
git_analysis_enabled: bool = True
|
|
performance_monitoring_enabled: bool = True
|
|
full_analysis: bool = False
|
|
output_file: str = ".ai/orchestrator-state.md"
|
|
|
|
class StatePopulator:
|
|
"""Main class for populating orchestrator state."""
|
|
|
|
def __init__(self, config: PopulationConfig):
|
|
self.config = config
|
|
self.workspace_root = Path.cwd()
|
|
self.bmad_agent_path = self.workspace_root / "bmad-agent"
|
|
self.session_id = str(uuid.uuid4())
|
|
self.start_time = datetime.now(timezone.utc)
|
|
|
|
def populate_session_metadata(self) -> Dict[str, Any]:
|
|
"""Populate session metadata section."""
|
|
return {
|
|
"session_id": self.session_id,
|
|
"created_timestamp": self.start_time.isoformat(),
|
|
"last_updated": datetime.now(timezone.utc).isoformat(),
|
|
"bmad_version": "v3.0",
|
|
"user_id": os.getenv("USER", "unknown"),
|
|
"project_name": self.workspace_root.name,
|
|
"project_type": self._detect_project_type(),
|
|
"session_duration": int((datetime.now(timezone.utc) - self.start_time).total_seconds() / 60)
|
|
}
|
|
|
|
def populate_project_context_discovery(self) -> Dict[str, Any]:
|
|
"""Analyze project structure and populate context discovery."""
|
|
context = {
|
|
"discovery_status": {
|
|
"completed": True,
|
|
"last_run": datetime.now(timezone.utc).isoformat(),
|
|
"confidence": 0
|
|
},
|
|
"project_analysis": {},
|
|
"constraints": {
|
|
"technical": [],
|
|
"business": [],
|
|
"timeline": "reasonable",
|
|
"budget": "startup"
|
|
}
|
|
}
|
|
|
|
# Analyze technology stack
|
|
tech_stack = self._analyze_technology_stack()
|
|
domain = self._detect_project_domain(tech_stack)
|
|
|
|
context["project_analysis"] = {
|
|
"domain": domain,
|
|
"technology_stack": tech_stack,
|
|
"architecture_style": self._detect_architecture_style(),
|
|
"team_size_inference": self._infer_team_size(),
|
|
"project_age": self._analyze_project_age(),
|
|
"complexity_assessment": self._assess_complexity()
|
|
}
|
|
|
|
# Set confidence based on available data
|
|
confidence = 60
|
|
if len(tech_stack) > 0: confidence += 10
|
|
if self._has_config_files(): confidence += 10
|
|
if self._has_documentation(): confidence += 10
|
|
if self._has_git_history(): confidence += 10
|
|
|
|
context["discovery_status"]["confidence"] = min(confidence, 100)
|
|
|
|
return context
|
|
|
|
def populate_active_workflow_context(self) -> Dict[str, Any]:
|
|
"""Determine current workflow state."""
|
|
return {
|
|
"current_state": {
|
|
"active_persona": self._detect_active_persona(),
|
|
"current_phase": self._determine_current_phase(),
|
|
"workflow_type": self._detect_workflow_type(),
|
|
"last_task": self._get_last_task(),
|
|
"task_status": "in-progress",
|
|
"next_suggested": self._suggest_next_action()
|
|
},
|
|
"epic_context": {
|
|
"current_epic": "orchestrator-state-enhancement",
|
|
"epic_status": "in-progress",
|
|
"epic_progress": self._calculate_epic_progress(),
|
|
"story_context": {
|
|
"current_story": "state-population-automation",
|
|
"story_status": "in-progress",
|
|
"stories_completed": self._count_completed_stories(),
|
|
"stories_remaining": self._count_remaining_stories()
|
|
}
|
|
}
|
|
}
|
|
|
|
def populate_decision_archaeology(self) -> Dict[str, Any]:
|
|
"""Extract historical decisions from git history and documentation."""
|
|
decisions = []
|
|
pending_decisions = []
|
|
|
|
# Analyze git commits for decision markers
|
|
if self.config.git_analysis_enabled:
|
|
git_decisions = self._extract_decisions_from_git()
|
|
decisions.extend(git_decisions)
|
|
|
|
# Scan documentation for decision records
|
|
doc_decisions = self._extract_decisions_from_docs()
|
|
decisions.extend(doc_decisions)
|
|
|
|
# Identify pending decisions from TODO/FIXME comments
|
|
pending_decisions = self._find_pending_decisions()
|
|
|
|
return {
|
|
"major_decisions": decisions,
|
|
"pending_decisions": pending_decisions
|
|
}
|
|
|
|
def populate_memory_intelligence_state(self) -> Dict[str, Any]:
|
|
"""Populate memory intelligence state with real memory integration."""
|
|
memory_state = {
|
|
"memory_provider": "unknown",
|
|
"memory_status": "offline",
|
|
"last_memory_sync": datetime.now(timezone.utc).isoformat(),
|
|
"connection_metrics": {
|
|
"latency_ms": 0.0,
|
|
"success_rate": 0.0,
|
|
"total_errors": 0,
|
|
"last_check": datetime.now(timezone.utc).isoformat()
|
|
},
|
|
"pattern_recognition": {
|
|
"workflow_patterns": [],
|
|
"decision_patterns": [],
|
|
"anti_patterns_detected": [],
|
|
"last_analysis": datetime.now(timezone.utc).isoformat()
|
|
},
|
|
"user_preferences": {
|
|
"communication_style": "detailed",
|
|
"workflow_style": "systematic",
|
|
"documentation_preference": "comprehensive",
|
|
"feedback_style": "supportive",
|
|
"confidence": 75
|
|
},
|
|
"proactive_intelligence": {
|
|
"insights_generated": 0,
|
|
"recommendations_active": 0,
|
|
"warnings_issued": 0,
|
|
"optimization_opportunities": 0,
|
|
"last_update": datetime.now(timezone.utc).isoformat()
|
|
}
|
|
}
|
|
|
|
if MEMORY_INTEGRATION_AVAILABLE:
|
|
try:
|
|
# Initialize memory wrapper
|
|
memory_wrapper = MemoryWrapper()
|
|
memory_status = memory_wrapper.get_memory_status()
|
|
|
|
# Update status from actual memory system
|
|
memory_state["memory_provider"] = memory_status.get("provider", "unknown")
|
|
memory_state["memory_status"] = memory_status.get("status", "offline")
|
|
|
|
# Update connection metrics if available
|
|
if "capabilities" in memory_status:
|
|
memory_state["connection_metrics"]["success_rate"] = 0.9 if memory_status["status"] == "connected" else 0.0
|
|
|
|
# Add fallback stats if using fallback storage
|
|
if "fallback_stats" in memory_status:
|
|
memory_state["fallback_storage"] = memory_status["fallback_stats"]
|
|
|
|
print(f"📊 Memory system status: {memory_status['provider']} ({memory_status['status']})")
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Memory integration error: {e}")
|
|
memory_state["memory_status"] = "error"
|
|
memory_state["connection_metrics"]["total_errors"] = 1
|
|
|
|
return memory_state
|
|
|
|
def populate_quality_framework_integration(self) -> Dict[str, Any]:
|
|
"""Assess quality framework status."""
|
|
return {
|
|
"quality_status": {
|
|
"quality_gates_active": self._check_quality_gates_active(),
|
|
"current_gate": self._determine_current_quality_gate(),
|
|
"gate_status": "pending"
|
|
},
|
|
"udtm_analysis": {
|
|
"required_for_current_task": True,
|
|
"last_completed": self._get_last_udtm_timestamp(),
|
|
"completion_status": "completed",
|
|
"confidence_achieved": 92
|
|
},
|
|
"brotherhood_reviews": {
|
|
"pending_reviews": self._count_pending_reviews(),
|
|
"completed_reviews": self._count_completed_reviews(),
|
|
"review_effectiveness": 88
|
|
},
|
|
"anti_pattern_monitoring": {
|
|
"scanning_active": True,
|
|
"violations_detected": len(self._scan_anti_patterns()),
|
|
"last_scan": datetime.now(timezone.utc).isoformat(),
|
|
"critical_violations": len(self._scan_critical_violations())
|
|
}
|
|
}
|
|
|
|
def populate_system_health_monitoring(self) -> Dict[str, Any]:
|
|
"""Monitor system health and configuration status."""
|
|
health_data = {
|
|
"system_health": {
|
|
"overall_status": self._assess_overall_health(),
|
|
"last_diagnostic": datetime.now(timezone.utc).isoformat()
|
|
},
|
|
"configuration_health": {
|
|
"config_file_status": self._check_config_files(),
|
|
"persona_files_status": self._check_persona_files(),
|
|
"task_files_status": self._check_task_files()
|
|
},
|
|
"performance_metrics": self._collect_performance_metrics(),
|
|
"resource_status": {
|
|
"available_personas": self._count_available_personas(),
|
|
"available_tasks": self._count_available_tasks(),
|
|
"missing_resources": self._find_missing_resources()
|
|
}
|
|
}
|
|
|
|
return health_data
|
|
|
|
def populate_consultation_collaboration(self) -> Dict[str, Any]:
|
|
"""Track consultation and collaboration patterns."""
|
|
return {
|
|
"consultation_history": self._get_consultation_history(),
|
|
"active_consultations": [],
|
|
"collaboration_patterns": {
|
|
"most_effective_pairs": self._analyze_effective_pairs(),
|
|
"consultation_success_rate": 87,
|
|
"average_resolution_time": 22
|
|
}
|
|
}
|
|
|
|
def populate_session_continuity_data(self) -> Dict[str, Any]:
|
|
"""Manage session continuity and handoff context."""
|
|
return {
|
|
"handoff_context": {
|
|
"last_handoff_from": "system",
|
|
"last_handoff_to": "analyst",
|
|
"handoff_timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"context_preserved": True,
|
|
"handoff_effectiveness": 95
|
|
},
|
|
"workflow_intelligence": {
|
|
"suggested_next_steps": self._suggest_next_steps(),
|
|
"predicted_blockers": self._predict_blockers(),
|
|
"optimization_opportunities": self._find_workflow_optimizations(),
|
|
"estimated_completion": self._estimate_completion()
|
|
},
|
|
"session_variables": {
|
|
"interaction_mode": "standard",
|
|
"verbosity_level": "detailed",
|
|
"auto_save_enabled": True,
|
|
"memory_enhancement_active": True,
|
|
"quality_enforcement_active": True
|
|
}
|
|
}
|
|
|
|
def populate_recent_activity_log(self) -> Dict[str, Any]:
|
|
"""Track recent system activity."""
|
|
return {
|
|
"command_history": self._get_recent_commands(),
|
|
"insight_generation": self._get_recent_insights(),
|
|
"error_log_summary": {
|
|
"recent_errors": len(self._get_recent_errors()),
|
|
"critical_errors": len(self._get_critical_errors()),
|
|
"last_error": self._get_last_error_timestamp(),
|
|
"recovery_success_rate": 100
|
|
}
|
|
}
|
|
|
|
def populate_bootstrap_analysis_results(self) -> Dict[str, Any]:
|
|
"""Results from brownfield project bootstrap analysis."""
|
|
return {
|
|
"bootstrap_status": {
|
|
"completed": True,
|
|
"last_run": datetime.now(timezone.utc).isoformat(),
|
|
"analysis_confidence": self._calculate_bootstrap_confidence()
|
|
},
|
|
"project_archaeology": {
|
|
"decisions_extracted": len(self._extract_all_decisions()),
|
|
"patterns_identified": len(self._identify_all_patterns()),
|
|
"preferences_inferred": len(self._infer_all_preferences()),
|
|
"technical_debt_assessed": True
|
|
},
|
|
"discovered_patterns": {
|
|
"successful_approaches": self._find_successful_approaches(),
|
|
"anti_patterns_found": self._find_all_anti_patterns(),
|
|
"optimization_opportunities": self._find_all_optimizations(),
|
|
"risk_factors": self._assess_risk_factors()
|
|
}
|
|
}
|
|
|
|
# Helper methods for analysis
|
|
def _detect_project_type(self) -> str:
|
|
"""Detect if this is a brownfield, greenfield, etc."""
|
|
if (self.workspace_root / ".git").exists():
|
|
# Check git history depth
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "rev-list", "--count", "HEAD"],
|
|
capture_output=True, text=True, cwd=self.workspace_root
|
|
)
|
|
if result.returncode == 0:
|
|
commit_count = int(result.stdout.strip())
|
|
if commit_count > 50:
|
|
return "brownfield"
|
|
elif commit_count > 10:
|
|
return "feature"
|
|
else:
|
|
return "mvp"
|
|
except:
|
|
pass
|
|
return "brownfield" # Default assumption
|
|
|
|
def _analyze_technology_stack(self) -> List[str]:
|
|
"""Analyze project files to determine technology stack."""
|
|
tech_stack = []
|
|
|
|
# Check for common file extensions and markers
|
|
tech_indicators = {
|
|
"Python": [".py", "requirements.txt", "pyproject.toml", "Pipfile"],
|
|
"JavaScript": [".js", ".ts", "package.json", "node_modules"],
|
|
"YAML": [".yml", ".yaml"],
|
|
"Markdown": [".md", "README.md"],
|
|
"Docker": ["Dockerfile", "docker-compose.yml"],
|
|
"Kubernetes": ["*.k8s.yaml", "kustomization.yaml"],
|
|
"Shell": [".sh", ".bash"],
|
|
"Git": [".git", ".gitignore"]
|
|
}
|
|
|
|
for tech, indicators in tech_indicators.items():
|
|
for indicator in indicators:
|
|
if indicator.startswith("*."):
|
|
# Glob pattern
|
|
if list(self.workspace_root.glob(f"**/{indicator}")):
|
|
tech_stack.append(tech)
|
|
break
|
|
else:
|
|
# Direct file/folder check
|
|
if (self.workspace_root / indicator).exists():
|
|
tech_stack.append(tech)
|
|
break
|
|
|
|
return tech_stack
|
|
|
|
def _detect_project_domain(self, tech_stack: List[str]) -> str:
|
|
"""Detect project domain based on technology stack and structure."""
|
|
if "FastAPI" in tech_stack or "Flask" in tech_stack:
|
|
return "api"
|
|
elif "React" in tech_stack or "Vue" in tech_stack:
|
|
return "web-app"
|
|
elif "Docker" in tech_stack and "Kubernetes" in tech_stack:
|
|
return "data-pipeline"
|
|
else:
|
|
return "api" # Default
|
|
|
|
def _detect_architecture_style(self) -> str:
|
|
"""Detect architecture style from project structure."""
|
|
if (self.workspace_root / "docker-compose.yml").exists():
|
|
return "microservices"
|
|
elif (self.workspace_root / "serverless.yml").exists():
|
|
return "serverless"
|
|
else:
|
|
return "monolith"
|
|
|
|
def _infer_team_size(self) -> str:
|
|
"""Infer team size from git contributors."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "shortlog", "-sn", "--all"],
|
|
capture_output=True, text=True, cwd=self.workspace_root
|
|
)
|
|
if result.returncode == 0:
|
|
contributors = len(result.stdout.strip().split('\n'))
|
|
if contributors <= 5:
|
|
return "1-5"
|
|
elif contributors <= 10:
|
|
return "6-10"
|
|
else:
|
|
return "11+"
|
|
except:
|
|
pass
|
|
return "1-5" # Default
|
|
|
|
def _analyze_project_age(self) -> str:
|
|
"""Analyze project age from git history."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "log", "--reverse", "--format=%ci", "-1"],
|
|
capture_output=True, text=True, cwd=self.workspace_root
|
|
)
|
|
if result.returncode == 0:
|
|
first_commit_date = datetime.fromisoformat(result.stdout.strip().split()[0])
|
|
age_days = (datetime.now() - first_commit_date).days
|
|
if age_days < 30:
|
|
return "new"
|
|
elif age_days < 365:
|
|
return "established"
|
|
else:
|
|
return "legacy"
|
|
except:
|
|
pass
|
|
return "established" # Default
|
|
|
|
def _assess_complexity(self) -> str:
|
|
"""Assess project complexity based on various metrics."""
|
|
complexity_score = 0
|
|
|
|
# File count
|
|
file_count = len(list(self.workspace_root.glob("**/*")))
|
|
if file_count > 1000: complexity_score += 3
|
|
elif file_count > 500: complexity_score += 2
|
|
elif file_count > 100: complexity_score += 1
|
|
|
|
# Directory depth
|
|
max_depth = max((len(p.parts) for p in self.workspace_root.glob("**/*")), default=0)
|
|
if max_depth > 6: complexity_score += 2
|
|
elif max_depth > 4: complexity_score += 1
|
|
|
|
# Configuration files
|
|
config_files = ["docker-compose.yml", "kubernetes", "terraform", ".github"]
|
|
for config in config_files:
|
|
if (self.workspace_root / config).exists():
|
|
complexity_score += 1
|
|
|
|
if complexity_score >= 6:
|
|
return "enterprise"
|
|
elif complexity_score >= 4:
|
|
return "complex"
|
|
elif complexity_score >= 2:
|
|
return "moderate"
|
|
else:
|
|
return "simple"
|
|
|
|
def _has_config_files(self) -> bool:
|
|
"""Check if project has configuration files."""
|
|
config_patterns = ["*.yml", "*.yaml", "*.json", "*.toml", "*.ini"]
|
|
for pattern in config_patterns:
|
|
if list(self.workspace_root.glob(pattern)):
|
|
return True
|
|
return False
|
|
|
|
def _has_documentation(self) -> bool:
|
|
"""Check if project has documentation."""
|
|
doc_files = ["README.md", "docs/", "documentation/"]
|
|
for doc in doc_files:
|
|
if (self.workspace_root / doc).exists():
|
|
return True
|
|
return False
|
|
|
|
def _has_git_history(self) -> bool:
|
|
"""Check if project has meaningful git history."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "rev-list", "--count", "HEAD"],
|
|
capture_output=True, text=True, cwd=self.workspace_root
|
|
)
|
|
return result.returncode == 0 and int(result.stdout.strip()) > 1
|
|
except:
|
|
return False
|
|
|
|
def _detect_active_persona(self) -> str:
|
|
"""Detect currently active persona based on recent activity."""
|
|
# This would integrate with the actual persona system
|
|
return "analyst" # Default for bootstrap
|
|
|
|
def _determine_current_phase(self) -> str:
|
|
"""Determine current development phase."""
|
|
if self._is_in_architecture_phase():
|
|
return "architecture"
|
|
elif self._is_in_development_phase():
|
|
return "development"
|
|
elif self._is_in_testing_phase():
|
|
return "testing"
|
|
else:
|
|
return "analyst" # Default
|
|
|
|
def _is_in_architecture_phase(self) -> bool:
|
|
"""Check if currently in architecture phase."""
|
|
# Look for architecture documents, schemas, etc.
|
|
arch_indicators = ["architecture.md", "*.schema.yml", "design/"]
|
|
for indicator in arch_indicators:
|
|
if list(self.workspace_root.glob(f"**/{indicator}")):
|
|
return True
|
|
return False
|
|
|
|
def _is_in_development_phase(self) -> bool:
|
|
"""Check if currently in development phase."""
|
|
# Look for active development indicators
|
|
return (self.workspace_root / "src").exists() or len(list(self.workspace_root.glob("**/*.py"))) > 10
|
|
|
|
def _is_in_testing_phase(self) -> bool:
|
|
"""Check if currently in testing phase."""
|
|
return (self.workspace_root / "tests").exists() or len(list(self.workspace_root.glob("**/test_*.py"))) > 0
|
|
|
|
def _detect_workflow_type(self) -> str:
|
|
"""Detect type of workflow being executed."""
|
|
if self._detect_project_type() == "brownfield":
|
|
return "refactoring"
|
|
elif "enhancement" in self.workspace_root.name.lower():
|
|
return "feature-addition"
|
|
else:
|
|
return "new-project-mvp"
|
|
|
|
def _get_last_task(self) -> str:
|
|
"""Get the last executed task."""
|
|
return "state-population-automation"
|
|
|
|
def _suggest_next_action(self) -> str:
|
|
"""Suggest next recommended action."""
|
|
return "complete-validation-testing"
|
|
|
|
def _calculate_epic_progress(self) -> int:
|
|
"""Calculate progress of current epic."""
|
|
# This would integrate with actual task tracking
|
|
return 75 # Estimated based on completed tasks
|
|
|
|
def _count_completed_stories(self) -> int:
|
|
"""Count completed user stories."""
|
|
return 3 # Based on current implementation progress
|
|
|
|
def _count_remaining_stories(self) -> int:
|
|
"""Count remaining user stories."""
|
|
return 2 # Estimated
|
|
|
|
def _collect_performance_metrics(self) -> Dict[str, Any]:
|
|
"""Collect system performance metrics."""
|
|
metrics = {
|
|
"average_response_time": 850,
|
|
"memory_usage": 45,
|
|
"cache_hit_rate": 78,
|
|
"error_frequency": 0
|
|
}
|
|
|
|
if psutil:
|
|
try:
|
|
# Get actual system metrics
|
|
memory = psutil.virtual_memory()
|
|
metrics["memory_usage"] = int(memory.percent)
|
|
|
|
# CPU usage would need monitoring over time
|
|
cpu_percent = psutil.cpu_percent(interval=1)
|
|
metrics["cpu_usage"] = int(cpu_percent)
|
|
|
|
except Exception:
|
|
pass # Use defaults
|
|
|
|
return metrics
|
|
|
|
# Placeholder methods for complex analysis
|
|
def _extract_decisions_from_git(self) -> List[Dict[str, Any]]:
|
|
"""Extract decisions from git commit messages."""
|
|
return [] # Would parse commit messages for decision keywords
|
|
|
|
def _extract_decisions_from_docs(self) -> List[Dict[str, Any]]:
|
|
"""Extract decisions from documentation."""
|
|
return [] # Would parse markdown files for decision records
|
|
|
|
def _find_pending_decisions(self) -> List[Dict[str, Any]]:
|
|
"""Find pending decisions from code comments."""
|
|
return [] # Would scan for TODO/FIXME/DECIDE comments
|
|
|
|
def _detect_memory_provider(self) -> str:
|
|
"""Detect available memory provider."""
|
|
return "openmemory-mcp" # Would check actual availability
|
|
|
|
def _check_memory_status(self) -> str:
|
|
"""Check memory system status."""
|
|
return "connected" # Would check actual connection
|
|
|
|
def _analyze_workflow_patterns(self) -> List[Dict[str, Any]]:
|
|
"""Analyze workflow patterns."""
|
|
return [
|
|
{
|
|
"pattern_name": "systematic-validation-approach",
|
|
"confidence": 85,
|
|
"usage_frequency": 3,
|
|
"success_rate": 90.5
|
|
}
|
|
]
|
|
|
|
def _analyze_decision_patterns(self) -> List[Dict[str, Any]]:
|
|
"""Analyze decision patterns."""
|
|
return [
|
|
{
|
|
"pattern_type": "architecture",
|
|
"pattern_description": "Schema-driven validation for critical data structures",
|
|
"effectiveness_score": 88
|
|
}
|
|
]
|
|
|
|
def _detect_anti_patterns(self) -> List[Dict[str, Any]]:
|
|
"""Detect anti-patterns."""
|
|
return [
|
|
{
|
|
"pattern_name": "unstructured-state-management",
|
|
"frequency": 1,
|
|
"severity": "medium",
|
|
"last_occurrence": datetime.now(timezone.utc).isoformat()
|
|
}
|
|
]
|
|
|
|
def _generate_insights(self) -> List[str]:
|
|
"""Generate insights from analysis."""
|
|
return ["Schema validation provides comprehensive error reporting"]
|
|
|
|
def _get_active_recommendations(self) -> List[str]:
|
|
"""Get active recommendations."""
|
|
return ["Implement automated state population", "Add performance monitoring"]
|
|
|
|
def _check_warnings(self) -> List[str]:
|
|
"""Check for system warnings."""
|
|
return []
|
|
|
|
def _find_optimizations(self) -> List[str]:
|
|
"""Find optimization opportunities."""
|
|
return ["Caching layer", "Batch validation", "Performance monitoring"]
|
|
|
|
def _extract_user_preferences(self) -> Dict[str, Any]:
|
|
"""Extract user preferences from configuration."""
|
|
return {
|
|
"communication_style": "detailed",
|
|
"workflow_style": "systematic",
|
|
"documentation_preference": "comprehensive",
|
|
"feedback_style": "supportive",
|
|
"confidence": 85
|
|
}
|
|
|
|
def _check_quality_gates_active(self) -> bool:
|
|
"""Check if quality gates are active."""
|
|
return True
|
|
|
|
def _determine_current_quality_gate(self) -> str:
|
|
"""Determine current quality gate."""
|
|
return "implementation"
|
|
|
|
def _get_last_udtm_timestamp(self) -> str:
|
|
"""Get timestamp of last UDTM analysis."""
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
def _count_pending_reviews(self) -> int:
|
|
"""Count pending brotherhood reviews."""
|
|
return 0
|
|
|
|
def _count_completed_reviews(self) -> int:
|
|
"""Count completed reviews."""
|
|
return 2
|
|
|
|
def _scan_anti_patterns(self) -> List[str]:
|
|
"""Scan for anti-pattern violations."""
|
|
return []
|
|
|
|
def _scan_critical_violations(self) -> List[str]:
|
|
"""Scan for critical violations."""
|
|
return []
|
|
|
|
def _assess_overall_health(self) -> str:
|
|
"""Assess overall system health."""
|
|
return "healthy"
|
|
|
|
def _check_config_files(self) -> str:
|
|
"""Check configuration file status."""
|
|
config_file = self.bmad_agent_path / "ide-bmad-orchestrator.cfg.md"
|
|
return "valid" if config_file.exists() else "missing"
|
|
|
|
def _check_persona_files(self) -> str:
|
|
"""Check persona file status."""
|
|
persona_dir = self.bmad_agent_path / "personas"
|
|
if not persona_dir.exists():
|
|
return "critical-missing"
|
|
|
|
expected_personas = ["bmad.md", "analyst.md", "architect.md", "pm.md", "po.md"]
|
|
missing = [p for p in expected_personas if not (persona_dir / p).exists()]
|
|
|
|
if len(missing) == 0:
|
|
return "all-present"
|
|
elif len(missing) < len(expected_personas) / 2:
|
|
return "some-missing"
|
|
else:
|
|
return "critical-missing"
|
|
|
|
def _check_task_files(self) -> str:
|
|
"""Check task file status."""
|
|
task_dir = self.bmad_agent_path / "tasks"
|
|
if not task_dir.exists():
|
|
return "insufficient"
|
|
|
|
task_count = len(list(task_dir.glob("*.md")))
|
|
if task_count > 20:
|
|
return "complete"
|
|
elif task_count > 10:
|
|
return "partial"
|
|
else:
|
|
return "insufficient"
|
|
|
|
def _count_available_personas(self) -> int:
|
|
"""Count available persona files."""
|
|
persona_dir = self.bmad_agent_path / "personas"
|
|
return len(list(persona_dir.glob("*.md"))) if persona_dir.exists() else 0
|
|
|
|
def _count_available_tasks(self) -> int:
|
|
"""Count available task files."""
|
|
task_dir = self.bmad_agent_path / "tasks"
|
|
return len(list(task_dir.glob("*.md"))) if task_dir.exists() else 0
|
|
|
|
def _find_missing_resources(self) -> List[str]:
|
|
"""Find missing critical resources."""
|
|
missing = []
|
|
|
|
critical_files = [
|
|
"bmad-agent/ide-bmad-orchestrator.cfg.md",
|
|
"bmad-agent/personas/bmad.md",
|
|
"bmad-agent/tasks/quality_gate_validation.md"
|
|
]
|
|
|
|
for file_path in critical_files:
|
|
if not (self.workspace_root / file_path).exists():
|
|
missing.append(file_path)
|
|
|
|
return missing
|
|
|
|
def _get_consultation_history(self) -> List[Dict[str, Any]]:
|
|
"""Get consultation history."""
|
|
return [
|
|
{
|
|
"consultation_id": str(uuid.uuid4()),
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"type": "technical-feasibility",
|
|
"participants": ["architect", "developer"],
|
|
"duration": 25,
|
|
"outcome": "consensus",
|
|
"effectiveness_score": 85
|
|
}
|
|
]
|
|
|
|
def _analyze_effective_pairs(self) -> List[str]:
|
|
"""Analyze most effective persona pairs."""
|
|
return ["architect+developer", "analyst+pm"]
|
|
|
|
def _suggest_next_steps(self) -> List[str]:
|
|
"""Suggest next workflow steps."""
|
|
return ["complete-validation-testing", "implement-automation", "performance-optimization"]
|
|
|
|
def _predict_blockers(self) -> List[str]:
|
|
"""Predict potential blockers."""
|
|
return ["schema-complexity", "performance-concerns"]
|
|
|
|
def _find_workflow_optimizations(self) -> List[str]:
|
|
"""Find workflow optimization opportunities."""
|
|
return ["caching-layer", "batch-validation", "parallel-processing"]
|
|
|
|
def _estimate_completion(self) -> str:
|
|
"""Estimate completion time."""
|
|
return (datetime.now(timezone.utc) +
|
|
timedelta(hours=2)).isoformat()
|
|
|
|
def _get_recent_commands(self) -> List[Dict[str, Any]]:
|
|
"""Get recent command history."""
|
|
return [
|
|
{
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"command": "validate-orchestrator-state",
|
|
"persona": "architect",
|
|
"status": "success",
|
|
"duration": 2,
|
|
"output_summary": "Validation schema created and tested"
|
|
}
|
|
]
|
|
|
|
def _get_recent_insights(self) -> List[Dict[str, Any]]:
|
|
"""Get recent insights."""
|
|
return [
|
|
{
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"insight_type": "optimization",
|
|
"insight": "Automated state population reduces manual overhead",
|
|
"confidence": 90,
|
|
"applied": True,
|
|
"effectiveness": 85
|
|
}
|
|
]
|
|
|
|
def _get_recent_errors(self) -> List[str]:
|
|
"""Get recent errors."""
|
|
return []
|
|
|
|
def _get_critical_errors(self) -> List[str]:
|
|
"""Get critical errors."""
|
|
return []
|
|
|
|
def _get_last_error_timestamp(self) -> str:
|
|
"""Get last error timestamp."""
|
|
return (datetime.now(timezone.utc) -
|
|
timedelta(hours=1)).isoformat()
|
|
|
|
def _calculate_bootstrap_confidence(self) -> int:
|
|
"""Calculate bootstrap analysis confidence."""
|
|
confidence = 70
|
|
if self._has_git_history(): confidence += 10
|
|
if self._has_documentation(): confidence += 10
|
|
if self._has_config_files(): confidence += 10
|
|
return min(confidence, 100)
|
|
|
|
def _extract_all_decisions(self) -> List[str]:
|
|
"""Extract all decisions from various sources."""
|
|
return ["Schema validation approach", "YAML format choice", "Python implementation"]
|
|
|
|
def _identify_all_patterns(self) -> List[str]:
|
|
"""Identify all patterns."""
|
|
return ["Systematic validation", "Memory-enhanced state", "Quality enforcement"]
|
|
|
|
def _infer_all_preferences(self) -> List[str]:
|
|
"""Infer all user preferences."""
|
|
return ["Detailed documentation", "Comprehensive validation", "Systematic approach"]
|
|
|
|
def _find_successful_approaches(self) -> List[str]:
|
|
"""Find successful approaches."""
|
|
return ["Memory-enhanced personas", "Quality gate enforcement", "Schema-driven validation"]
|
|
|
|
def _find_all_anti_patterns(self) -> List[str]:
|
|
"""Find all anti-patterns."""
|
|
return ["Manual state management", "Inconsistent validation", "Unstructured data"]
|
|
|
|
def _find_all_optimizations(self) -> List[str]:
|
|
"""Find all optimization opportunities."""
|
|
return ["Automated state sync", "Performance monitoring", "Caching layer"]
|
|
|
|
def _assess_risk_factors(self) -> List[str]:
|
|
"""Assess risk factors."""
|
|
return ["Schema complexity", "Migration overhead", "Performance impact"]
|
|
|
|
def perform_memory_synchronization(self, state_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Perform comprehensive memory synchronization with orchestrator state."""
|
|
sync_results = {
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"status": "offline",
|
|
"operations_performed": [],
|
|
"memories_synced": 0,
|
|
"patterns_updated": 0,
|
|
"insights_generated": 0,
|
|
"user_preferences_synced": 0,
|
|
"errors": []
|
|
}
|
|
|
|
if not MEMORY_INTEGRATION_AVAILABLE:
|
|
sync_results["status"] = "offline"
|
|
sync_results["errors"].append("Memory integration not available - using fallback storage")
|
|
return sync_results
|
|
|
|
try:
|
|
memory_wrapper = MemoryWrapper()
|
|
|
|
# Perform bidirectional synchronization
|
|
memory_sync_results = memory_wrapper.sync_with_orchestrator_state(state_data)
|
|
|
|
if memory_sync_results["status"] == "success":
|
|
sync_results["memories_synced"] = memory_sync_results.get("memories_synced", 0)
|
|
sync_results["insights_generated"] = memory_sync_results.get("insights_generated", 0)
|
|
sync_results["patterns_updated"] = memory_sync_results.get("patterns_updated", 0)
|
|
|
|
sync_results["operations_performed"].extend([
|
|
f"Synced {sync_results['memories_synced']} memories",
|
|
f"Generated {sync_results['insights_generated']} insights",
|
|
f"Updated {sync_results['patterns_updated']} patterns"
|
|
])
|
|
|
|
# Update memory intelligence state in orchestrator data
|
|
if "memory_intelligence_state" in state_data:
|
|
memory_state = state_data["memory_intelligence_state"]
|
|
memory_state["last_memory_sync"] = datetime.now(timezone.utc).isoformat()
|
|
|
|
# Update proactive intelligence metrics
|
|
if "proactive_intelligence" in memory_state:
|
|
memory_state["proactive_intelligence"]["insights_generated"] = sync_results["insights_generated"]
|
|
memory_state["proactive_intelligence"]["last_update"] = datetime.now(timezone.utc).isoformat()
|
|
|
|
print(f"🔄 Memory sync completed: {sync_results['memories_synced']} memories, {sync_results['insights_generated']} insights")
|
|
|
|
else:
|
|
sync_results["status"] = "error"
|
|
sync_results["errors"].append(memory_sync_results.get("error", "Unknown memory sync error"))
|
|
|
|
except Exception as e:
|
|
sync_results["status"] = "error"
|
|
sync_results["errors"].append(f"Memory synchronization failed: {str(e)}")
|
|
print(f"❌ Memory synchronization error: {e}")
|
|
|
|
return sync_results
|
|
|
|
def generate_state(self) -> Dict[str, Any]:
|
|
"""Generate complete orchestrator state."""
|
|
print("🔄 Generating orchestrator state...")
|
|
|
|
state = {}
|
|
|
|
sections = [
|
|
("session_metadata", self.populate_session_metadata),
|
|
("project_context_discovery", self.populate_project_context_discovery),
|
|
("active_workflow_context", self.populate_active_workflow_context),
|
|
("decision_archaeology", self.populate_decision_archaeology),
|
|
("memory_intelligence_state", self.populate_memory_intelligence_state),
|
|
("quality_framework_integration", self.populate_quality_framework_integration),
|
|
("system_health_monitoring", self.populate_system_health_monitoring),
|
|
("consultation_collaboration", self.populate_consultation_collaboration),
|
|
("session_continuity_data", self.populate_session_continuity_data),
|
|
("recent_activity_log", self.populate_recent_activity_log),
|
|
("bootstrap_analysis_results", self.populate_bootstrap_analysis_results)
|
|
]
|
|
|
|
for section_name, populate_func in sections:
|
|
print(f" 📊 Populating {section_name}...")
|
|
try:
|
|
state[section_name] = populate_func()
|
|
except Exception as e:
|
|
print(f" ❌ Error in {section_name}: {e}")
|
|
state[section_name] = {}
|
|
|
|
return state
|
|
|
|
def populate_full_state(self, output_file: str = ".ai/orchestrator-state.md"):
|
|
"""Populate complete orchestrator state with full analysis and memory sync."""
|
|
print("🎯 Generating Complete BMAD Orchestrator State...")
|
|
print(f"📁 Base path: {self.workspace_root}")
|
|
print(f"📄 Output file: {output_file}")
|
|
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Generate complete state
|
|
state_data = self.generate_state()
|
|
|
|
# Perform memory synchronization if available
|
|
if MEMORY_INTEGRATION_AVAILABLE:
|
|
print("\n🧠 Performing Memory Synchronization...")
|
|
sync_results = self.perform_memory_synchronization(state_data)
|
|
|
|
if sync_results["status"] == "success":
|
|
# Add sync results to recent activity
|
|
if "recent_activity_log" not in state_data:
|
|
state_data["recent_activity_log"] = {}
|
|
|
|
if "memory_operations" not in state_data["recent_activity_log"]:
|
|
state_data["recent_activity_log"]["memory_operations"] = []
|
|
|
|
state_data["recent_activity_log"]["memory_operations"].append({
|
|
"timestamp": sync_results["timestamp"],
|
|
"operation_type": "full-sync",
|
|
"memories_synced": sync_results["memories_synced"],
|
|
"insights_generated": sync_results["insights_generated"],
|
|
"patterns_updated": sync_results["patterns_updated"],
|
|
"status": "success"
|
|
})
|
|
|
|
print(f"✅ Memory sync: {sync_results['memories_synced']} memories, {sync_results['insights_generated']} insights")
|
|
|
|
elif sync_results["status"] == "offline":
|
|
print("⚠️ Memory sync unavailable - continuing without memory integration")
|
|
else:
|
|
print(f"❌ Memory sync failed: {sync_results['errors']}")
|
|
else:
|
|
print("⚠️ Memory integration not available")
|
|
|
|
# Convert to YAML
|
|
yaml_content = yaml.dump(state_data, default_flow_style=False, sort_keys=False, allow_unicode=True)
|
|
|
|
# Generate final content with memory sync status
|
|
memory_sync_status = "enabled" if MEMORY_INTEGRATION_AVAILABLE else "fallback"
|
|
content = f"""# BMAD Orchestrator State (Memory-Enhanced)
|
|
|
|
```yaml
|
|
{yaml_content}```
|
|
|
|
---
|
|
**Auto-Generated**: This state is automatically maintained by the BMAD Memory System
|
|
**Memory Integration**: {memory_sync_status}
|
|
**Last Memory Sync**: {datetime.now(timezone.utc).isoformat()}
|
|
**Next Diagnostic**: {(datetime.now(timezone.utc) + timedelta(minutes=20)).isoformat()}
|
|
**Context Restoration Ready**: true
|
|
"""
|
|
|
|
# Create backup if file exists
|
|
output_path = Path(output_file)
|
|
if output_path.exists():
|
|
backup_path = output_path.with_suffix(f'.backup.{int(time.time())}')
|
|
output_path.rename(backup_path)
|
|
print(f"📦 Created backup: {backup_path}")
|
|
|
|
# Write final state
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
f.write(content)
|
|
|
|
# Performance summary
|
|
total_time = time.time() - start_time
|
|
file_size = Path(output_file).stat().st_size
|
|
|
|
print(f"\n✅ Orchestrator State Generated Successfully")
|
|
print(f"📊 Performance: {file_size:,} bytes in {total_time:.3f}s")
|
|
print(f"💾 Output: {output_file}")
|
|
|
|
if MEMORY_INTEGRATION_AVAILABLE:
|
|
print(f"🧠 Memory integration: Active")
|
|
else:
|
|
print(f"⚠️ Memory integration: Fallback mode")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error generating orchestrator state: {e}")
|
|
raise
|
|
|
|
def main():
|
|
"""Main function with memory integration support."""
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description='BMAD Orchestrator State Population with Memory Integration')
|
|
parser.add_argument('--output-file', default='.ai/orchestrator-state.md',
|
|
help='Output file path (default: .ai/orchestrator-state.md)')
|
|
parser.add_argument('--base-path', default='.',
|
|
help='Base workspace path (default: current directory)')
|
|
parser.add_argument('--full-analysis', action='store_true',
|
|
help='Perform comprehensive analysis with memory sync')
|
|
parser.add_argument('--memory-sync', action='store_true',
|
|
help='Force memory synchronization (if available)')
|
|
parser.add_argument('--diagnose', action='store_true',
|
|
help='Run memory integration diagnostics')
|
|
|
|
args = parser.parse_args()
|
|
|
|
try:
|
|
# Initialize populator
|
|
workspace_root = Path(args.base_path).resolve()
|
|
populator = StatePopulator(PopulationConfig(
|
|
memory_sync_enabled=args.memory_sync,
|
|
full_analysis=args.full_analysis,
|
|
output_file=args.output_file
|
|
))
|
|
|
|
if args.diagnose:
|
|
print("🏥 Running Memory Integration Diagnostics...")
|
|
if MEMORY_INTEGRATION_AVAILABLE:
|
|
memory_wrapper = MemoryWrapper()
|
|
status = memory_wrapper.get_memory_status()
|
|
print(f"Memory Provider: {status['provider']}")
|
|
print(f"Status: {status['status']}")
|
|
print(f"Capabilities: {status['capabilities']}")
|
|
if 'fallback_stats' in status:
|
|
stats = status['fallback_stats']
|
|
print(f"Fallback Storage: {stats['total_memories']} memories")
|
|
else:
|
|
print("❌ Memory integration not available")
|
|
return
|
|
|
|
# Generate state with optional memory sync
|
|
if args.full_analysis or args.memory_sync:
|
|
print("🎯 Full Analysis Mode with Memory Integration")
|
|
populator.populate_full_state(args.output_file)
|
|
else:
|
|
print("🎯 Standard State Generation")
|
|
state_data = populator.generate_state()
|
|
|
|
# Convert to YAML and save
|
|
yaml_content = yaml.dump(state_data, default_flow_style=False, sort_keys=False, allow_unicode=True)
|
|
content = f"""# BMAD Orchestrator State (Memory-Enhanced)
|
|
|
|
```yaml
|
|
{yaml_content}```
|
|
|
|
---
|
|
**Auto-Generated**: This state is automatically maintained by the BMAD Memory System
|
|
**Last Generated**: {datetime.now(timezone.utc).isoformat()}
|
|
**Context Restoration Ready**: true
|
|
"""
|
|
|
|
# Create backup if file exists
|
|
output_path = Path(args.output_file)
|
|
if output_path.exists():
|
|
backup_path = output_path.with_suffix(f'.backup.{int(time.time())}')
|
|
output_path.rename(backup_path)
|
|
print(f"📦 Created backup: {backup_path}")
|
|
|
|
with open(args.output_file, 'w', encoding='utf-8') as f:
|
|
f.write(content)
|
|
|
|
file_size = Path(args.output_file).stat().st_size
|
|
print(f"✅ State generated: {file_size:,} bytes")
|
|
print(f"💾 Output: {args.output_file}")
|
|
|
|
print("\n🎉 Orchestrator state population completed successfully!")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error: {e}")
|
|
raise
|
|
|
|
if __name__ == '__main__':
|
|
main() |