# Universal LLM Interface

## Multi-Provider LLM Abstraction for Enhanced BMAD System

The Universal LLM Interface provides seamless integration with multiple LLM providers, enabling the BMAD system to work with Claude, GPT, Gemini, DeepSeek, Llama, and any future LLM while optimizing for cost, capability, and performance.

## LLM Abstraction Architecture

### Universal LLM Provider Framework
```yaml
llm_provider_architecture:
  core_abstraction:
    universal_interface:
      - standardized_request_format: "Common interface for all LLM interactions"
      - response_normalization: "Unified response structure across providers"
      - capability_detection: "Automatic detection of LLM-specific capabilities"
      - error_handling: "Graceful degradation and fallback mechanisms"
      - cost_tracking: "Real-time cost monitoring and optimization"

  provider_adapters:
    anthropic_claude:
      - api_integration: "Native Claude API integration"
      - tool_use_support: "Advanced tool use capabilities"
      - function_calling: "Native function calling support"
      - streaming_support: "Real-time streaming responses"
      - context_windows: "Large context window optimization"

    openai_gpt:
      - gpt4_integration: "GPT-4 and GPT-4 Turbo support"
      - function_calling: "OpenAI function calling format"
      - vision_capabilities: "GPT-4 Vision integration"
      - code_interpreter: "Code execution capabilities"
      - assistant_api: "OpenAI Assistant API integration"

    google_gemini:
      - gemini_pro_integration: "Gemini Pro and Ultra support"
      - multimodal_capabilities: "Text, image, and video processing"
      - code_execution: "Native code execution environment"
      - safety_filters: "Built-in safety and content filtering"
      - vertex_ai_integration: "Enterprise Vertex AI support"

    deepseek_coder:
      - code_specialization: "Code-focused LLM optimization"
      - repository_understanding: "Large codebase comprehension"
      - code_generation: "Advanced code generation capabilities"
      - technical_reasoning: "Deep technical problem solving"

    meta_llama:
      - open_source_integration: "Llama 2 and Code Llama support"
      - local_deployment: "On-premises deployment support"
      - fine_tuning: "Custom model fine-tuning capabilities"
      - privacy_preservation: "Complete data privacy control"

    custom_providers:
      - plugin_architecture: "Support for custom LLM providers"
      - api_adaptation: "Automatic API format adaptation"
      - capability_mapping: "Custom capability definition"
      - performance_monitoring: "Custom provider performance tracking"
```
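To make the universal interface layer concrete, the following is a minimal sketch of the adapter contract it implies. The names `LLMRequest`, `LLMResponse`, and `BaseProviderAdapter` are illustrative assumptions, not identifiers from the BMAD codebase.

```python
# Minimal sketch of the universal adapter contract; class and field names are
# assumptions chosen for illustration, not BMAD internals.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class LLMRequest:
    """Standardized request format shared by every provider adapter."""
    task_type: str                                  # e.g. "code_generation", "reasoning"
    messages: List[Dict[str, str]]                  # provider-agnostic chat messages
    max_tokens: int = 4000
    tools: Optional[List[Dict[str, Any]]] = None    # optional tool/function schemas


@dataclass
class LLMResponse:
    """Normalized response structure returned by every adapter."""
    text: str
    provider: str
    model: str
    tokens_used: int
    raw: Any = None                                 # original provider payload, if needed
    metadata: Dict[str, Any] = field(default_factory=dict)


class BaseProviderAdapter(ABC):
    """Contract every provider adapter (Claude, GPT, Gemini, custom plugins) implements."""

    @abstractmethod
    async def execute_task(self, request: LLMRequest) -> LLMResponse:
        """Run a task and return a normalized response."""

    @abstractmethod
    async def detect_capabilities(self) -> Dict[str, bool]:
        """Report which features (tool use, streaming, vision, ...) are supported."""
```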
### LLM Capability Detection and Routing
```python
from datetime import datetime


async def detect_llm_capabilities(provider_name, model_name):
    """
    Automatically detect and catalog LLM capabilities for intelligent routing.
    """
    capability_detection = {
        'provider': provider_name,
        'model': model_name,
        'capabilities': {},
        'performance_metrics': {},
        'cost_metrics': {},
        'limitations': {}
    }

    # Test core capabilities
    core_capabilities = await test_core_llm_capabilities(provider_name, model_name)

    # Test specialized capabilities
    specialized_capabilities = {
        'code_generation': await test_code_generation_capability(provider_name, model_name),
        'code_analysis': await test_code_analysis_capability(provider_name, model_name),
        'function_calling': await test_function_calling_capability(provider_name, model_name),
        'tool_use': await test_tool_use_capability(provider_name, model_name),
        'multimodal': await test_multimodal_capability(provider_name, model_name),
        'reasoning': await test_reasoning_capability(provider_name, model_name),
        'context_handling': await test_context_handling_capability(provider_name, model_name),
        'streaming': await test_streaming_capability(provider_name, model_name)
    }

    # Performance benchmarking
    performance_metrics = await benchmark_llm_performance(provider_name, model_name)

    # Cost analysis
    cost_metrics = await analyze_llm_costs(provider_name, model_name)

    capability_detection.update({
        'capabilities': {**core_capabilities, **specialized_capabilities},
        'performance_metrics': performance_metrics,
        'cost_metrics': cost_metrics,
        'detection_timestamp': datetime.utcnow().isoformat(),
        'confidence_score': calculate_capability_confidence(core_capabilities, specialized_capabilities)
    })

    return capability_detection
```
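For concreteness, a single capability probe might look like the sketch below. This is a hedged illustration: `test_function_calling_capability` is only named above, and the probe prompt, timeout, adapter registry, and dict-shaped adapter response assumed here are not specified by the document.

```python
# Hedged sketch of one capability probe; the registry, probe tool, timeout,
# and response shape are assumptions for illustration.
import asyncio


async def test_function_calling_capability(provider_name, model_name, registry=None):
    """Send a trivial tool-calling request and report whether the model used the tool."""
    # model_name is kept for signature parity with the calls above; the registered
    # adapter is assumed to already be bound to a model.
    probe_tool = {
        "name": "echo",
        "description": "Echo the provided text back to the caller",
        "parameters": {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        },
    }

    adapter = (registry or {}).get(provider_name)
    if adapter is None:
        return {"supported": False, "reason": "provider not registered"}

    try:
        response = await asyncio.wait_for(
            adapter.execute_task({
                "type": "tool_use",
                "messages": [{"role": "user", "content": "Call the echo tool with the text 'ping'."}],
                "tools": [probe_tool],
            }),
            timeout=30,
        )
    except Exception as exc:
        return {"supported": False, "reason": str(exc)}

    # The adapter is assumed to surface tool calls in its normalized dict response,
    # as the Claude and GPT adapters below do.
    used_tool = bool(response.get("tool_uses") or response.get("function_calls"))
    return {"supported": used_tool, "raw": response}
```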
```python
async def intelligent_llm_routing(task_requirements, available_providers):
    """
    Intelligently route tasks to optimal LLM based on capabilities, cost, and performance.
    """
    routing_analysis = {
        'task_requirements': task_requirements,
        'candidate_providers': [],
        'routing_decision': {},
        'fallback_options': [],
        'cost_optimization': {}
    }

    # Analyze task requirements
    task_analysis = await analyze_task_requirements(task_requirements)

    # Score each available provider
    for provider in available_providers:
        provider_score = await score_provider_for_task(provider, task_analysis)

        routing_candidate = {
            'provider': provider,
            'capability_match': provider_score['capability_match'],
            'performance_score': provider_score['performance_score'],
            'cost_efficiency': provider_score['cost_efficiency'],
            'reliability_score': provider_score['reliability_score'],
            'overall_score': calculate_overall_provider_score(provider_score)
        }
        routing_analysis['candidate_providers'].append(routing_candidate)

    # Select optimal provider
    optimal_provider = select_optimal_provider(routing_analysis['candidate_providers'])

    # Define fallback strategy
    fallback_providers = define_fallback_strategy(
        routing_analysis['candidate_providers'],
        optimal_provider
    )

    routing_analysis.update({
        'routing_decision': optimal_provider,
        'fallback_options': fallback_providers,
        'cost_optimization': calculate_cost_optimization(optimal_provider, task_analysis)
    })

    return routing_analysis
```
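The routing decision hinges on `calculate_overall_provider_score` and `select_optimal_provider`, which are referenced above but not defined. A minimal sketch, assuming a simple weighted sum with illustrative weights (the weights are not values prescribed by BMAD):

```python
# Minimal sketch of the scoring helpers; weights are assumptions chosen for
# illustration and can be tuned per deployment.
DEFAULT_WEIGHTS = {
    'capability_match': 0.40,
    'performance_score': 0.25,
    'cost_efficiency': 0.20,
    'reliability_score': 0.15,
}


def calculate_overall_provider_score(provider_score, weights=None):
    """Combine per-dimension scores (each in [0, 1]) into a single ranking value."""
    weights = weights or DEFAULT_WEIGHTS
    return sum(weights[dim] * provider_score.get(dim, 0.0) for dim in weights)


def select_optimal_provider(candidates):
    """Pick the candidate with the highest overall score."""
    return max(candidates, key=lambda candidate: candidate['overall_score'])
```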
```python
from datetime import datetime


class UniversalLLMInterface:
    """
    Universal interface for interacting with multiple LLM providers.
    """

    def __init__(self):
        self.providers = {}
        self.capability_cache = {}
        self.cost_tracker = CostTracker()
        self.performance_monitor = PerformanceMonitor()

    async def register_provider(self, provider_name, provider_config):
        """Register a new LLM provider."""
        provider_adapter = await create_provider_adapter(provider_name, provider_config)

        # Test provider connectivity
        connectivity_test = await test_provider_connectivity(provider_adapter)

        if connectivity_test.success:
            self.providers[provider_name] = provider_adapter

            # Detect and cache capabilities
            capabilities = await detect_llm_capabilities(
                provider_name,
                provider_config.get('model', 'default')
            )
            self.capability_cache[provider_name] = capabilities

            return {
                'registration_status': 'success',
                'provider': provider_name,
                'capabilities': capabilities,
                'ready_for_use': True
            }
        else:
            return {
                'registration_status': 'failed',
                'provider': provider_name,
                'error': connectivity_test.error,
                'ready_for_use': False
            }

    async def execute_task(self, task_definition, routing_preferences=None):
        """
        Execute a task using the optimal LLM provider.
        """
        # Determine optimal provider
        routing_decision = await intelligent_llm_routing(
            task_definition,
            list(self.providers.keys())
        )
        optimal_provider = routing_decision['routing_decision']['provider']

        # Execute task with monitoring
        execution_start = datetime.utcnow()

        try:
            # Execute with primary provider
            result = await self.providers[optimal_provider].execute_task(task_definition)
            execution_duration = (datetime.utcnow() - execution_start).total_seconds()

            # Track performance and costs
            await self.performance_monitor.record_execution(
                optimal_provider,
                task_definition,
                result,
                execution_duration
            )
            await self.cost_tracker.record_usage(
                optimal_provider,
                task_definition,
                result
            )

            return {
                'result': result,
                'provider_used': optimal_provider,
                'execution_time': execution_duration,
                'routing_analysis': routing_decision,
                'status': 'success'
            }

        except Exception as e:
            # Try fallback providers in routing order
            for fallback_provider in routing_decision['fallback_options']:
                try:
                    fallback_result = await self.providers[fallback_provider['provider']].execute_task(
                        task_definition
                    )
                    execution_duration = (datetime.utcnow() - execution_start).total_seconds()

                    return {
                        'result': fallback_result,
                        'provider_used': fallback_provider['provider'],
                        'execution_time': execution_duration,
                        'primary_provider_failed': optimal_provider,
                        'fallback_used': True,
                        'status': 'success_with_fallback'
                    }
                except Exception:
                    continue

            # All providers failed
            return {
                'status': 'failed',
                'primary_provider': optimal_provider,
                'primary_error': str(e),
                'fallback_attempts': len(routing_decision['fallback_options']),
                'execution_time': (datetime.utcnow() - execution_start).total_seconds()
            }
```
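A hedged usage sketch of the interface above: the provider configs and task payload are placeholders, the credentials are elided, and `CostTracker`, `PerformanceMonitor`, and the module-level helpers are assumed to be available BMAD-internal implementations.

```python
# Illustrative usage only; API keys, model names, and the task payload are
# placeholders, and the helper classes are assumed to exist in the BMAD codebase.
import asyncio


async def main():
    llm = UniversalLLMInterface()

    await llm.register_provider('anthropic', {
        'api_key': 'sk-...',                       # elided credential
        'model': 'claude-3-sonnet-20240229',
    })
    await llm.register_provider('openai', {
        'api_key': 'sk-...',
        'model': 'gpt-4-turbo-preview',
    })

    outcome = await llm.execute_task({
        'type': 'code_generation',
        'messages': [{'role': 'user', 'content': 'Write a function that parses ISO-8601 dates.'}],
    })
    print(outcome['provider_used'], outcome['status'], outcome['execution_time'])


if __name__ == '__main__':
    asyncio.run(main())
```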
## Provider-Specific Adapters

### Claude Adapter Implementation
```python
import anthropic


class ClaudeAdapter:
    """
    Adapter for Anthropic Claude API integration.
    """

    def __init__(self, config):
        self.config = config
        # Async client so that message creation can be awaited below.
        self.client = anthropic.AsyncAnthropic(api_key=config['api_key'])
        self.model = config.get('model', 'claude-3-sonnet-20240229')

    async def execute_task(self, task_definition):
        """Execute task using the Claude API."""
        # Convert universal task format to Claude format
        claude_request = await self.convert_to_claude_format(task_definition)

        # Handle different task types
        if task_definition['type'] == 'code_analysis':
            return await self.execute_code_analysis(claude_request)
        elif task_definition['type'] == 'code_generation':
            return await self.execute_code_generation(claude_request)
        elif task_definition['type'] == 'reasoning':
            return await self.execute_reasoning_task(claude_request)
        elif task_definition['type'] == 'tool_use':
            return await self.execute_tool_use_task(claude_request)
        else:
            return await self.execute_general_task(claude_request)

    async def execute_tool_use_task(self, claude_request):
        """Execute task with Claude tool use capabilities."""
        # Define available tools for Claude
        tools = [
            {
                "name": "code_analyzer",
                "description": "Analyze code structure and patterns",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string"},
                        "language": {"type": "string"},
                        "analysis_type": {"type": "string"}
                    }
                }
            },
            {
                "name": "file_navigator",
                "description": "Navigate and understand file structures",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string"},
                        "operation": {"type": "string"}
                    }
                }
            }
        ]

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=4000,
            tools=tools,
            messages=claude_request['messages']
        )

        # Handle tool use responses
        if response.stop_reason == "tool_use":
            tool_use_blocks = [block for block in response.content if block.type == "tool_use"]
            tool_results = [
                await self.execute_tool(block.name, block.input) for block in tool_use_blocks
            ]

            # Continue the conversation with the tool results
            follow_up_response = await self.client.messages.create(
                model=self.model,
                max_tokens=4000,
                tools=tools,
                messages=[
                    *claude_request['messages'],
                    {"role": "assistant", "content": response.content},
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "tool_result",
                                "tool_use_id": block.id,
                                "content": str(result)
                            }
                            for block, result in zip(tool_use_blocks, tool_results)
                        ]
                    }
                ]
            )

            return {
                'response': follow_up_response.content[0].text,
                'tool_uses': tool_results,
                'tokens_used': (
                    response.usage.input_tokens + response.usage.output_tokens
                    + follow_up_response.usage.input_tokens + follow_up_response.usage.output_tokens
                )
            }

        return {
            'response': response.content[0].text,
            'tokens_used': response.usage.input_tokens + response.usage.output_tokens
        }
```
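`convert_to_claude_format` is referenced above but not shown. A minimal sketch of one possible `ClaudeAdapter` method body, assuming the universal task definition carries provider-agnostic `messages`, an optional `system` prompt, and optional `context` (all three field names are assumptions about the BMAD task schema):

```python
# Hedged sketch of the format conversion for ClaudeAdapter; the universal task
# fields used here ('messages', 'system', 'context') are assumptions.
async def convert_to_claude_format(self, task_definition):
    """Map a universal task definition onto the Claude Messages API shape."""
    messages = []
    for message in task_definition.get('messages', []):
        # Claude accepts only 'user' and 'assistant' roles in the messages list;
        # system instructions travel in a separate top-level field.
        if message['role'] in ('user', 'assistant'):
            messages.append({'role': message['role'], 'content': message['content']})

    claude_request = {'messages': messages}
    if task_definition.get('system'):
        claude_request['system'] = task_definition['system']
    if task_definition.get('context'):
        # Prepend supplementary context as the first user turn.
        claude_request['messages'].insert(
            0, {'role': 'user', 'content': task_definition['context']}
        )
    return claude_request
```

### GPT Adapter Implementation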
```python
import json

import openai


class GPTAdapter:
    """
    Adapter for OpenAI GPT API integration.
    """

    def __init__(self, config):
        self.config = config
        # Async client so that chat completions can be awaited below.
        self.client = openai.AsyncOpenAI(api_key=config['api_key'])
        self.model = config.get('model', 'gpt-4-turbo-preview')

    async def execute_task(self, task_definition):
        """Execute task using the OpenAI GPT API."""
        # Convert universal task format to OpenAI format
        openai_request = await self.convert_to_openai_format(task_definition)

        # Handle function calling for tool use
        if task_definition['type'] == 'tool_use':
            return await self.execute_function_calling_task(openai_request)
        else:
            return await self.execute_chat_completion(openai_request)

    async def execute_function_calling_task(self, openai_request):
        """Execute task with OpenAI function calling."""
        functions = [
            {
                "name": "analyze_code",
                "description": "Analyze code structure and identify patterns",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string", "description": "The code to analyze"},
                        "language": {"type": "string", "description": "Programming language"},
                        "focus": {"type": "string", "description": "Analysis focus area"}
                    },
                    "required": ["code", "language"]
                }
            }
        ]

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=openai_request['messages'],
            functions=functions,
            function_call="auto"
        )

        # Handle function calls
        message = response.choices[0].message
        if message.function_call:
            function_name = message.function_call.name
            function_args = json.loads(message.function_call.arguments)
            function_result = await self.execute_function(function_name, function_args)

            # Continue the conversation with the function result
            follow_up_response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    *openai_request['messages'],
                    {
                        "role": "assistant",
                        "content": None,
                        "function_call": {
                            "name": function_name,
                            "arguments": message.function_call.arguments
                        }
                    },
                    {
                        "role": "function",
                        "name": function_name,
                        "content": str(function_result)
                    }
                ]
            )

            return {
                'response': follow_up_response.choices[0].message.content,
                'function_calls': [{
                    'name': function_name,
                    'arguments': function_args,
                    'result': function_result
                }],
                'tokens_used': response.usage.total_tokens + follow_up_response.usage.total_tokens
            }

        return {
            'response': response.choices[0].message.content,
            'tokens_used': response.usage.total_tokens
        }
```
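Note that OpenAI has deprecated the `functions`/`function_call` parameters in favor of `tools`/`tool_choice`. A hedged sketch of the same flow on the current tools API, written as a subclass of the adapter above (the tool schema mirrors `analyze_code`, and `self.execute_function` is assumed as before):

```python
# Sketch of a tools-API variant; `json` is imported in the adapter module above,
# and the tool schema and execute_function helper follow the GPTAdapter example.
class GPTToolsAdapter(GPTAdapter):
    """Variant of GPTAdapter that uses the current tools API instead of functions."""

    async def execute_function_calling_task(self, openai_request):
        tools = [{
            "type": "function",
            "function": {
                "name": "analyze_code",
                "description": "Analyze code structure and identify patterns",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string"},
                        "language": {"type": "string"},
                        "focus": {"type": "string"}
                    },
                    "required": ["code", "language"]
                }
            }
        }]

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=openai_request['messages'],
            tools=tools,
            tool_choice="auto"
        )

        message = response.choices[0].message
        if not message.tool_calls:
            return {'response': message.content, 'tokens_used': response.usage.total_tokens}

        # Execute every requested tool call and feed the results back as 'tool' turns.
        tool_messages = []
        for tool_call in message.tool_calls:
            args = json.loads(tool_call.function.arguments)
            result = await self.execute_function(tool_call.function.name, args)
            tool_messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(result)
            })

        follow_up_response = await self.client.chat.completions.create(
            model=self.model,
            messages=[*openai_request['messages'], message, *tool_messages]
        )
        return {
            'response': follow_up_response.choices[0].message.content,
            'tokens_used': response.usage.total_tokens + follow_up_response.usage.total_tokens
        }
```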
## Cost Optimization Engine

### Intelligent Cost Management
```python
class CostOptimizationEngine:
    """
    Intelligent cost optimization for multi-LLM usage.
    """

    def __init__(self):
        self.cost_models = {}
        self.usage_history = []
        self.budget_limits = {}
        self.cost_alerts = []

    async def optimize_llm_selection(self, task_requirements, available_providers):
        """
        Select LLM based on cost efficiency while maintaining quality.
        """
        optimization_analysis = {
            'task_requirements': task_requirements,
            'cost_analysis': {},
            'quality_predictions': {},
            'optimization_recommendation': {}
        }

        # Estimate costs and predict quality for each provider
        for provider in available_providers:
            cost_estimate = await self.estimate_task_cost(task_requirements, provider)
            quality_prediction = await self.predict_task_quality(task_requirements, provider)

            optimization_analysis['cost_analysis'][provider] = cost_estimate
            optimization_analysis['quality_predictions'][provider] = quality_prediction

        # Calculate cost-quality efficiency
        efficiency_scores = {}
        for provider in available_providers:
            cost = optimization_analysis['cost_analysis'][provider]['estimated_cost']
            quality = optimization_analysis['quality_predictions'][provider]['quality_score']

            # Higher quality per dollar is better
            efficiency_scores[provider] = quality / cost if cost > 0 else 0

        # Select most efficient provider
        optimal_provider = max(efficiency_scores.items(), key=lambda x: x[1])

        optimization_analysis['optimization_recommendation'] = {
            'recommended_provider': optimal_provider[0],
            'efficiency_score': optimal_provider[1],
            'cost_savings': calculate_cost_savings(optimization_analysis),
            'quality_impact': assess_quality_impact(optimization_analysis, optimal_provider[0])
        }

        return optimization_analysis

    async def monitor_budget_usage(self):
        """
        Monitor and alert on budget usage across all LLM providers.
        """
        budget_status = {}

        for provider, budget_limit in self.budget_limits.items():
            current_usage = await self.calculate_current_usage(provider)

            budget_status[provider] = {
                'budget_limit': budget_limit,
                'current_usage': current_usage,
                'remaining_budget': budget_limit - current_usage,
                'usage_percentage': (current_usage / budget_limit) * 100,
                'projected_monthly_usage': await self.project_monthly_usage(provider)
            }

            # Generate alerts for high usage
            if budget_status[provider]['usage_percentage'] > 80:
                alert = {
                    'provider': provider,
                    'alert_type': 'budget_warning',
                    'usage_percentage': budget_status[provider]['usage_percentage'],
                    'projected_overage': budget_status[provider]['projected_monthly_usage'] - budget_limit,
                    'recommended_actions': await self.generate_cost_reduction_recommendations(provider)
                }
                self.cost_alerts.append(alert)

        return {
            'budget_status': budget_status,
            'alerts': self.cost_alerts,
            'optimization_recommendations': await self.generate_optimization_recommendations(budget_status)
        }
```
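To make the quality-per-dollar ranking concrete, here is a small worked example. All costs and quality scores below are invented placeholders, not benchmark results for any real provider.

```python
# Worked example of the efficiency ranking; all figures are illustrative.
cost_analysis = {
    'anthropic':  {'estimated_cost': 0.045},   # hypothetical USD per task
    'openai':     {'estimated_cost': 0.060},
    'meta_llama': {'estimated_cost': 0.010},
}
quality_predictions = {
    'anthropic':  {'quality_score': 0.90},
    'openai':     {'quality_score': 0.88},
    'meta_llama': {'quality_score': 0.70},
}

efficiency_scores = {
    provider: quality_predictions[provider]['quality_score'] / cost_analysis[provider]['estimated_cost']
    for provider in cost_analysis
}
# anthropic: 20.0, openai: ~14.7, meta_llama: 70.0 quality points per dollar.
# A purely cost-driven policy would pick meta_llama; a quality threshold such as
# `bmad llm optimize --quality-threshold "0.8"` would exclude it and select anthropic.
print(max(efficiency_scores.items(), key=lambda item: item[1]))
```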
## LLM Integration Commands
```bash
# LLM provider management
bmad llm register --provider "anthropic" --model "claude-3-sonnet" --api-key "sk-..."
bmad llm register --provider "openai" --model "gpt-4-turbo" --api-key "sk-..."
bmad llm register --provider "google" --model "gemini-pro" --credentials "path/to/creds.json"

# LLM capability testing and optimization
bmad llm test-capabilities --provider "all" --benchmark-performance
bmad llm optimize --cost-efficiency --quality-threshold "0.8"
bmad llm route --task "code-generation" --show-reasoning

# Cost management and monitoring
bmad llm costs --analyze --time-period "last-month"
bmad llm budget --set-limit "anthropic:1000" "openai:500"
bmad llm optimize-costs --aggressive --maintain-quality

# LLM performance monitoring
bmad llm monitor --real-time --performance-alerts
bmad llm benchmark --compare-providers --task-types "code,reasoning,analysis"
bmad llm health --check-all-providers --connectivity-test
```
This Universal LLM Interface creates a provider-agnostic system that intelligently routes each task to the most suitable LLM while balancing cost, performance, and quality. The system learns from usage patterns to continuously improve its routing decisions and cost efficiency.