Streaming Reads In Zephyr's `load_parquet`: Benefits?

by Alex Johnson

In this article, we'll discuss an important aspect of Zephyr's load_parquet function and the benefits of implementing streaming reads. Currently, load_parquet and load_files in Zephyr eagerly load data into memory, which can be inefficient for large datasets. We'll explore why streaming reads can significantly reduce the memory footprint and improve runtime performance.

Understanding the Issue with Current Implementation

The current implementation of load_parquet in Zephyr loads the entire Parquet file into memory before processing it. This approach has a high memory footprint because it requires storing the entire dataset in memory at once. The code snippet below illustrates how load_parquet currently operates:

from typing import Iterator

def load_parquet(file_path: str, **kwargs) -> Iterator[dict]:
    import pandas as pd

    # Eagerly read the WHOLE file into a DataFrame before yielding anything.
    with open_file(file_path, "rb") as f:
        df = pd.read_parquet(f, **kwargs)
        for _, row in df.iterrows():
            yield row.to_dict()

As you can see, the function uses pandas.read_parquet to read the entire file into a DataFrame (df) and then iterates over the rows. For large Parquet files, this can lead to excessive memory usage and slower processing times. This is particularly problematic when dealing with big data scenarios where files can be several gigabytes in size. The eager loading approach means that processing cannot begin until the entire file is read into memory, leading to delays and potential memory errors.

To illustrate the impact, consider a scenario where you have a 10 GB Parquet file. With the current implementation, your system needs to have enough RAM to load the entire 10 GB file, plus the overhead of the pandas DataFrame and any additional memory used by subsequent processing steps. This can quickly exhaust system resources, especially in environments with limited memory. Moreover, the runtime is affected because the system spends time loading the entire dataset before any actual processing can occur. This delay is significant and can be a bottleneck in data processing pipelines.
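You can observe this directly with a minimal sketch; sample.parquet is a hypothetical stand-in for a large file, and note that the resource module is Unix-only, with ru_maxrss reported in KiB on Linux and bytes on macOS:

import resource

import pandas as pd

# Eagerly load the whole file, as the current implementation does.
df = pd.read_parquet("sample.parquet")  # hypothetical stand-in path

# Peak resident set size of the process so far
# (KiB on Linux, bytes on macOS).
peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print(f"rows={len(df)} peak_rss={peak}")

On a multi-gigabyte file, the reported peak will be at least the file's uncompressed size plus the DataFrame overhead.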

The Benefits of Streaming Reads

Streaming reads, on the other hand, process data in smaller chunks or batches, which can significantly reduce memory usage and improve runtime. Instead of loading the entire file into memory, streaming reads process the data in a piecemeal fashion, allowing processing to begin almost immediately. The key advantage here is that the memory footprint is proportional to the size of the batch being processed, rather than the size of the entire dataset. This makes it feasible to process very large files even on systems with limited memory.
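To make "processing can begin almost immediately" concrete, here is a minimal sketch, again using a hypothetical sample.parquet, that times how long each approach takes to produce its first record:

import time

import pandas as pd
import pyarrow.parquet as pq

def first_record_eager(path):
    # Nothing can be returned until the whole file is in memory.
    df = pd.read_parquet(path)
    return df.iloc[0].to_dict()

def first_record_streaming(path):
    # Only the first batch needs to be materialized.
    pf = pq.ParquetFile(path)
    for batch in pf.iter_batches(batch_size=1024):
        return batch.to_pylist()[0]

for fn in (first_record_eager, first_record_streaming):
    start = time.perf_counter()
    fn("sample.parquet")  # hypothetical stand-in path
    print(fn.__name__, time.perf_counter() - start)

The gap between the two timings grows with file size: the eager version's time-to-first-record scales with the whole file, the streaming version's with a single batch.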

By implementing streaming reads in load_parquet, Zephyr can achieve a more efficient and scalable data processing pipeline. The reduced memory footprint means that more data can be processed concurrently, and the faster runtime allows for quicker turnaround times. This is particularly beneficial in scenarios where data needs to be processed in real-time or near real-time, such as in data analytics and machine-learning applications.

To quantify the benefits, consider the memory usage comparison in the benchmarks below. They show a dramatic reduction in peak memory required when using streaming reads compared to the pandas preload method, with streaming reads using roughly 80% less memory in the tested scenario. The memory footprint can be further controlled by adjusting the batch size, providing additional flexibility and optimization possibilities.

Benchmarking Memory Footprint and Runtime

To demonstrate the advantages of streaming reads, memory footprint and runtime benchmarks were conducted using the memray tool. The results clearly show that streaming reads significantly reduce memory usage. In the provided example, a dummy pipeline was used to read a Parquet file, hash the text data, and write it back to a Parquet file. The memory usage was compared between the current pandas preload method and the proposed pyarrow streaming read method.
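The original benchmark harness isn't reproduced here, but memray can be attached to a pipeline like that with its Tracker context manager; a sketch, assuming the dummy pipeline is wrapped in a hypothetical run_pipeline() function:

import memray

def run_pipeline():
    ...  # read the Parquet file, hash the text field, write Parquet back

# Capture every allocation made while the pipeline runs; inspect the
# result afterwards with `memray flamegraph pipeline.bin`.
with memray.Tracker("pipeline.bin"):
    run_pipeline()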

The results speak for themselves. The pandas preload method showed a much higher peak memory usage compared to the pyarrow streaming read. Specifically, streaming reads reduced the peak memory required by approximately 80%. This reduction is crucial for processing large datasets, as it allows Zephyr to handle files that would otherwise exceed memory limits.

Furthermore, runtime benchmarks revealed that streaming reads are also faster. In a quick test, streaming reads using pyarrow were approximately 60% faster than the pandas preload method. This improvement is primarily because streaming reads allow processing to begin before the entire dataset is loaded into memory. For example, if your logic only requires a subset of the data, streaming reads can significantly reduce the overall processing time.
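A sketch of that subset case, assuming the file has a text column and only the first 10,000 records are needed (sample.parquet is again a hypothetical stand-in):

import pyarrow.parquet as pq

pf = pq.ParquetFile("sample.parquet")  # hypothetical stand-in path

records = []
# Read only the column we need, and stop as soon as we have enough rows;
# an eager pandas read would pay for the whole file either way.
for batch in pf.iter_batches(batch_size=1024, columns=["text"]):
    records.extend(batch.to_pylist())
    if len(records) >= 10_000:
        break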

Practical Implementation with PyArrow

Implementing streaming reads can be achieved using libraries like PyArrow, which provides efficient tools for working with Parquet files. PyArrow's streaming read capabilities allow Zephyr to process data in batches, reducing memory consumption and improving performance. The provided patch demonstrates how to modify the load_parquet function to use PyArrow for streaming reads.

The key change in the patch is the replacement of pandas.read_parquet with PyArrow's pq.ParquetFile and its iter_batches method. This approach reads the Parquet file in batches, processing each batch before moving on to the next. The batch_size parameter can be used to control the size of each batch, allowing for fine-tuning of memory usage and performance.

Here’s the relevant code from the patch, shown in the context of the full load_parquet function:

from typing import Iterator

import pyarrow.parquet as pq

def load_parquet(file_path: str, **kwargs) -> Iterator[dict]:
    with open_file(file_path, "rb") as f:
        parquet_file = pq.ParquetFile(f)
        # Stream the file one record batch at a time instead of
        # materializing it all at once.
        for batch in parquet_file.iter_batches(**kwargs):
            for record in batch.to_pylist():
                yield record

This code opens the Parquet file using PyArrow and iterates over the batches using iter_batches. Each batch is then converted to a list of records, which are yielded one by one. This streaming approach ensures that only a small portion of the data is in memory at any given time, significantly reducing the memory footprint.

Code Example and Usage

To illustrate how streaming reads can be used in practice, consider the following code snippet:

import xxhash

result = (
    Dataset
    .from_files(...)
    .flat_map(load_parquet)  # yield records from each file as they stream in
    .map(lambda r: str(r["text"]))
    .map(lambda t: {"hash": xxhash.xxh64_hexdigest(t.encode("utf-8"))})
    .write_parquet(...)
)
backend.execute(result)

In this example, load_parquet is used to read data from Parquet files. The .flat_map operation applies the load_parquet function to each file, yielding records as dictionaries. The subsequent .map operations transform the data, and the .write_parquet operation writes the processed data back to Parquet files.

By using streaming reads, this pipeline can efficiently process large datasets without running into memory issues. The batch_size parameter in load_parquet can be adjusted to optimize performance for different datasets and hardware configurations. For instance, you can specify a batch_size like this:

.flat_map(lambda f: load_parquet(f, batch_size=1024))

However, it's important to note that picking the optimal batch_size can be tricky and should not be done prematurely. The default batch size of 64K (65,536) rows, which load_parquet inherits from PyArrow's iter_batches, is a good starting point, but experimentation may be necessary to find the best value for your specific use case.
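If you do experiment, a simple sweep over a few candidate batch sizes, timing a full pass through a hypothetical sample.parquet, is a reasonable way to start:

import time

import pyarrow.parquet as pq

# Time a full pass over the file at several candidate batch sizes.
for batch_size in (1_024, 8_192, 65_536):
    start = time.perf_counter()
    pf = pq.ParquetFile("sample.parquet")  # hypothetical stand-in path
    rows = sum(b.num_rows for b in pf.iter_batches(batch_size=batch_size))
    elapsed = time.perf_counter() - start
    print(f"batch_size={batch_size} rows={rows} elapsed={elapsed:.2f}s")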

Conclusion

Implementing streaming reads in Zephyr's load_parquet function offers significant advantages in terms of memory footprint and runtime performance. By processing data in batches, streaming reads reduce memory usage, allowing Zephyr to handle larger datasets more efficiently. The faster runtime means quicker processing times, which is crucial for many data processing applications.

Libraries like PyArrow provide the necessary tools to implement streaming reads effectively. By adopting this approach, Zephyr can improve its scalability and performance, making it a more powerful platform for data processing.

In summary, switching to streaming reads for load_parquet in Zephyr is a crucial step toward optimizing performance and scalability. By leveraging PyArrow's capabilities, Zephyr can handle large datasets more efficiently, reduce memory usage, and improve runtime, ultimately providing a better experience for users and developers alike.

For further reading on Parquet and data streaming, consider exploring the Apache Parquet documentation.