Building Ray Data Integrations for Vector Databases

Vector databases like Turbopuffer are becoming essential for AI applications, but integrating them with distributed data pipelines takes careful attention to batching, data formats, and performance. This post walks through a Ray Data integration that handles the details of distributed vector reads and writes.

The Problem

Vector databases expect specific data formats and batching strategies that don't always line up with DataFrame operations. You end up dealing with:

  1. Converting DataFrame columns (including numpy vector arrays) into the formats the API expects
  2. Splitting writes into API-sized batches
  3. Isolating per-batch failures so they don't kill the whole job
  4. Tuning concurrency against API rate limits

Architecture: Datasource and Datasink

The Ray-Turbopuffer integration implements custom TurbopufferDatasource and TurbopufferDatasink classes that handle bidirectional data flow:

Ray Dataset → TurbopufferDatasink → Turbopuffer
Turbopuffer → TurbopufferDatasource → Ray Dataset
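
In code, each direction is a single Ray Data call. The values below are illustrative, and credentials are assumed to come from the constructor parameters omitted later in this post:

import ray

# Write path: Ray Dataset → TurbopufferDatasink → Turbopuffer
ds = ray.data.from_items(
    [{"id": "doc-1", "vector": [0.1, 0.2, 0.3], "text": "hello"}]
)
ds.write_datasink(TurbopufferDatasink(namespace="documents"))

# Read path: Turbopuffer → TurbopufferDatasource → Ray Dataset
results = ray.data.read_datasource(
    TurbopufferDatasource(
        namespace="documents",
        rank_by=("vector", "ANN", [0.1, 0.2, 0.3]),
        top_k=10,
    )
)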

TurbopufferDatasink: Writing Data

The datasink handles distributed writes with configurable batching:

class TurbopufferDatasink(Datasink):
    def __init__(
        self,
        namespace: str,
        batch_size: int = 1000,
        transform_fn: Optional[Callable[[pd.DataFrame], pd.DataFrame]] = None,
        upsert_mode: str = "upsert",
        # ... other params
    ):

Key Implementation Details

Per-Task Client Initialization: Each Ray task gets its own Turbopuffer client to avoid shared state issues.

def write(self, blocks: Iterable[Block], ctx: TaskContext) -> Dict[str, Any]:
    # One client per task; clients are never shared across Ray workers
    client = Turbopuffer(api_key=self.api_key, region=self.region)

    for block in blocks:
        df = BlockAccessor.for_block(block).to_pandas()
        if self.transform_fn:
            df = self.transform_fn(df)

        # Only send a vector column if the dataset actually has one
        has_vectors = self.vector_column in df.columns

        # Process in API-sized batches
        for i in range(0, len(df), self.batch_size):
            batch_df = df.iloc[i : i + self.batch_size]
            self._write_batch(client, batch_df, has_vectors)

Data Format Conversion: The _write_batch method handles the conversion from pandas to Turbopuffer’s API format:

def _write_batch(self, client: Turbopuffer, batch_df: pd.DataFrame, has_vectors: bool):
    write_data = {"id": batch_df[self.id_column].tolist()}

    # Convert vectors (numpy arrays → lists)
    if has_vectors:
        vectors = []
        for vector in batch_df[self.vector_column]:
            if hasattr(vector, "tolist"):
                vectors.append(vector.tolist())
            else:
                vectors.append(vector)
        write_data["vector"] = vectors

    # Everything that is not the id or vector column becomes an attribute
    attribute_columns = [
        c for c in batch_df.columns if c not in (self.id_column, self.vector_column)
    ]

    # Handle attributes, converting NaN to None (NaN is not valid JSON)
    for col in attribute_columns:
        values = batch_df[col].where(pd.notna(batch_df[col]), None).tolist()
        write_data[col] = values

    # write_data is then sent to the namespace through the client's write/upsert call

Error Handling: Batch-level isolation means one failed batch doesn't kill the entire job (the semantics can be customized if stricter all-or-nothing behavior is preferred):

try:
    self._write_batch(client, batch_df, has_vectors)
    task_rows_written += len(batch_df)
    task_batches_written += 1
except Exception as e:
    error_msg = f"Error writing batch {task_batches_written}: {str(e)}"
    task_errors.append(error_msg)
    # Continue with the next batch instead of failing the whole task
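
Each task then hands its counters back to the driver. A minimal sketch of what the end of write() might return (field names chosen to mirror the stats shown later in this post):

# At the end of write(): return per-task counters so the driver can aggregate them
return {
    "rows_written": task_rows_written,
    "batches_written": task_batches_written,
    "error_count": len(task_errors),
    "errors": task_errors,
}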

TurbopufferDatasource: Reading Data

The datasource is query-centric rather than file-based:

class TurbopufferDatasource(Datasource):
    def __init__(
        self,
        namespace: str,
        query_vector: Optional[List[float]] = None,
        filters: Optional[Union[List, Dict]] = None,
        rank_by: Optional[Tuple] = None,  # e.g. ("vector", "ANN", query_vector)
        top_k: int = 1000,
    ):
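
Under the hood, Ray drives any Datasource through get_read_tasks. A minimal single-task sketch (assuming the whole result fits in one block, and a Ray version whose BlockMetadata accepts the fields shown) might look like:

from ray.data.block import BlockMetadata
from ray.data.datasource import ReadTask

def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
    # Row count and size are unknown until the query actually runs
    metadata = BlockMetadata(
        num_rows=None, size_bytes=None, schema=None, input_files=None, exec_stats=None
    )
    query_params = self._build_query_params(top_k=self.top_k)

    def read_fn():
        client = Turbopuffer(api_key=self.api_key, region=self.region)
        response = client.namespace(self.namespace).query(**query_params)
        # response.rows assumed to hold the matched documents
        return [self._documents_to_dataframe(response.rows)]

    return [ReadTask(read_fn, metadata)]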

The constructor supports several query patterns:

  1. Vector similarity (ANN) search via rank_by or query_vector
  2. Attribute-only filtering via filters
  3. Combined similarity search and filtering, as in the usage example later in the post
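
For illustration, the first two look like this (the filter values and query vector are made up):

# Pure vector similarity search
nearest = TurbopufferDatasource(
    namespace="documents",
    rank_by=("vector", "ANN", [0.1, 0.2, 0.3]),
    top_k=100,
)

# Attribute-only filtering, no vector ranking
recent_news = TurbopufferDatasource(
    namespace="documents",
    filters=["And", [["source", "Eq", "news"]]],
    top_k=500,
)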

Size Estimation

The datasource estimates result size for Ray's optimizer (this could be further improved using data returned by the Turbopuffer API, which I plan to do in a follow-on PR):

def estimate_inmemory_data_size(self) -> Optional[int]:
    client = Turbopuffer(api_key=self.api_key, region=self.region)

    # Sample query to get namespace stats
    sample_query = self._build_query_params(top_k=1)
    response = client.namespace(self.namespace).query(**sample_query)

    if hasattr(response, "stats"):
        namespace_size = response.stats.approx_namespace_size
        expected_results = min(self.top_k, namespace_size)
        return expected_results * 1024  # Rough per-document estimate in bytes

    return None  # Unknown size; let Ray fall back to its defaults

Result Processing

Converting Turbopuffer results back to DataFrames handles dynamic schemas:

def _documents_to_dataframe(self, documents: List[Row]) -> pd.DataFrame:
    # Get all unique keys across documents
    all_keys = set()
    for doc in documents:
        all_keys.update(doc.dict().keys())
    
    # Build consistent DataFrame
    data = {}
    for key in sorted(all_keys):
        values = [doc.dict().get(key) for doc in documents]
        data[key] = values
    
    df = pd.DataFrame(data)
    # Clean up special fields like distance scores
    return df.rename(columns={"$dist": "distance"})

Usage Examples

Writing with Data Transformation

def enrich_data(df: pd.DataFrame) -> pd.DataFrame:
    df["text_length"] = df["text"].str.len()
    df["priority"] = df["source"].map({"news": 1, "blogs": 2})
    return df

datasink = TurbopufferDatasink(
    namespace="documents",
    batch_size=100,
    transform_fn=enrich_data,
)

# ds is an existing Ray Dataset with id, vector, "text", and "source" columns
ds.write_datasink(datasink, concurrency=10)

Reading with Vector Search and Filters

# Vector similarity + attribute filtering
datasource = TurbopufferDatasource(
    namespace="documents",
    filters=["And", [["priority", "Lte", 2]]],
    rank_by=("vector", "ANN", [0.1, 0.2, 0.3]),
    top_k=100,
)

results = ray.data.read_datasource(datasource)
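
Because the result is an ordinary Ray Dataset, it composes with the rest of the Ray Data API. For example (the 0.5 threshold is illustrative):

# Post-filter on the renamed distance column and collect a pandas DataFrame
top_docs = results.filter(lambda row: row["distance"] < 0.5).to_pandas()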

Performance Characteristics

Batch Size Tuning: Optimal batch sizes fall in the 100-1,000 document range depending on vector size; larger vectors need smaller batches (a rough sizing heuristic is sketched below).
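
As a rule of thumb (my own heuristic, not something the integration enforces), you can size batches by approximate payload bytes rather than row count:

def suggest_batch_size(vector_dims: int, target_mb: float = 4.0) -> int:
    # ~4 bytes per float32 dimension, clamped to the 100-1000 range above
    bytes_per_doc = max(vector_dims, 1) * 4
    return int(max(100, min(1000, (target_mb * 1024 * 1024) // bytes_per_doc)))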

Concurrency: Linear scaling up to ~10 concurrent writers before hitting API limits.

Memory: Constant memory per task regardless of dataset size due to streaming processing.

Query Latency: Typically 50-200 ms for similarity search, and much faster for attribute-only queries.

Production Considerations

Resource Configuration

ds.write_datasink(
    datasink,
    # Fractional CPUs let more of these I/O-bound writer tasks share a node;
    # roughly 1 GB of memory is reserved per task
    ray_remote_args={"num_cpus": 0.5, "memory": 1000 * 1024 * 1024},
    concurrency=10,
)

Error Reporting

The implementation tracks detailed statistics:

{
    "rows_written": 50000,
    "batches_written": 50,
    "error_count": 2,
    "errors": ["Error writing batch 23: Connection timeout"]
}
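
These stats come from aggregating the per-task dicts on the driver. A sketch of that step, assuming a Ray release where Datasink.on_write_complete receives the list of per-task write results (newer releases wrap them differently):

def on_write_complete(self, write_results: List[Dict[str, Any]]) -> None:
    # Roll the per-task counters returned by write() into job-level stats
    self.stats = {
        "rows_written": sum(r["rows_written"] for r in write_results),
        "batches_written": sum(r["batches_written"] for r in write_results),
        "error_count": sum(r["error_count"] for r in write_results),
        "errors": [e for r in write_results for e in r["errors"]],
    }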

Key Takeaways

Building Ray integrations for external services requires:

  1. Task isolation: Each Ray task operates independently with its own client
  2. Efficient data conversion: Minimize overhead between DataFrame and API formats
  3. Batch-level error handling: Isolate failures to maintain overall job progress
  4. Configurable performance: Allow runtime tuning of batch sizes and concurrency
  5. Dynamic schema support: Handle varying document structures gracefully