Building Ray Data Integrations for Vector Databases
Vector databases like Turbopuffer are becoming essential for AI applications, but integrating them with distributed data pipelines is challenging without careful attention to batching and performance. This post examines an integration that handles the details of distributed vector operations.
The Problem
Vector databases need specific data formats and batch strategies that may not align well with DataFrame operations. You end up dealing with:
- Converting between DataFrames and vector formats (see the sketch after this list)
- Optimizing batch sizes for network efficiency
- Handling failures in distributed environments
- Managing vector arrays and metadata together
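To make the first point concrete, here is a minimal, hypothetical sketch of the format mismatch (the column names id, vector, and category are illustrative, not taken from the actual integration):

import numpy as np
import pandas as pd

# A typical Ray Data block: vectors arrive as numpy arrays inside a column
df = pd.DataFrame({
    "id": ["doc-1", "doc-2"],
    "vector": [np.random.rand(3).astype(np.float32) for _ in range(2)],
    "category": ["tech", None],  # missing metadata shows up as NaN/None
})

# Most vector database APIs expect plain, JSON-serializable payloads instead
payload = {
    "id": df["id"].tolist(),
    "vector": [v.tolist() for v in df["vector"]],
    "category": df["category"].where(pd.notna(df["category"]), None).tolist(),
}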
Architecture: Datasource and Datasink
The Ray-Turbopuffer integration implements custom TurbopufferDatasource and TurbopufferDatasink classes that handle bidirectional data flow:
Ray Dataset → TurbopufferDatasink → Turbopuffer
Turbopuffer → TurbopufferDatasource → Ray Dataset
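In Ray Data terms, both classes plug into the standard read and write entry points. A minimal sketch of the wiring (namespace names are illustrative):

import ray

# Read a namespace into a Ray Dataset
ds = ray.data.read_datasource(TurbopufferDatasource(namespace="documents"))

# ... transform with Ray Data ...

# Write a Dataset back out to another namespace
ds.write_datasink(TurbopufferDatasink(namespace="documents-out"))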
TurbopufferDatasink: Writing Data
The datasink handles distributed writes with configurable batching:
from typing import Callable, Optional

import pandas as pd
from ray.data import Datasink

class TurbopufferDatasink(Datasink):
    def __init__(
        self,
        namespace: str,
        batch_size: int = 1000,
        transform_fn: Optional[Callable[[pd.DataFrame], pd.DataFrame]] = None,
        upsert_mode: str = "upsert",
        # ... other params
    ):
Key Implementation Details
Per-Task Client Initialization: Each Ray task gets its own Turbopuffer client to avoid shared state issues.
def write(self, blocks: Iterable[Block], ctx: TaskContext) -> Dict[str, Any]:
    # One client per Ray task; no connection state is shared across workers
    client = Turbopuffer(api_key=self.api_key, region=self.region)
    for block in blocks:
        # BlockAccessor (from ray.data.block) converts any block format to pandas
        df = BlockAccessor.for_block(block).to_pandas()
        if self.transform_fn:
            df = self.transform_fn(df)
        has_vectors = self.vector_column in df.columns
        # Process in API-sized batches
        for i in range(0, len(df), self.batch_size):
            batch_df = df.iloc[i : i + self.batch_size]
            self._write_batch(client, batch_df, has_vectors)
Data Format Conversion: The _write_batch method handles the conversion from pandas to Turbopuffer’s API format:
def _write_batch(self, client: Turbopuffer, batch_df: pd.DataFrame, has_vectors: bool):
    write_data = {"id": batch_df[self.id_column].tolist()}

    # Convert vectors (numpy arrays → lists)
    if has_vectors:
        vectors = []
        for vector in batch_df[self.vector_column]:
            if hasattr(vector, "tolist"):
                vectors.append(vector.tolist())
            else:
                vectors.append(vector)
        write_data["vector"] = vectors

    # Handle attributes (everything except id/vector), converting NaN to None
    attribute_columns = [
        col for col in batch_df.columns
        if col not in (self.id_column, self.vector_column)
    ]
    for col in attribute_columns:
        values = batch_df[col].where(pd.notna(batch_df[col]), None).tolist()
        write_data[col] = values

    # Issue the upsert (exact call shape depends on the turbopuffer SDK version)
    client.namespace(self.namespace).write(upsert_columns=write_data)
Error Handling: Batch-level isolation means one failed batch doesn’t kill the entire job (these semantics can be customized if stricter failure behavior is desired):
try:
    self._write_batch(client, batch_df, has_vectors)
    task_rows_written += len(batch_df)
    task_batches_written += 1
except Exception as e:
    error_msg = f"Error writing batch {task_batches_written}: {e}"
    task_errors.append(error_msg)
    # Continue with the next batch instead of failing the whole task
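These counters feed the task’s write summary. A hypothetical sketch of what each task returns from write (field names mirror the statistics shown later in this post):

# At the end of write(): report per-task statistics back to the driver
return {
    "rows_written": task_rows_written,
    "batches_written": task_batches_written,
    "error_count": len(task_errors),
    "errors": task_errors,
}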
TurbopufferDatasource: Reading Data
The datasource is query-centric rather than file-based:
from typing import Any, Dict, List, Optional, Tuple, Union

from ray.data import Datasource

class TurbopufferDatasource(Datasource):
    def __init__(
        self,
        namespace: str,
        query_vector: Optional[List[float]] = None,
        filters: Optional[Union[List, Dict]] = None,
        rank_by: Optional[Tuple[str, str, Any]] = None,
        top_k: int = 1000,
    ):
This supports different query patterns:
- Vector similarity: rank_by=("vector", "ANN", query_vector)
- Filtered queries: filters=["category", "Eq", "tech"]
- Hybrid search: combine vector ranking and filters in a single query
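The constructor arguments are translated into query parameters by a _build_query_params helper (used below for size estimation). Its internals aren’t shown in the source, so this is a plausible sketch rather than the actual implementation:

def _build_query_params(self, top_k: Optional[int] = None) -> Dict[str, Any]:
    # Translate constructor arguments into Turbopuffer query kwargs
    params: Dict[str, Any] = {"top_k": top_k if top_k is not None else self.top_k}
    if self.rank_by is not None:
        params["rank_by"] = list(self.rank_by)
    if self.filters is not None:
        params["filters"] = self.filters
    return params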
Size Estimation
The datasource estimates result size for Ray’s optimizer (this can be further enhanced with stats returned from the Turbopuffer API, which I plan to do in a follow-on PR):
def estimate_inmemory_data_size(self) -> Optional[int]:
    client = Turbopuffer(api_key=self.api_key, region=self.region)

    # Sample query (top_k=1) to get namespace stats
    sample_query = self._build_query_params(top_k=1)
    response = client.namespace(self.namespace).query(**sample_query)

    if hasattr(response, "stats"):
        namespace_size = response.stats.approx_namespace_size
        expected_results = min(self.top_k, namespace_size)
        return expected_results * 1024  # Rough estimate per doc
    return None  # Size unknown; let Ray fall back to its defaults
Result Processing
Converting Turbopuffer results back to DataFrames handles dynamic schemas:
def _documents_to_dataframe(self, documents: List[Row]) -> pd.DataFrame:
    # Convert each result row to a dict once; rows may differ in attributes
    doc_dicts = [doc.dict() for doc in documents]

    # Get all unique keys across documents
    all_keys = set()
    for doc in doc_dicts:
        all_keys.update(doc.keys())

    # Build a consistent DataFrame; missing keys become None
    data = {key: [doc.get(key) for doc in doc_dicts] for key in sorted(all_keys)}
    df = pd.DataFrame(data)

    # Clean up special fields like distance scores
    return df.rename(columns={"$dist": "distance"})
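For instance, two hypothetical result rows with different attribute sets still yield one consistent frame:

rows = [
    {"id": 1, "text": "hello", "$dist": 0.12},
    {"id": 2, "lang": "en", "$dist": 0.34},  # no "text" attribute
]
keys = sorted({k for r in rows for k in r})
df = pd.DataFrame({k: [r.get(k) for r in rows] for k in keys})
# Columns: $dist, id, lang, text — missing values filled in with None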
Usage Examples
Writing with Data Transformation
def enrich_data(df: pd.DataFrame) -> pd.DataFrame:
    df["text_length"] = df["text"].str.len()
    df["priority"] = df["source"].map({"news": 1, "blogs": 2})
    return df

datasink = TurbopufferDatasink(
    namespace="documents",
    batch_size=100,
    transform_fn=enrich_data,
)

# ds is an existing Ray Dataset with "text" and "source" columns
ds.write_datasink(datasink, concurrency=10)
Reading with Hybrid Search
# Vector similarity + attribute filtering
# Vector similarity + attribute filtering
datasource = TurbopufferDatasource(
    namespace="documents",
    filters=["And", [["priority", "Lte", 2]]],
    rank_by=("vector", "ANN", [0.1, 0.2, 0.3]),
    top_k=100,
)
results = ray.data.read_datasource(datasource)
Performance Characteristics
Batch Size Tuning: Optimal batches are 100-1000 documents depending on vector size; larger vectors need smaller batches (see the sketch after these notes).
Concurrency: Linear scaling up to ~10 concurrent writers before hitting API limits.
Memory: Constant memory per task regardless of dataset size due to streaming processing.
Query Latency: Typical 50-200ms for similarity search, much faster for attribute-only queries.
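As a rough illustration of the batch-size guidance above (a hypothetical heuristic, not part of the integration), batches can be sized toward a target payload:

def suggest_batch_size(vector_dim: int, target_bytes: int = 4 * 1024 * 1024) -> int:
    # float32 vectors: 4 bytes per dimension; clamp to the 100-1000 range
    bytes_per_doc = vector_dim * 4
    return max(100, min(1000, target_bytes // bytes_per_doc))

suggest_batch_size(128)   # small vectors -> 1000 (capped)
suggest_batch_size(4096)  # large vectors -> 256 documents per batch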
Production Considerations
Resource Configuration
ds.write_datasink(
    datasink,
    ray_remote_args={
        "num_cpus": 0.5,  # request half a CPU per write task
        "memory": 1000 * 1024 * 1024,  # reserve ~1 GB of memory per task
    },
    concurrency=10,
)
Error Reporting
The implementation tracks detailed statistics:
{
"rows_written": 50000,
"batches_written": 50,
"error_count": 2,
"errors": ["Error writing batch 23: Connection timeout"]
}
Key Takeaways
Building Ray integrations for external services requires:
- Task isolation: Each Ray task operates independently with its own client
- Efficient data conversion: Minimize overhead between DataFrame and API formats
- Batch-level error handling: Isolate failures to maintain overall job progress
- Configurable performance: Allow runtime tuning of batch sizes and concurrency
- Dynamic schema support: Handle varying document structures gracefully