Skip to content

Integration

AutoBotSolutions edited this page Apr 27, 2026 · 1 revision

This guide covers integrating the Cognitive Engine with other systems and applications.

Table of Contents


Integration Overview

The Cognitive Engine can be integrated with various systems through multiple interfaces:

  • REST API: HTTP-based API for web applications
  • WebSocket: Real-time bidirectional communication
  • Python Library: Direct Python imports
  • CLI: Command-line interface for scripts
  • Database: Direct database access
  • Message Queues: Asynchronous processing

Integration Patterns

  • Synchronous: Direct request/response
  • Asynchronous: Background processing
  • Event-Driven: React to cognitive events
  • Batch Processing: Process multiple queries
  • Streaming: Real-time cognitive telemetry

REST API Integration

Basic FastAPI Setup

Create a REST API wrapper:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from core.engine import CognitiveEngine
import uvicorn

app = FastAPI(title="Cognitive Engine API")
engine = CognitiveEngine()

class QueryRequest(BaseModel):
    query: str
    min_iterations: int = 3
    max_iterations: int = 50
    confidence_threshold: float = 0.7

class QueryResponse(BaseModel):
    answer: str
    confidence: float
    iterations: int
    reasoning_trace: str = None

@app.post("/query", response_model=QueryResponse)
async def process_query(request: QueryRequest):
    """Process a query through the Cognitive Engine."""
    try:
        # Configure engine for this request
        from core.config import Config
        config = Config(
            min_iterations=request.min_iterations,
            max_iterations=request.max_iterations,
            confidence_threshold=request.confidence_threshold
        )
        
        # Process query
        result = engine.process(request.query)
        
        return QueryResponse(
            answer=result.final_output,
            confidence=result.confidence,
            iterations=result.iteration_count,
            reasoning_trace=result.reasoning_trace
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Authentication

Add authentication to your API:

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    if token != os.environ.get('API_TOKEN'):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid authentication token"
        )
    return token

@app.post("/query")
async def process_query(
    request: QueryRequest,
    token: str = Depends(verify_token)
):
    # Protected endpoint
    pass

Rate Limiting

Add rate limiting:

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from fastapi import Request

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/query")
@limiter.limit("10/minute")
async def process_query(request: QueryRequest):
    # Rate limited endpoint
    pass

Docker Deployment

Deploy the API with Docker:

FROM python:3.10-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]

WebSocket Integration

Real-Time Cognitive Events

Stream cognitive events via WebSocket:

from fastapi import WebSocket, WebSocketDisconnect
from dashboard.stream import dashboard_streamer

@app.websocket("/ws/cognitive")
async def cognitive_websocket(websocket: WebSocket):
    """WebSocket endpoint for real-time cognitive events."""
    await websocket.accept()
    
    try:
        # Subscribe to dashboard stream
        dashboard_streamer.subscribe(websocket)
        
        while True:
            # Keep connection alive
            await websocket.receive_text()
            
    except WebSocketDisconnect:
        dashboard_streamer.unsubscribe(websocket)

Client-Side JavaScript

Connect from JavaScript:

const ws = new WebSocket('ws://localhost:8000/ws/cognitive');

ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    
    switch(data.type) {
        case 'thought_generated':
            displayThought(data.thought);
            break;
        case 'memory_updated':
            updateMemoryDisplay(data.memory);
            break;
        case 'deliberation_complete':
            showResult(data.result);
            break;
    }
};

function displayThought(thought) {
    // Update UI with new thought
}

function updateMemoryDisplay(memory) {
    // Update memory visualization
}

function showResult(result) {
    // Display final result
}

Python Library Integration

Direct Import

Use the engine directly in Python:

from core.engine import CognitiveEngine

# Initialize engine
engine = CognitiveEngine()

# Process query
result = engine.process("What is AI?")

# Access results
print(result.final_output)
print(result.confidence)
print(result.iteration_count)

Custom Configuration

Configure the engine programmatically:

from core.config import Config
from core.engine import CognitiveEngine

config = Config(
    min_iterations=5,
    max_iterations=30,
    confidence_threshold=0.8,
    enable_dashboard=False
)

engine = CognitiveEngine(config)

Batch Processing

Process multiple queries:

