Retry Logic & DLQ Implementation Guide
In distributed systems, handling failures gracefully is crucial for maintaining reliability and preventing data loss. Currently, the system has no retry mechanisms and no Dead Letter Queue (DLQ), so any failed operation results in permanent data loss. This guide details how to implement retry logic, a DLQ, circuit breakers, and graceful degradation to make the system more resilient.
Current System State
Currently, the system's error handling is minimal, with errors being logged but not recovered. There is no retry logic implemented, even though the backoff library is listed as a dependency. Additionally, a Dead Letter Queue (DLQ) is not implemented, meaning failed events and operations are permanently lost. This lack of robust error handling can lead to significant issues, especially in scenarios involving transient failures or external service outages.
Key Deficiencies
- No Retry Logic: The system does not attempt to retry failed operations, leading to immediate data loss.
- Missing Dead Letter Queue: There is no mechanism to store and manage failed events for later analysis or reprocessing.
- Unused Backoff Library: The backoff library, which is intended for implementing retry logic, is currently not utilized.
- Minimal Error Handling: Error handling primarily consists of logging, without any recovery mechanisms in place.
Problems Caused by Current State
The absence of retry logic and a DLQ leads to several critical issues that can severely impact the system's reliability and data integrity. Addressing these problems is essential for building a robust and fault-tolerant system.
- Permanent Loss of Events Due to Transient Network Failures: Intermittent network issues can cause events to fail, and without retry mechanisms, these events are permanently lost.
- Failures from Ollama/Gemini API Timeouts: Timeouts when calling external APIs, such as Ollama or Gemini, result in failures that are not retried, leading to incomplete operations.
- Cascading Failures Due to Redis Connection Issues: Problems with Redis connections can propagate throughout the system, causing widespread failures due to the lack of retry logic and circuit breakers.
- Inability to Recover from Temporary Service Outages: When external services experience temporary outages, the system cannot recover automatically, leading to data loss and operational disruptions.
Implementation Requirements
To address these issues, the following implementation requirements are essential. These steps will introduce retry logic, a Dead Letter Queue, circuit breaker patterns, and graceful degradation strategies to enhance the system's resilience.
1. Add Retry Logic with Exponential Backoff
Implementing retry logic with exponential backoff is crucial for handling transient failures. This approach involves retrying failed operations with increasing delays between attempts, providing a balance between immediate retries and avoiding overwhelming the system.
For External API Calls
Wrap all external service calls with retry decorators to automatically handle failures. The backoff library can be used to implement exponential backoff. Here’s an example for the LLMService (src/cortex/services/llmservice.py):
import backoff
import requests

class LLMService:
    @backoff.on_exception(
        backoff.expo,
        (requests.exceptions.RequestException, TimeoutError),
        max_tries=3,
        max_time=30,
    )
    def generate(self, prompt: str, model: str) -> str:
        # existing implementation, now retried on request errors and timeouts
        pass
Apply this retry logic to the following:
- Ollama API calls (generate, embed): Ensure that API calls to Ollama, such as generating text or creating embeddings, are retried on failure.
- Gemini API calls (generate, TTS): Implement retries for Gemini API calls, including text generation and text-to-speech (TTS) operations.
- Upstash vector DB queries: Wrap queries to the Upstash vector database with retry logic to handle potential connection or query failures.
- ChromaDB operations: Apply retries to operations involving ChromaDB, such as inserting or querying data.
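The same decorator pattern carries over to the vector stores. Below is a minimal sketch of how a ChromaDB query in src/cortex/services/chroma_service.py might be wrapped; the ChromaService class, its constructor, and the broad exception clause are assumptions for illustration, not a description of the existing code:
import backoff
import chromadb

class ChromaService:
    def __init__(self, persist_path: str = "./chroma"):
        # Hypothetical setup; the real service may configure its client differently.
        self.client = chromadb.PersistentClient(path=persist_path)
        self.collection = self.client.get_or_create_collection("cortex")

    @backoff.on_exception(backoff.expo, Exception, max_tries=3, max_time=30)
    def query(self, embedding: list[float], n_results: int = 5) -> dict:
        # Retried with exponential backoff; catching Exception is broad for brevity,
        # a real implementation should catch the client's specific error types.
        return self.collection.query(query_embeddings=[embedding], n_results=n_results)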
For Redis Operations
Redis operations are critical for many system functions, so adding retry logic is essential for maintaining reliability.
- ARQ job enqueuing: Implement retries for enqueuing jobs using ARQ (Asynchronous Redis Queue) to ensure tasks are processed even if the initial enqueue attempt fails.
- Redis Pub/Sub publishing: Add retry logic for publishing messages using Redis Pub/Sub to ensure messages are delivered despite transient issues.
- Connection pool creation: Implement retries for creating Redis connection pools to handle cases where initial connection attempts fail.
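The backoff decorator also works on coroutine functions, so async Redis calls can be wrapped the same way. The sketch below retries a Pub/Sub publish; the publish_insight helper and the channel argument are hypothetical names used only to show the pattern:
import backoff
import redis.asyncio as redis
from redis.exceptions import ConnectionError as RedisConnectionError
from redis.exceptions import TimeoutError as RedisTimeoutError

@backoff.on_exception(
    backoff.expo,
    (RedisConnectionError, RedisTimeoutError),
    max_tries=3,
    max_time=30,
)
async def publish_insight(client: redis.Redis, channel: str, payload: str) -> int:
    # Retried with exponential backoff on connection or timeout errors.
    return await client.publish(channel, payload)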
For Event Submission
Ensure that event submissions from various sources are retried to prevent data loss.
- Git hook post-commit event submission: Implement retries for submitting events triggered by Git hooks after commits.
- File watcher event submission: Add retry logic for events submitted by file watchers to ensure file changes are captured and processed.
- IDE plugin event submission (when implemented): When IDE plugins are implemented, ensure that event submissions from these plugins are also retried.
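For example, a post-commit hook that submits its event to the API over HTTP could reuse the same decorator. This is only a sketch: the endpoint URL is a placeholder, and the actual submission mechanism used by the hooks is not specified here:
import backoff
import requests

EVENTS_ENDPOINT = "http://localhost:8000/events"  # placeholder URL

@backoff.on_exception(
    backoff.expo,
    requests.exceptions.RequestException,
    max_tries=3,
    max_time=30,
)
def submit_commit_event(payload: dict) -> None:
    # raise_for_status() turns HTTP error responses into exceptions so they are retried too.
    response = requests.post(EVENTS_ENDPOINT, json=payload, timeout=10)
    response.raise_for_status()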
2. Implement Dead Letter Queue (DLQ)
A Dead Letter Queue (DLQ) is a critical component for handling persistent failures. It acts as a storage mechanism for events that have failed processing after multiple retries. This allows for later analysis, manual retry, or other corrective actions.
Architecture
The DLQ implementation follows this architecture:
Event → Process → Success ✓
            ↓
    Failure (retry 3x)
            ↓
     DLQ (Redis list)
Events are initially processed. If an event fails, it is retried up to a predefined number of times (e.g., 3 times). If the event still fails after the retries, it is moved to the DLQ.
Implementation
Create a new file src/cortex/core/dlq.py to implement the DLQ functionality:
import json
import traceback
from datetime import datetime, timedelta
from typing import Any, Dict, List

import redis.asyncio as redis


class DeadLetterQueue:
    def __init__(self, redis_pool: redis.Redis):
        self.redis = redis_pool
        self.dlq_key = "dlq:events"

    async def push(self, event_data: Dict[str, Any], error: str, retry_count: int):
        """Push a failed event to the DLQ with metadata."""
        event = {
            "event_data": event_data,
            "error": error,
            "retry_count": retry_count,
            "timestamp": datetime.utcnow().isoformat(),
            # Capture the active traceback; call this from within an except block.
            "stack_trace": traceback.format_exc(),
        }
        await self.redis.rpush(self.dlq_key, json.dumps(event))

    async def get_failed_events(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Retrieve the most recent failed events for inspection or manual retry."""
        events = []
        dlq_length = await self.redis.llen(self.dlq_key)
        start = max(0, dlq_length - limit)
        for event_json in await self.redis.lrange(self.dlq_key, start, -1):
            events.append(json.loads(event_json))
        return events

    async def retry_event(self, event_data: Dict[str, Any]):
        """Manually retry a failed event."""
        # Re-enqueue the event for processing; the exact call depends on the
        # event processing system, e.g. await process_event(event_data).
        pass

    async def purge_old_events(self, days: int = 7):
        """Remove failed events older than the cutoff."""
        cutoff_str = (datetime.utcnow() - timedelta(days=days)).isoformat()
        total = await self.redis.llen(self.dlq_key)
        events = await self.get_failed_events(limit=total)
        events_to_keep = [e for e in events if e["timestamp"] > cutoff_str]
        # Clear the DLQ and repopulate it with the events to keep.
        # Note: this is not atomic; use a pipeline/transaction if that matters.
        await self.redis.delete(self.dlq_key)
        for event in events_to_keep:
            await self.redis.rpush(self.dlq_key, json.dumps(event))


# Example usage (in a worker or error-handling context):
# from cortex.core.dlq import DeadLetterQueue
# dlq = DeadLetterQueue(redis_pool)
# await dlq.push(event_data, error_message, retry_count)
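To connect retries and the DLQ, a worker task can push an event to the DLQ once backoff has exhausted its attempts. The sketch below assumes an ARQ-style task that receives a ctx dict containing the Redis pool, plus a process_event coroutine stored in that dict; both are illustrative assumptions about src/cortex/workers.py:
import backoff
from cortex.core.dlq import DeadLetterQueue

MAX_TRIES = 3

@backoff.on_exception(backoff.expo, Exception, max_tries=MAX_TRIES, max_time=30)
async def _process_with_retry(ctx: dict, event_data: dict) -> None:
    # Hypothetical handler lookup; replace with the real processing call.
    await ctx["process_event"](event_data)

async def handle_event(ctx: dict, event_data: dict) -> None:
    """ARQ-style task: retry with backoff, then fall back to the DLQ."""
    try:
        await _process_with_retry(ctx, event_data)
    except Exception as exc:
        # Called inside the except block so dlq.push can capture the stack trace.
        dlq = DeadLetterQueue(ctx["redis"])
        await dlq.push(event_data, error=str(exc), retry_count=MAX_TRIES)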
DLQ Storage
Store failed events in Redis with the following metadata:
- Event data (original payload): The original data of the failed event.
- Error message: A description of the error that caused the failure.
- Retry count: The number of times the event was retried.
- Timestamp: The time when the event failed and was added to the DLQ.
- Stack trace: The stack trace of the error for debugging purposes.
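For reference, the dictionary serialized into the Redis list by the push method above looks roughly like this (all values are illustrative):
{
    "event_data": {"type": "file_change", "path": "src/example.py"},
    "error": "ReadTimeout: LLM request exceeded 30 seconds",
    "retry_count": 3,
    "timestamp": "2025-01-15T10:32:07.123456",
    "stack_trace": "Traceback (most recent call last): ...",
}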
3. Circuit Breaker Pattern
Implement the Circuit Breaker pattern to prevent cascading failures when services are down. This pattern monitors the success and failure rates of service calls and, when the failure rate exceeds a threshold, “opens” the circuit, preventing further calls to the failing service. After a timeout period, the circuit “half-opens,” allowing a trial call to see if the service has recovered. If the call succeeds, the circuit “closes,” and normal operations resume.
from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=60)
def call_ollama_api(prompt: str, model: str) -> str:
    # existing API call
    pass
Apply circuit breakers to:
- Ollama service: Protect against failures in the Ollama service.
- Gemini service: Implement circuit breaking for the Gemini service.
- Upstash service: Add circuit breakers for calls to the Upstash service.
4. Graceful Degradation
Implement graceful degradation strategies to maintain partial functionality when services fail. This approach ensures that the system can continue to operate, albeit with reduced capabilities, rather than failing completely.
- Skip audio generation if TTS fails (deliver text only): If the text-to-speech (TTS) service fails, skip audio generation and deliver the text-based content instead.
- Skip public knowledge if Upstash fails (use private only): If the Upstash service is unavailable, use only private knowledge sources and skip public knowledge retrieval.
- Queue insights if Redis Pub/Sub fails (deliver on reconnect): If Redis Pub/Sub fails, queue insights and deliver them when the connection is re-established.
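As a sketch of the first strategy, the circuit breaker from section 3 can be combined with a text-only fallback. The generate_audio function and the returned payload shape are hypothetical and shown only to illustrate the pattern:
from circuitbreaker import circuit, CircuitBreakerError

@circuit(failure_threshold=5, recovery_timeout=60)
def generate_audio(text: str) -> bytes:
    # Placeholder for the real TTS call, protected by a circuit breaker.
    raise NotImplementedError

def build_insight(text: str) -> dict:
    """Deliver text-only content when TTS fails or its circuit is open."""
    try:
        audio = generate_audio(text)
    except CircuitBreakerError:
        # The circuit is open: skip the call entirely and degrade to text only.
        audio = None
    except Exception:
        # The TTS call itself failed: also degrade to text only.
        audio = None
    return {"text": text, "audio": audio}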
Acceptance Criteria
The following acceptance criteria must be met to ensure the successful implementation of retry logic and a DLQ:
- [ ] All external API calls have retry logic.
- [ ] Exponential backoff configured (max 3 retries, 30s max time).
- [ ] DLQ implemented in Redis.
- [ ] Failed events stored with metadata.
- [ ] Manual retry endpoint for DLQ events.
- [ ] Circuit breakers for all external services.
- [ ] Graceful degradation strategies implemented.
- [ ] Metrics for retry counts and failures.
- [ ] Documentation for DLQ management.
Files to Modify
The following files need to be modified to implement the retry logic and DLQ:
- src/cortex/services/llmservice.py: Add retry decorators to external API calls.
- src/cortex/services/chroma_service.py: Add retry logic for ChromaDB operations.
- src/cortex/services/upstash_service.py: Add retry logic for Upstash service calls.
- src/cortex/core/dlq.py: Create this new file to implement the Dead Letter Queue.
- src/cortex/workers.py: Integrate the DLQ into the worker processes.
- src/cortex/api/events.py: Add a DLQ retry endpoint to manually retry events (a sketch follows this list).
- pyproject.toml: Ensure that backoff and circuitbreaker are listed as dependencies.
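As a rough sketch of the retry endpoint for src/cortex/api/events.py, assuming the API layer is built with FastAPI and can obtain the shared Redis pool from a dependency (neither detail is confirmed by this guide):
from fastapi import APIRouter, Depends, HTTPException
import redis.asyncio as redis

from cortex.core.dlq import DeadLetterQueue

router = APIRouter()

async def get_redis() -> redis.Redis:
    # Hypothetical dependency; the real app presumably shares one connection pool.
    return redis.Redis()

@router.post("/dlq/retry/{index}")
async def retry_dlq_event(index: int, client: redis.Redis = Depends(get_redis)):
    """Re-submit a single failed event from the DLQ by its position in the list."""
    dlq = DeadLetterQueue(client)
    events = await dlq.get_failed_events(limit=1000)
    if index < 0 or index >= len(events):
        raise HTTPException(status_code=404, detail="No DLQ event at that index")
    await dlq.retry_event(events[index]["event_data"])
    return {"status": "requeued", "index": index}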
Dependencies to Add
The following dependencies need to be added to the project:
backoff = "^2.2.1" # Already listed, just need to use it
circuitbreaker = "^1.4.0" # Add this
Conclusion
Implementing retry logic and a Dead Letter Queue (DLQ) is a crucial step in building a resilient and fault-tolerant system. By wrapping external API calls with retry decorators, utilizing exponential backoff, and creating a robust DLQ in Redis, the system can gracefully handle failures and prevent data loss. Additionally, implementing circuit breakers and graceful degradation strategies ensures that the system remains operational even when external services experience issues. By following this guide, developers can enhance the reliability and stability of their applications.
For more information on implementing retry patterns and DLQs, visit Enterprise Integration Patterns.