Interfacing Ray with internal libraries and services using Pydantic

Companies often develop internal libraries and services whose function inputs and outputs are defined as Pydantic models. Whether your internal library handles machine learning, API clients, or data validation, you’ll often need to convert large DataFrames into Pydantic objects and pass batches of them to those libraries.

Here’s how to do it efficiently using Ray Data.

The Challenge

Many in-house Python libraries/services follow these patterns:

ml_service.predict_batch(records: List[PredictionRequest]) -> List[PredictionResponse]
validator.validate_batch(data: List[DataModel]) -> List[DataValidation]
api_client.submit_batch(objects: List[APIRecord]) -> List[APIResponse]
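
For concreteness, the request and response types behind a signature like ml_service.predict_batch are usually small Pydantic models. The names and fields below are hypothetical, chosen only to match the signatures above:

from typing import List
from pydantic import BaseModel

# Hypothetical models behind the ml_service signature above
class PredictionRequest(BaseModel):
    id: str
    features: List[float]

class PredictionResponse(BaseModel):
    id: str
    score: float

def predict_batch(records: List[PredictionRequest]) -> List[PredictionResponse]:
    # Placeholder body standing in for the real service call
    return [PredictionResponse(id=r.id, score=0.0) for r in records]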

But your data starts as DataFrames. Converting every row up front and handing the whole list to the library in a single sequential call doesn’t scale once datasets grow large or each call carries any latency:

# Slow and memory-hungry for large datasets: every row is converted and
# held in memory, then processed in one sequential call
objects = [DataRecord(**row.to_dict()) for _, row in df.iterrows()]
library.process_batch(objects)

The Solution: Ray Data + Actors

Ray Data’s map_batches, run with a pool of actors, makes this straightforward. Here’s the complete pattern:

import ray
import ray.data
import pandas as pd
from ray.data import Dataset
from pydantic import BaseModel
from typing import List

class DataRecord(BaseModel):
    id: str
    value: float
    category: str

class InternalService:
    """Represents your in-house service that expects Pydantic objects."""
    def process_batch(self, records: List[DataRecord]) -> None:
        print(f"Processing {len(records)} validated records")
        # Your internal logic here

class DataProcessor:
    """Callable class that Ray Data runs as a pool of actors.

    Note: do not decorate it with @ray.remote; map_batches manages
    the actor pool itself when given a callable class.
    """
    def __init__(self, service: InternalService):
        self.service = service

    def __call__(self, batch_df: pd.DataFrame) -> pd.DataFrame:
        # Convert each row into a validated Pydantic object
        objects = []
        for _, row in batch_df.iterrows():
            try:
                objects.append(DataRecord(**row.to_dict()))
            except Exception:
                continue  # Skip invalid rows

        # Submit the validated batch to your service
        if objects:
            self.service.process_batch(objects)

        return pd.DataFrame({'processed': [len(objects)]})

def process_dataset(dataset: Dataset, service: InternalService, batch_size: int = 1000):
    """Convert rows and submit them to the service in parallel."""
    result_dataset = dataset.map_batches(
        DataProcessor,
        batch_size=batch_size,
        batch_format="pandas",
        fn_constructor_args=[service],
        concurrency=4,  # actor pool size; required for callable classes in recent Ray releases
    )

    # Execute the pipeline and tally the per-batch counts
    total = sum(batch['processed'].sum()
                for batch in result_dataset.iter_batches(batch_format="pandas"))
    print(f"Processed {total} records")

Why This Works for Internal Libraries and Services

Type Safety: Pydantic validation ensures your internal libraries receive exactly the data they expect, catching issues early in the pipeline rather than deep in business logic.
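
For example, a row whose value field cannot be parsed as a float fails at conversion time with a clear error, instead of surfacing as a confusing failure inside the service:

from pydantic import ValidationError

# Using the DataRecord model defined above
bad_row = {"id": "record_000001", "value": "not-a-number", "category": "A"}
try:
    DataRecord(**bad_row)
except ValidationError as err:
    print(err)  # reports that 'value' is not a valid float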

Interoperability: Many modern Python libraries use Pydantic for their APIs. This pattern lets you seamlessly integrate with them without manual serialization.
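
For instance, once a row has been validated into a DataRecord, producing a JSON payload for an HTTP-based service is a single call (the example assumes Pydantic v2; on v1 the equivalent is .json()):

record = DataRecord(id="record_000001", value=101.5, category="A")

# Pydantic v2: JSON payload ready for an HTTP client, no manual serialization
payload = record.model_dump_json()
print(payload)  # {"id":"record_000001","value":101.5,"category":"A"}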

Stateful Processing: Ray actors maintain library connections and state across batches, which is ideal for authenticated clients, loaded ML models, or database connections.
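
As a sketch of what that looks like, the callable class below holds a hypothetical authenticated ApiClient that is built once per actor and reused for every batch that actor handles. ApiClient and its submit_batch method are placeholders; DataRecord is the model defined earlier.

import pandas as pd
from typing import List

class ApiClient:
    """Placeholder for an authenticated client from your internal library."""
    def __init__(self, api_url: str, token: str):
        # A real client might open a session or authenticate here
        self.api_url, self.token = api_url, token

    def submit_batch(self, records: List[DataRecord]) -> None:
        pass  # a real client would submit the records here

class AuthenticatedProcessor:
    """Ray Data instantiates this once per actor, so the client is
    created once and reused across all batches handled by that actor."""
    def __init__(self, api_url: str, token: str):
        self.client = ApiClient(api_url, token)

    def __call__(self, batch_df: pd.DataFrame) -> pd.DataFrame:
        records = [DataRecord(**row.to_dict()) for _, row in batch_df.iterrows()]
        self.client.submit_batch(records)
        return pd.DataFrame({"processed": [len(records)]})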

Parallel Execution: Ray automatically distributes work across cores, turning single-threaded library calls into parallel processing.
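
The degree of parallelism is set directly on map_batches via the size of the actor pool. The values below are illustrative, and dataset and internal_service are as in the example that follows:

# Run the conversion on a pool of 4 actors, 1 CPU each (values are illustrative)
result_dataset = dataset.map_batches(
    DataProcessor,
    batch_size=1000,
    batch_format="pandas",
    fn_constructor_args=[internal_service],
    concurrency=4,   # number of actors in the pool
    num_cpus=1,      # CPUs reserved per actor
)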

Real-World Example

import numpy as np
import pandas as pd
import ray
import ray.data

ray.init()

# Limit the dataset for demo purposes
limit = 10

# Transform a DataFrame into a Ray Dataset
dataset = ray.data.from_pandas(pd.DataFrame({
    'id': [f'record_{i:06d}' for i in range(500_000)],
    'value': np.random.normal(100, 15, 500_000),
    'category': np.random.choice(['A', 'B', 'C', 'D'], 500_000),
})).limit(limit)

# Wire up the InternalService instance used by the actors. In practice the
# service may need to be constructed inside the actor from configuration
# passed to its constructor.
internal_service = InternalService()

# batch_size controls how many objects are passed into each service call
process_dataset(dataset, internal_service, batch_size=100)

ray.shutdown()

Key Benefits

Seamless Integration: Works with any Python library that accepts Pydantic objects, making it easy to integrate with modern codebases.

Built-in Validation: Pydantic catches data quality issues before they reach your business logic.

Resource Efficiency: Processes data in configurable batches, preventing memory issues with large datasets.

Error Resilience: Invalid rows are filtered out automatically, keeping the pipeline running.
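
If silently dropping rows is too lossy for your use case, the same callable can report how many rows it skipped. A minimal variant of DataProcessor:

class CountingProcessor:
    """Variant of DataProcessor that also reports skipped rows."""
    def __init__(self, service: InternalService):
        self.service = service

    def __call__(self, batch_df: pd.DataFrame) -> pd.DataFrame:
        objects, skipped = [], 0
        for _, row in batch_df.iterrows():
            try:
                objects.append(DataRecord(**row.to_dict()))
            except Exception:
                skipped += 1  # count instead of silently discarding

        if objects:
            self.service.process_batch(objects)

        # Surface both counts so the caller can judge the skip rate
        return pd.DataFrame({"processed": [len(objects)], "skipped": [skipped]})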

When to Use This Pattern

Perfect for:

- Converting large DataFrames into validated Pydantic objects before handing them to internal libraries or services
- Batch-oriented APIs (ML inference, validation, submission endpoints) that accept lists of typed objects
- Stateful clients such as authenticated sessions, loaded models, or database connections that should be created once per worker and reused

Not ideal for:

- Small datasets, where Ray's startup and scheduling overhead outweighs the parallelism gains
- Libraries that already accept DataFrames or Arrow tables directly, where no per-row conversion is needed

Conclusion

This pattern bridges the gap between DataFrame-based data processing and modern Python libraries that expect structured objects. You get type-safe interactions with your internal libraries, control over how much data goes into each request via batch_size, and control over the overall submission rate via the size of the actor pool.