Interfacing Ray with internal libraries and services using Pydantic
Companies often develop internal libraries and services that expose their inputs and outputs as Pydantic models. Whether the library wraps a machine learning service, an API client, or a data validation layer, you’ll often need to convert large DataFrames into Pydantic objects and hand them to those libraries in batches.
Here’s how to do it efficiently using Ray Data.
The Challenge
Many in-house Python libraries/services follow these patterns:
ml_service.predict_batch(records: List[PredictionRequest]) -> List[PredictionResponse]
validator.validate_batch(data: List[DataModel]) -> List[DataValidation]
api_client.submit_batch(objects: List[APIRecord]) -> List[APIResponse]
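Concretely, a signature like predict_batch is usually backed by Pydantic models. Here is a minimal sketch of what such an interface might look like; the service class name and field names (features, score) are hypothetical stand-ins, not your library’s real schema:

from typing import List
from pydantic import BaseModel

class PredictionRequest(BaseModel):
    id: str
    features: List[float]  # hypothetical field

class PredictionResponse(BaseModel):
    id: str
    score: float  # hypothetical field

class MLService:
    """Stand-in for an in-house ml_service."""

    def predict_batch(self, records: List[PredictionRequest]) -> List[PredictionResponse]:
        # Placeholder logic; your real service runs the actual inference
        return [PredictionResponse(id=r.id, score=sum(r.features)) for r in records]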
But your data starts out as DataFrames, and the obvious row-by-row conversion followed by a single sequential call doesn’t scale once there is any latency in processing a batch:
# This breaks with large datasets
objects = [DataRecord(**row.to_dict()) for _, row in df.iterrows()]
library.process_batch(objects)
The Solution: Ray Data + Actors
Ray Data’s map_batches with actors makes this straightforward. Here’s the complete pattern:
import ray
import ray.data
import pandas as pd
from ray.data import Dataset
from pydantic import BaseModel
from typing import List


class DataRecord(BaseModel):
    id: str
    value: float
    category: str


class InternalService:
    """Represents your in-house service that expects Pydantic objects."""

    def process_batch(self, records: List[DataRecord]) -> None:
        print(f"Processing {len(records)} validated records")
        # Your internal logic here


class DataProcessor:
    """Callable class that map_batches runs inside a pool of Ray actors.

    Note: the class is not decorated with @ray.remote and implements
    __call__; Ray Data creates and manages the actors itself.
    """

    def __init__(self, service: InternalService):
        # Constructed once per actor; the service is reused across batches
        self.service = service

    def __call__(self, batch_df: pd.DataFrame) -> pd.DataFrame:
        # Convert rows to validated Pydantic objects
        objects = []
        for _, row in batch_df.iterrows():
            try:
                obj = DataRecord(**row.to_dict())
                objects.append(obj)
            except Exception:
                continue  # Skip invalid rows

        # Submit the validated batch to your service
        if objects:
            self.service.process_batch(objects)

        return pd.DataFrame({'processed': [len(objects)]})
def process_dataset(dataset: Dataset, service: InternalService, batch_size: int = 1000):
    """Submit the dataset to the library in parallel batches."""
    result_dataset = dataset.map_batches(
        DataProcessor,
        batch_size=batch_size,
        batch_format="pandas",
        fn_constructor_args=[service],
        concurrency=4,  # size of the actor pool; tune to your workload
    )

    # Ray Data is lazy: iterating over the results executes the pipeline
    total = sum(
        batch['processed'].sum()
        for batch in result_dataset.iter_batches(batch_format="pandas")
    )
    print(f"Processed {total} records")
Why This Works for Internal Libraries and Services
Type Safety: Pydantic validation ensures your internal libraries receive exactly the data they expect, catching issues early in the pipeline rather than deep in business logic.
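For example, a record whose value can’t be parsed as a float is rejected with a precise error before it ever reaches the service. A minimal sketch using the DataRecord model defined above:

from pydantic import ValidationError

DataRecord(id="record_000001", value=42.0, category="A")  # passes validation

try:
    DataRecord(id="record_000002", value="not-a-number", category="B")
except ValidationError as exc:
    # Pydantic reports exactly which field failed and why
    print(exc)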
Interoperability: Many modern Python libraries use Pydantic for their APIs. This pattern lets you seamlessly integrate with them without manual serialization.
Stateful Processing: Ray actors maintain library connections and state across batches - perfect for authenticated clients, ML models, or database connections.
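A sketch of that idea, with a hypothetical authenticated client standing in for your real dependency (the api_key parameter and _connect helper are illustrative only):

import pandas as pd

class AuthenticatedProcessor:
    """Callable class run by map_batches as an actor."""

    def __init__(self, api_key: str):
        # Expensive, stateful setup (auth handshake, model load, DB pool)
        # happens once per actor, not once per batch.
        self.session = self._connect(api_key)

    def _connect(self, api_key: str):
        # Placeholder for real connection logic
        return {"token": api_key}

    def __call__(self, batch_df: pd.DataFrame) -> pd.DataFrame:
        # The same session is reused for every batch this actor handles
        return pd.DataFrame({'processed': [len(batch_df)]})

# ds.map_batches(AuthenticatedProcessor, batch_size=1000, batch_format="pandas",
#                fn_constructor_args=["MY_API_KEY"], concurrency=2)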
Parallel Execution: Ray automatically distributes batches across CPU cores (and across nodes in a cluster), turning a single-threaded library call into parallel processing.
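The concurrency argument sizes that actor pool, and per-actor resource requests such as num_cpus control how much of each machine a worker reserves. A sketch reusing DataProcessor and InternalService from above:

result = dataset.map_batches(
    DataProcessor,
    batch_size=500,
    batch_format="pandas",
    fn_constructor_args=[InternalService()],
    concurrency=8,  # run eight DataProcessor actors in parallel
    num_cpus=1,     # each actor reserves one CPU; Ray spreads them across nodes
)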
Real-World Example
import ray
import ray.data
import numpy as np
import pandas as pd

ray.init()

# Limit for demo purposes
limit = 10

# Transform a DataFrame into a Ray dataset
dataset = ray.data.from_pandas(pd.DataFrame({
    'id': [f'record_{i:06d}' for i in range(500_000)],
    'value': np.random.normal(100, 15, 500_000),
    'category': np.random.choice(['A', 'B', 'C', 'D'], 500_000),
})).limit(limit)

# Wire up the InternalService instance used by the actors.
# In practice the service may need to be constructed inside the actor,
# from configuration passed to the constructor.
internal_service = InternalService()

# batch_size controls how many objects are passed into each service call
process_dataset(dataset, internal_service, batch_size=100)

ray.shutdown()
Key Benefits
Seamless Integration: Works with any Python library that accepts Pydantic objects, making it easy to integrate with modern codebases.
Built-in Validation: Pydantic catches data quality issues before they reach your business logic.
Resource Efficiency: Processes data in configurable batches, preventing memory issues with large datasets.
Error Resilience: Invalid rows are filtered out automatically, keeping the pipeline running.
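If dropping rows silently is too opaque for your monitoring, a small variation of the batch function can also report how many rows failed validation. A sketch reusing the DataRecord model and InternalService from above:

import pandas as pd
from pydantic import ValidationError

class CountingProcessor:
    def __init__(self, service: InternalService):
        self.service = service

    def __call__(self, batch_df: pd.DataFrame) -> pd.DataFrame:
        objects, failed = [], 0
        for _, row in batch_df.iterrows():
            try:
                objects.append(DataRecord(**row.to_dict()))
            except ValidationError:
                failed += 1  # tally instead of dropping silently
        if objects:
            self.service.process_batch(objects)
        return pd.DataFrame({'processed': [len(objects)], 'failed': [failed]})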
When to Use This Pattern
Perfect for:
- Large DataFrames (10K+ rows) feeding into Pydantic-based libraries
- In-house services that expect validated, structured objects
- Data pipelines with multiple downstream consumers
- Scenarios where data validation is critical
Not ideal for:
- Small datasets (<5K rows) where startup overhead dominates
- Libraries that work directly with DataFrames
- Simple transformations without validation needs
Conclusion
This pattern bridges the gap between DataFrame-based data processing and modern Python libraries that expect structured objects, giving you type-safe interactions with those libraries while controlling how much data is submitted per request (batch_size) and how many requests are in flight at once (the size of the actor pool).