Ragas: Controlling Concurrency With @experiment
Evaluating Retrieval-Augmented Generation (RAG) systems involves assessing their performance across various metrics. When using Ragas, the @experiment decorator offers a powerful way to streamline this evaluation process. However, when dealing with large datasets or resource-intensive evaluations, managing concurrency and rate limits becomes crucial. This article addresses the common challenge of encountering "429 Too Many Requests" errors when using the @experiment decorator with Azure OpenAI and provides practical solutions for controlling concurrency and rate limits.
Understanding the Concurrency Challenge in Ragas Evaluations
When conducting evaluations with Ragas, calling the decorated function's .arun() method (e.g., evaluate_rag.arun) processes the dataset concurrently to expedite the evaluation. While this concurrency significantly reduces the overall evaluation time, it can push you past the Tokens Per Minute (TPM) or Requests Per Minute (RPM) limits imposed by services like Azure OpenAI. Hitting these limits results in HTTP/1.1 429 Too Many Requests errors, which interrupt the evaluation process. To effectively use Ragas for comprehensive evaluations, understanding and controlling this concurrency is essential.
In the context of Ragas, the @experiment decorator simplifies the evaluation workflow by letting you define evaluation functions that automatically process your dataset. However, the default behavior of the decorated function's .arun() method is to process the dataset with high concurrency, which, without proper management, can overwhelm the rate limits of the underlying language model APIs. This is especially pertinent when using services like Azure OpenAI, which enforce specific TPM and RPM limits to ensure fair usage and prevent service overload. The key to successful Ragas evaluations therefore lies in balancing parallel processing for speed against the rate limits of the APIs being used.
The challenge is further compounded by the fact that the older evaluate() function in Ragas offered a RunConfig object to manage parameters like max_workers and timeout. However, this configuration is not directly applicable to the new @experiment workflow, leaving users searching for alternative methods to control concurrency. This article bridges that gap by providing a comprehensive guide on how to effectively manage concurrency and rate limits when using @experiment, ensuring smooth and efficient Ragas evaluations. We will explore various strategies, from implementing asynchronous batching to leveraging tools like asyncio.Semaphore, to empower you to tailor your evaluation process to your specific resource constraints and API limitations.
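For reference, the legacy workflow looked roughly like the sketch below. This assumes the classic evaluate() API with RunConfig; the dataset and metrics variables are placeholders and the parameter values are purely illustrative:

from ragas import evaluate
from ragas.run_config import RunConfig

# Legacy approach: evaluate() accepts a RunConfig that caps worker count and sets timeouts.
results = evaluate(
    dataset=eval_dataset,    # an existing evaluation dataset (placeholder)
    metrics=metrics,         # list of Ragas metrics (placeholder)
    run_config=RunConfig(
        max_workers=4,       # limit concurrent metric calls
        timeout=180,         # per-call timeout in seconds
        max_retries=10,      # built-in retry budget
    ),
)

The rest of this article focuses on achieving the same kind of control in the @experiment workflow, where no equivalent knob is exposed directly.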
Key Strategies for Concurrency and Rate Limit Control
To effectively manage concurrency and avoid rate limiting issues when using Ragas's @experiment with services like Azure OpenAI, several strategies can be employed. These strategies focus on controlling the number of concurrent requests and ensuring that the evaluation process stays within the API's rate limits. Here are some key techniques:
1. Implement Asynchronous Batching
One effective method is to process the dataset in asynchronous batches. Instead of sending individual requests for each data point, you can group them into batches and process each batch concurrently. Asynchronous batching allows you to control the number of parallel requests, preventing the system from exceeding rate limits.
To implement asynchronous batching, you can use Python's asyncio library. The idea is to divide your dataset into smaller chunks (batches) and then process each batch concurrently using asyncio.gather. This approach ensures that only a limited number of requests are sent to the API at any given time. By carefully choosing the batch size, you can align the request rate with the API's rate limits, thus avoiding 429 errors.
For example, consider a dataset of 1000 samples. Instead of firing off all 1000 requests at once, you could divide it into 10 batches of 100 samples each and use asyncio.gather to process one batch at a time. Because batches run sequentially, the number of in-flight requests never exceeds the batch size, so choosing the batch size effectively caps your request rate and keeps you under the API's limits. This method not only mitigates the risk of rate limiting but also maintains efficiency by leveraging parallelism within the defined constraint. Furthermore, asynchronous batching can be integrated into your existing evaluation workflow with minimal changes to your code structure, making it a practical and scalable solution for managing concurrency in Ragas evaluations.
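Here is a minimal sketch of the pattern. The evaluate_row helper is a hypothetical stand-in for your per-row metric calls; only the batching structure is the point:

import asyncio
from typing import Any, Dict, List

async def evaluate_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical per-row evaluation (your metric ascore calls would go here)."""
    ...

async def evaluate_in_batches(rows: List[Dict[str, Any]], batch_size: int = 10) -> List[Dict[str, Any]]:
    """Run the dataset one batch at a time so at most `batch_size` requests are in flight."""
    results: List[Dict[str, Any]] = []
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        # Rows within a batch run concurrently; batches themselves run sequentially.
        results.extend(await asyncio.gather(*(evaluate_row(row) for row in batch)))
    return results

# Example usage:
# results = asyncio.run(evaluate_in_batches(my_rows, batch_size=10))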
2. Utilize asyncio.Semaphore for Concurrency Throttling
Another powerful technique is to use asyncio.Semaphore to limit the number of concurrent tasks. A semaphore is a synchronization primitive that controls access to a shared resource. In this context, the shared resource is the API endpoint, and the semaphore limits the number of concurrent requests.
By initializing a semaphore with a specific value (e.g., asyncio.Semaphore(10)), you can control the maximum number of coroutines that can access a critical section of code concurrently. In the context of Ragas evaluations, this means you can limit the number of concurrent requests to the language model API. Before making a request, a coroutine must acquire the semaphore, and after the request is complete, it releases the semaphore. This ensures that the number of concurrent requests never exceeds the semaphore's limit.
Using asyncio.Semaphore offers fine-grained control over concurrency, allowing you to precisely match the request rate to the API's limits. This is particularly useful when dealing with APIs that have strict rate limits or when you need to balance evaluation speed with resource constraints. The semaphore can be easily integrated into your asynchronous evaluation functions, providing a robust mechanism for throttling concurrency. Moreover, asyncio.Semaphore is part of Python's standard library, making it a portable and reliable solution for managing concurrency in Ragas evaluations. Its flexibility and ease of use make it an ideal choice for developers seeking to optimize their evaluation workflows while respecting API rate limits.
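The core pattern is small: create one shared semaphore and wrap the rate-limited calls in an async with block. A stripped-down sketch follows (the full integration with @experiment appears in the code example later in this article); coro_factory is a hypothetical callable that returns the awaitable API call:

import asyncio

# One shared semaphore caps the number of concurrent API calls (here, 10).
semaphore = asyncio.Semaphore(10)

async def limited_call(coro_factory):
    """Run any awaitable-producing callable while holding a semaphore slot."""
    async with semaphore:   # acquired here, released automatically on exit
        return await coro_factory()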
3. Implement a Retry Mechanism with Exponential Backoff
Even with concurrency controls in place, occasional rate limiting errors can still occur. Implementing a retry mechanism with exponential backoff can handle these transient errors gracefully. When a 429 error is encountered, the system waits for an increasing amount of time before retrying the request. This approach helps to avoid overwhelming the API with retries and gives it time to recover.
The exponential backoff strategy involves waiting for a short period after the first failure, and then doubling the wait time for each subsequent failure, up to a maximum limit. This allows the system to gradually reduce its request rate, giving the API a chance to recover from overload. For instance, you might start with a 1-second delay, then 2 seconds, 4 seconds, and so on. This pattern ensures that the system doesn't keep bombarding the API with requests during periods of high load.
Implementing a retry mechanism with exponential backoff is a crucial aspect of building robust and resilient evaluation pipelines. It not only handles rate limiting errors but also other transient issues, such as network connectivity problems. By automatically retrying failed requests with increasing delays, the system can recover from temporary disruptions without manual intervention. This approach is especially valuable in automated evaluation workflows where continuous operation is essential. Libraries like tenacity in Python provide decorators and utilities that simplify the implementation of retry logic, making it easier to integrate exponential backoff into your Ragas evaluation functions. This ensures a smoother and more reliable evaluation process, especially when dealing with external APIs.
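As a sketch, here is one way to wrap a metric call with tenacity, assuming the OpenAI v1 client surfaces 429s as openai.RateLimitError (adjust the exception type to whatever your client actually raises); score_with_retry is a hypothetical wrapper, not part of Ragas:

from openai import RateLimitError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    retry=retry_if_exception_type(RateLimitError),        # only retry rate-limit errors
    wait=wait_exponential(multiplier=1, min=1, max=60),   # 1s, 2s, 4s, ... capped at 60s
    stop=stop_after_attempt(6),                           # give up after 6 attempts
)
async def score_with_retry(scorer, **kwargs):
    """Retry any Ragas metric's ascore call with exponential backoff on 429s."""
    return await scorer.ascore(**kwargs)

# Usage inside an evaluation function:
# faithfulness = await score_with_retry(
#     faithfulness_scorer, user_input=question, response=answer, retrieved_contexts=contexts
# )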
4. Monitor API Usage and Adjust Concurrency Dynamically
To maintain optimal performance and avoid rate limits, continuously monitoring API usage and dynamically adjusting concurrency is an advanced strategy. By tracking the number of requests made and the rate limit status, you can adapt the evaluation process in real-time to stay within the limits.
This approach involves implementing a monitoring system that tracks the number of API requests made over a specific period. You can then use this information to adjust the concurrency level dynamically. For example, if the request rate approaches the API's limit, you can reduce the number of concurrent tasks. Conversely, if the request rate is well below the limit, you can increase concurrency to speed up the evaluation process. This dynamic adjustment ensures that you are utilizing the API efficiently without exceeding the rate limits.
Implementing dynamic concurrency control requires a more sophisticated setup, but it can yield significant benefits in terms of both efficiency and reliability. It allows your evaluation pipeline to adapt to varying API load and usage patterns, ensuring smooth operation even under fluctuating conditions. This strategy is particularly valuable for long-running evaluations or in scenarios where the API's rate limits may change over time. By continuously monitoring API usage and adjusting concurrency dynamically, you can optimize your Ragas evaluations for maximum throughput while minimizing the risk of rate limiting errors. This proactive approach ensures that your evaluations are not only faster but also more robust and resilient.
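As a rough sketch of the monitoring side, the hypothetical helper below tracks request timestamps in a sliding 60-second window and makes callers wait once the configured RPM budget is reached; a fuller implementation could also widen or shrink the budget based on observed 429s:

import asyncio
import time
from typing import List

class SlidingWindowThrottle:
    """Hypothetical helper: caps requests per minute using a sliding window of timestamps."""

    def __init__(self, max_requests_per_minute: int = 300):
        self.max_rpm = max_requests_per_minute
        self._timestamps: List[float] = []
        self._lock = asyncio.Lock()

    async def wait_for_slot(self) -> None:
        """Block until another request can be sent without exceeding the RPM budget."""
        while True:
            async with self._lock:
                now = time.monotonic()
                # Keep only the timestamps from the last 60 seconds.
                self._timestamps = [t for t in self._timestamps if now - t < 60]
                if len(self._timestamps) < self.max_rpm:
                    self._timestamps.append(now)
                    return
                # Wait roughly until the oldest request leaves the window.
                sleep_for = 60 - (now - self._timestamps[0])
            await asyncio.sleep(max(sleep_for, 0.1))

# Usage: call `await throttle.wait_for_slot()` immediately before each metric's ascore call.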
Code Example: Implementing Concurrency Control with asyncio.Semaphore
Below is an example demonstrating how to use asyncio.Semaphore to control concurrency when evaluating with Ragas:
import asyncio
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict

import pandas as pd
from dotenv import load_dotenv
from openai import AsyncOpenAI
from ragas import Dataset, experiment
from ragas.embeddings.base import embedding_factory
from ragas.llms import llm_factory
from ragas.metrics.collections import (
    AnswerCorrectness,
    AnswerRelevancy,
    ContextPrecision,
    ContextRecall,
    Faithfulness,
)

from app.configs.config import (
    AZURE_OPENAI_API_KEY,
    AZURE_OPENAI_ENDPOINT,
    EMBEDDING_MODEL,
)

load_dotenv()

# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
logger = logging.getLogger(__name__)


def create_ragas_dataset(dataset_path: Path) -> Dataset:
    """Create a Ragas Dataset from the downloaded JSON file."""
    dataset = Dataset(name="wke_faq_testset", backend="local/jsonl", root_dir=".")
    df = pd.read_json(dataset_path)
    for _, row in df.iterrows():
        dataset.append({
            "question": row["question"],
            "contexts": row["contexts"],
            "answer": row["answer"],
            "ground_truth": row["ground_truth"],
        })
    logger.info(f"Created Ragas dataset with {len(df)} samples")
    return dataset
@experiment()
async def evaluate_rag(row: Dict[str, Any], llm, embeddings, semaphore: asyncio.Semaphore) -> Dict[str, Any]:
    async with semaphore:
        question = row["question"]
        answer = row["answer"]
        contexts = row["contexts"]
        ground_truth = row["ground_truth"]

        answer_relevancy_scorer = AnswerRelevancy(llm=llm, embeddings=embeddings)
        answer_correctness_scorer = AnswerCorrectness(llm=llm, embeddings=embeddings)
        contexts_precision_scorer = ContextPrecision(llm=llm)
        context_recall_scorer = ContextRecall(llm=llm)
        faithfulness_scorer = Faithfulness(llm=llm)
        faithfulness = await faithfulness_scorer.ascore(
            user_input=question, response=answer, retrieved_contexts=contexts
        )
        contexts_precision = await contexts_precision_scorer.ascore(
            user_input=question, reference=ground_truth, retrieved_contexts=contexts
        )
        context_recall = await context_recall_scorer.ascore(
            user_input=question, reference=ground_truth, retrieved_contexts=contexts
        )
        answer_relevancy = await answer_relevancy_scorer.ascore(
            user_input=question, response=answer
        )
        answer_correctness = await answer_correctness_scorer.ascore(
            user_input=question, response=answer, reference=ground_truth
        )
        # Return evaluation results
        result = {
            **row,
            "faithfulness": faithfulness.value,
            "context_precision": contexts_precision.value,
            "context_recall": context_recall.value,
            "answer_relevancy": answer_relevancy.value,
            "answer_correctness": answer_correctness.value,
        }
        return result
async def run_experiment():
    # Prepare dataset and initialize system
    logger.info("Initializing RAG system...")
    dataset = create_ragas_dataset(
        "./data/eval_dataset/ragas_eval_gpt_5_mini_wke_opt.json"
    )

    endpoint = f"{AZURE_OPENAI_ENDPOINT.rstrip('/')}/openai/v1"
    client = AsyncOpenAI(base_url=endpoint, api_key=AZURE_OPENAI_API_KEY)

    # Initialize semaphore with a concurrency limit (e.g., 20)
    semaphore = asyncio.Semaphore(20)

    experiment_results = await evaluate_rag.arun(
        dataset,
        name=f"ragas_experiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
        llm=llm_factory(
            "gpt-5-mini",
            client=client,
            temperature=1,
            top_p=1,
            reasoning_effort="low",
        ),
        embeddings=embedding_factory(provider="huggingface", model=EMBEDDING_MODEL),
        semaphore=semaphore,  # Pass the semaphore to the evaluation function
    )
    return experiment_results


if __name__ == "__main__":
    asyncio.run(run_experiment())
In this example, a semaphore is initialized with a concurrency limit of 20. The evaluate_rag function accepts a semaphore argument and acquires it with an async with block before making any API calls. This ensures that at most 20 coroutines execute the evaluation logic concurrently, effectively capping the rate of API requests.
Conclusion
Controlling concurrency and rate limits is crucial for efficient and reliable Ragas evaluations, especially when using services like Azure OpenAI. By implementing strategies such as asynchronous batching, utilizing asyncio.Semaphore, implementing retry mechanisms, and dynamically monitoring API usage, you can ensure your evaluations run smoothly without encountering rate limiting errors. These techniques allow you to balance evaluation speed with API limits, optimizing your Ragas workflows for performance and stability.
For further reading and a deeper understanding of asynchronous programming in Python, refer to the official Python asyncio documentation.