def process_batch(queries):
    """Process multiple queries efficiently."""
    results = []
    for query in queries:
        result = engine.process(query)
        results.append({
            'query': query,
            'answer': result.final_output,
            'confidence': result.confidence
        })
    return results

queries = [
    "What is Python?",
    "What is JavaScript?",
    "What is Rust?"
]
results = process_batch(queries)

Async Processing

Use async for concurrent processing:

import asyncio

async def process_async(query):
    """Process query asynchronously."""
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, engine.process, query)
    return result

async def process_batch_async(queries):
    """Process multiple queries concurrently."""
    tasks = [process_async(q) for q in queries]
    results = await asyncio.gather(*tasks)
    return results

Web Application Integration

Flask Integration

Integrate with Flask:

from flask import Flask, request, jsonify
from core.engine import CognitiveEngine

app = Flask(__name__)
engine = CognitiveEngine()

@app.route('/query', methods=['POST'])
def query():
    data = request.json
    result = engine.process(data['query'])
    return jsonify({
        'answer': result.final_output,
        'confidence': result.confidence
    })

if __name__ == '__main__':
    app.run(debug=True)

Django Integration

Integrate with Django:

# views.py
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from core.engine import CognitiveEngine

engine = CognitiveEngine()

@csrf_exempt
def query_view(request):
    if request.method == 'POST':
        data = json.loads(request.body)
        result = engine.process(data['query'])
        return JsonResponse({
            'answer': result.final_output,
            'confidence': result.confidence
        })

React Integration

Integrate with React:

// api.js
export async function queryCognitiveEngine(query) {
    const response = await fetch('/api/query', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
        },
        body: JSON.stringify({ query }),
    });
    return response.json();
}

// Component.js
import { useState } from 'react';
import { queryCognitiveEngine } from './api';

function CognitiveQuery() {
    const [query, setQuery] = useState('');
    const [result, setResult] = useState(null);
    
    const handleSubmit = async (e) => {
        e.preventDefault();
        const response = await queryCognitiveEngine(query);
        setResult(response);
    };
    
    return (
        <form onSubmit={handleSubmit}>
            <input 
                value={query} 
                onChange={(e) => setQuery(e.target.value)}
                placeholder="Enter your query"
            />
            <button type="submit">Submit</button>
            {result && (
                <div>
                    <p>{result.answer}</p>
                    <p>Confidence: {result.confidence}</p>
                </div>
            )}
        </form>
    );
}

CLI Integration

Command-Line Tool

Create a CLI tool:

#!/usr/bin/env python3
import argparse
from core.engine import CognitiveEngine

def main():
    parser = argparse.ArgumentParser(description='Cognitive Engine CLI')
    parser.add_argument('query', help='Query to process')
    parser.add_argument('--iterations', type=int, default=10,
                       help='Maximum iterations')
    parser.add_argument('--confidence', type=float, default=0.7,
                       help='Confidence threshold')
    parser.add_argument('--output', help='Output file')
    
    args = parser.parse_args()
    
    # Configure engine
    from core.config import Config
    config = Config(
        max_iterations=args.iterations,
        confidence_threshold=args.confidence
    )
    
    engine = CognitiveEngine(config)
    result = engine.process(args.query)
    
    output = f"Answer: {result.final_output}\n"
    output += f"Confidence: {result.confidence}\n"
    output += f"Iterations: {result.iteration_count}\n"
    
    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
    else:
        print(output)

if __name__ == '__main__':
    main()

Shell Script Integration

Use from shell scripts:

#!/bin/bash
QUERY="$1"
OUTPUT="$2"

python3 cognitive_cli.py "$QUERY" --output "$OUTPUT"

if [ $? -eq 0 ]; then
    echo "Query processed successfully"
    cat "$OUTPUT"
else
    echo "Query processing failed"
    exit 1
fi

Database Integration

Direct Database Access

Access memory database directly:

import sqlite3

def get_memory_stats():
    """Get statistics from memory database."""
    conn = sqlite3.connect('cognitive_engine.db')
    cursor = conn.cursor()
    
    # Get episodic memory count
    cursor.execute("SELECT COUNT(*) FROM episodic_memory")
    episodic_count = cursor.fetchone()[0]
    
    # Get pattern memory count
    cursor.execute("SELECT COUNT(*) FROM pattern_memory")
    pattern_count = cursor.fetchone()[0]
    
    # Get rule memory count
    cursor.execute("SELECT COUNT(*) FROM rule_memory")
    rule_count = cursor.fetchone()[0]
    
    conn.close()
    
    return {
        'episodic': episodic_count,
        'patterns': pattern_count,
        'rules': rule_count
    }

PostgreSQL Integration

Use PostgreSQL for production:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Configure PostgreSQL
DATABASE_URL = "postgresql://user:pass@localhost/cognitive_engine"
engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)

def store_result(result):
    """Store processing result in PostgreSQL."""
    session = Session()
    
    from models import ProcessResult
    db_result = ProcessResult(
        query=result.query,
        answer=result.final_output,
        confidence=result.confidence,
        iterations=result.iteration_count,
        timestamp=datetime.now()
    )
    
    session.add(db_result)
    session.commit()
    session.close()

Message Queue Integration

RabbitMQ Integration

Use RabbitMQ for async processing:

import pika
from core.engine import CognitiveEngine

def process_query(ch, method, properties, body):
    """Process query from message queue."""
    query = body.decode('utf-8')
    
    engine = CognitiveEngine()
    result = engine.process(query)
    
    # Send response
    ch.basic_publish(
        exchange='',
        routing_key=properties.reply_to,
        properties=pika.BasicProperties(correlation_id=properties.correlation_id),
        body=str(result.final_output)
    )
    
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Setup connection
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='cognitive_queries')
channel.basic_consume(queue='cognitive_queries', on_message_callback=process_query)

print('Waiting for messages...')
channel.start_consuming()

Celery Integration

Use Celery for distributed task processing:

# tasks.py
from celery import Celery
from core.engine import CognitiveEngine

app = Celery('cognitive_tasks', broker='redis://localhost:6379')

@app.task
def process_query_task(query):
    """Celery task for processing queries."""
    engine = CognitiveEngine()
    result = engine.process(query)
    return {
        'answer': result.final_output,
        'confidence': result.confidence,
        'iterations': result.iteration_count
    }

# Usage
from tasks import process_query_task
result = process_query_task.delay("What is AI?")

Cloud Service Integration

AWS Lambda

Deploy as AWS Lambda function:

import json
from core.engine import CognitiveEngine

def lambda_handler(event, context):
    """AWS Lambda handler."""
    query = event.get('query')
    
    engine = CognitiveEngine()
    result = engine.process(query)
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'answer': result.final_output,
            'confidence': result.confidence
        })
    }

Google Cloud Functions

Deploy as Google Cloud Function:

import functions_framework
from core.engine import CognitiveEngine

@functions_framework.http
def process_query(request):
    """HTTP Cloud Function."""
    request_json = request.get_json(silent=True)
    query = request_json.get('query')
    
    engine = CognitiveEngine()
    result = engine.process(query)
    
    return {
        'answer': result.final_output,
        'confidence': result.confidence
    }

Azure Functions

Deploy as Azure Function:

import azure.functions as func
from core.engine import CognitiveEngine

def main(req: func.HttpRequest) -> func.HttpResponse:
    """Azure Function handler."""
    query = req.params.get('query')
    
    engine = CognitiveEngine()
    result = engine.process(query)
    
    return func.HttpResponse(
        json.dumps({
            'answer': result.final_output,
            'confidence': result.confidence
        }),
        status_code=200
    )

Custom Tool Development

Creating Custom Tools

Add custom tools for agent mode:

from tools.registry import Tool, ToolRegistry

class CustomTool(Tool):
    """Custom tool example."""
    
    def __init__(self):
        super().__init__(
            name="custom_tool",
            description="Description of what this tool does"
        )
    
    def execute(self, params):
        """Execute the tool."""
        # Your custom logic here
        result = self.perform_operation(params)
        return {
            'success': True,
            'data': result,
            'execution_time': 0.5
        }
    
    def perform_operation(self, params):
        """Implement your operation."""
        # Your implementation
        return "result"

# Register the tool
tool_registry = ToolRegistry()
tool_registry.register_tool(CustomTool())

Tool with API Integration

Create a tool that calls external APIs:

import requests

class WeatherTool(Tool):
    """Tool for getting weather information."""
    
    def __init__(self, api_key):
        super().__init__(
            name="weather",
            description="Get current weather for a location"
        )
        self.api_key = api_key
    
    def execute(self, params):
        """Get weather for location."""
        location = params.get('location')
        
        url = f"http://api.weatherapi.com/v1/current.json?key={self.api_key}&q={location}"
        response = requests.get(url)
        data = response.json()
        
        return {
            'success': True,
            'data': data,
            'location': location
        }

Database Tool

Create a tool for database operations:

import sqlite3

class DatabaseQueryTool(Tool):
    """Tool for querying databases."""
    
    def __init__(self, db_path):
        super().__init__(
            name="db_query",
            description="Execute SQL queries on a database"
        )
        self.db_path = db_path
    
    def execute(self, params):
        """Execute SQL query."""
        query = params.get('query')
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute(query)
        results = cursor.fetchall()
        
        conn.close()
        
        return {
            'success': True,
            'data': results,
            'row_count': len(results)
        }

Monitoring Integration

Prometheus Integration

Export metrics to Prometheus:

from prometheus_client import Counter, Histogram, start_http_server

# Define metrics
query_counter = Counter('cognitive_queries_total', 'Total queries processed')
query_duration = Histogram('cognitive_query_duration_seconds', 'Query duration')

def process_with_metrics(query):
    """Process query with metrics."""
    query_counter.inc()
    
    with query_duration.time():
        result = engine.process(query)
    
    return result

# Start metrics server
start_http_server(8001)

Grafana Integration

Create Grafana dashboard:

{
  "dashboard": {
    "title": "Cognitive Engine Metrics",
    "panels": [
      {
        "title": "Query Rate",
        "targets": [
          {
            "expr": "rate(cognitive_queries_total[5m])"
          }
        ]
      },
      {
        "title": "Query Duration",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, cognitive_query_duration_seconds)"
          }
        ]
      }
    ]
  }
}

Testing Integration

Unit Tests

Test integration points:

import pytest
from core.engine import CognitiveEngine

def test_api_integration():
    """Test API integration."""
    engine = CognitiveEngine()
    result = engine.process("Test query")
    assert result.final_output is not None
    assert result.confidence >= 0
    assert result.iteration_count > 0

def test_batch_processing():
    """Test batch processing."""
    queries = ["Query 1", "Query 2", "Query 3"]
    results = [engine.process(q) for q in queries]
    assert len(results) == len(queries)

Integration Tests

Test full integration:

def test_websocket_integration():
    """Test WebSocket integration."""
    import websockets
    
    async def test():
        async with websockets.connect('ws://localhost:8000/ws/cognitive') as ws:
            # Send test message
            await ws.send('test')
            # Receive response
            response = await ws.recv()
            assert response is not None
    
    asyncio.run(test())

Best Practices

Error Handling

Implement robust error handling:

from fastapi import HTTPException

@app.post("/query")
async def process_query(request: QueryRequest):
    try:
        result = engine.process(request.query)
        return result
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail="Internal error")

Logging

Log integration events:

import logging

logger = logging.getLogger('integration')

@app.post("/query")
async def process_query(request: QueryRequest):
    logger.info(f"Processing query: {request.query[:50]}...")
    result = engine.process(request.query)
    logger.info(f"Query processed in {result.iteration_count} iterations")
    return result

Validation

Validate inputs:

from pydantic import BaseModel, validator

class QueryRequest(BaseModel):
    query: str
    
    @validator('query')
    def validate_query(cls, v):
        if len(v) > 10000:
            raise ValueError("Query too long")
        if not v.strip():
            raise ValueError("Query cannot be empty")
        return v

Security

Implement security measures:

from fastapi import Depends, HTTPException
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")

async def get_api_key(api_key: str = Depends(api_key_header)):
    if api_key != os.environ.get("API_KEY"):
        raise HTTPException(status_code=403, detail="Invalid API Key")
    return api_key

Support

For integration issues:

  • Email: autobotsolution@gmail.com
  • Address: Flushing MI
  • Check logs: cognitive_engine.log
  • Review API documentation
  • Test integration endpoints

Clone this wiki locally