Building a Ray Data Integration for Apache Kafka with confluent-kafka
Streaming data to Kafka from distributed data processing pipelines is a common pattern, but the integration details matter. This post examines a Ray Data sink for Kafka built on confluent-kafka, leveraging the high-performance librdkafka library for efficient distributed writes.
The Problem
Writing Ray Datasets to Kafka presents several challenges that don’t align cleanly with typical batch processing patterns:
- Converting DataFrames to Kafka message formats (key-value pairs)
- Choosing appropriate serialization strategies (JSON, strings, bytes)
- Managing asynchronous delivery and callbacks
- Handling producer buffer overflow in high-throughput scenarios
- Batching efficiently without blocking on network I/O
- Managing message keys for partitioning and compaction
Without careful design, you end up with inefficient writes, memory issues, or silent data loss.
Why confluent-kafka?
The integration uses Confluent’s official Python client, which wraps the high-performance librdkafka C library. This provides significant advantages over pure-Python implementations:
- Performance: librdkafka's C implementation delivers substantially higher throughput than pure-Python clients
- Efficient Compression: Native C implementations of the gzip, snappy, lz4, and zstd codecs
- Production-Ready: Battle-tested in high-scale deployments with robust error handling
Architecture: Datasink Pattern
The integration implements a custom KafkaDatasink that bridges Ray’s distributed execution model with Kafka’s producer API:
Ray Dataset → KafkaDatasink → confluent-kafka Producer → Kafka Broker
Each Ray task gets its own Kafka producer instance to avoid shared state and connection pool contention.
KafkaDatasink: Core Implementation
The datasink handles configuration, serialization, and asynchronous writes:
class KafkaDatasink(Datasink):
    def __init__(
        self,
        topic: str,
        bootstrap_servers: str,
        key_field: str | None = None,
        value_serializer: str = "json",
        producer_config: dict[str, Any] | None = None,
        batch_size: int = 100,
        delivery_callback: Callable | None = None,
    ):
Key Implementation Details
Per-Task Producer Initialization: Each Ray task creates its own Producer with librdkafka configuration:
def write(
    self,
    blocks: Iterable[Block],
    ctx: TaskContext,
) -> Any:
    # Configure producer with librdkafka settings
    config = {"bootstrap.servers": self.bootstrap_servers, **self.producer_config}
    producer = Producer(config)

    total_records = 0
    batch_count = 0
Note the dotted-key format (bootstrap.servers) used by librdkafka, different from Python-style underscores.
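As a quick illustration using only plain dicts (nothing here is specific to this integration), here is the dotted librdkafka style next to a naive translation helper; real option names don't always line up one-to-one between the two clients, so treat the helper as illustrative only:

```python
# librdkafka / confluent-kafka configuration: a flat dict with dotted keys
librdkafka_config = {
    "bootstrap.servers": "localhost:9092",
    "compression.type": "lz4",
    "linger.ms": 100,
}

# Naive helper translating underscore-style names (the kafka-python
# convention) into dotted librdkafka keys
def to_dotted(config: dict) -> dict:
    return {key.replace("_", "."): value for key, value in config.items()}
```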
Explicit Serialization: Unlike kafka-python’s built-in serializers, confluent-kafka expects raw bytes:
def _serialize_value(self, value: Any) -> bytes:
    """Serialize value based on configured format."""
    # Convert ArrowRow to dict first
    value = self._row_to_dict(value)

    if self.value_serializer == "json":
        return json.dumps(value).encode("utf-8")
    elif self.value_serializer == "string":
        return str(value).encode("utf-8")
    else:  # bytes
        return value if isinstance(value, bytes) else str(value).encode("utf-8")
This approach gives you full control over serialization logic while keeping the producer lightweight.
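The `_row_to_dict` helper used above isn't shown in the post; a minimal sketch of what it might look like, written as a standalone function (the exact Ray row types handled are an assumption):

```python
from collections.abc import Mapping
from typing import Any

def row_to_dict(row: Any) -> Any:
    """Best-effort conversion of a Ray block row into a plain dict.

    Plain dicts pass straight through; mapping-like rows (Ray's Arrow-backed
    rows behave like mappings) are copied into a dict; objects exposing an
    as_pydict() method are converted via that; anything else is returned
    unchanged for the serializer to handle.
    """
    if isinstance(row, dict):
        return row
    if isinstance(row, Mapping):
        return dict(row)
    if hasattr(row, "as_pydict"):
        return row.as_pydict()
    return row
```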
Asynchronous Delivery with Callbacks: The produce() method is asynchronous and non-blocking:
# Use provided callback or default
callback = self.delivery_callback or self._default_delivery_callback

for block in blocks:
    block_accessor = BlockAccessor.for_block(block)
    # Iterate through rows in block
    for row in block_accessor.iter_rows(public_row_format=False):
        # Extract key if specified
        key = self._extract_key(row)
        # Serialize value
        value = self._serialize_value(row)
        # Produce to Kafka
        producer.produce(topic=self.topic, value=value, key=key, callback=callback)
The default callback raises exceptions on delivery failure:
def _default_delivery_callback(self, err: KafkaError | None, msg: Any) -> None:
    """Default delivery report callback."""
    if err is not None:
        raise KafkaException(f"Message delivery failed: {err}")
This ensures you catch failures rather than silently dropping messages. Note that delivery callbacks execute during poll() and flush(), so an exception raised here surfaces from those calls in the writing task.
Buffer Management: When the internal queue fills, confluent-kafka raises BufferError. The sink handles this gracefully:
try:
    producer.produce(topic=self.topic, value=value, key=key, callback=callback)
    total_records += 1
    batch_count += 1
except BufferError:
    # Queue is full, wait for messages to be delivered
    producer.poll(1.0)
    # Retry
    producer.produce(topic=self.topic, value=value, key=key, callback=callback)
    total_records += 1
    batch_count += 1
This back-pressure mechanism prevents memory overflow while maintaining throughput.
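The single retry above can still fail if the queue stays full for longer than one poll. One possible hardening, sketched here rather than taken from the original sink (`produce_with_backpressure` and its parameters are hypothetical), bounds the retries and keeps polling between attempts:

```python
def produce_with_backpressure(producer, topic, value, key=None,
                              callback=None, max_retries=5, poll_timeout=1.0):
    """Produce one message, polling to drain the queue whenever it is full.

    `producer` only needs confluent-kafka-style produce() and poll() methods,
    which makes the helper easy to unit-test with a stub.
    """
    for attempt in range(max_retries + 1):
        try:
            producer.produce(topic=topic, value=value, key=key, callback=callback)
            return
        except BufferError:
            if attempt == max_retries:
                raise  # still full after max_retries polls: surface the error
            # Queue full: run delivery callbacks to free buffer space, then retry
            producer.poll(poll_timeout)
```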
Polling Strategy: Unlike kafka-python’s automatic background thread, confluent-kafka requires explicit polling to trigger callbacks:
# Poll periodically for delivery reports
if batch_count >= self.batch_size:
    producer.poll(0)  # Non-blocking poll
    batch_count = 0

# Final flush ensures all messages are sent; flush() returns the
# number of messages still undelivered when the timeout expires
remaining = producer.flush(timeout=30.0)
if remaining > 0:
    failed_messages = remaining
Periodic polling (every N messages) triggers delivery callbacks without blocking on I/O, keeping the pipeline efficient.
Key Extraction for Partitioning: Kafka uses message keys for partitioning and compaction:
def _extract_key(self, row: Any) -> bytes | None:
    """Extract and encode message key from row."""
    # Convert ArrowRow to dict first
    row_dict = self._row_to_dict(row)

    key = None
    if self.key_field and isinstance(row_dict, dict):
        key_value = row_dict.get(self.key_field)
        if key_value is not None:
            key = str(key_value).encode("utf-8")
    return key
This enables:
- Consistent routing to the same partition for a given key
- Log compaction (Kafka keeps only the latest message per key)
- Ordered processing within a partition
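To make the first point concrete: the default partitioner hashes the key modulo the partition count, so equal keys always land on the same partition. The sketch below illustrates the idea with CRC32; the actual hash function depends on the client (librdkafka and the Java client use different defaults), so treat this as illustrative only:

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Illustrative key-based partitioning: hash(key) mod partition count."""
    return zlib.crc32(key) % num_partitions

# Equal keys always map to the same partition, which is what keeps one
# user's events ordered on a single partition.
```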
Error Handling and Cleanup: The implementation ensures cleanup even on failure:
try:
    # Write logic...
except KafkaException as e:
    raise RuntimeError(f"Failed to write to Kafka: {e}") from e
finally:
    # Flush any remaining messages
    producer.flush()

return {"total_records": total_records, "failed_messages": failed_messages}
The final flush is critical as it blocks until all queued messages are acknowledged.
Convenience Helper
For simpler usage, a helper function wraps the datasink:
def write_kafka(
    dataset: Dataset,
    topic: str,
    bootstrap_servers: str,
    key_field: str | None = None,
    value_serializer: str = "json",
    producer_config: dict[str, Any] | None = None,
    batch_size: int = 100,
    delivery_callback: Callable | None = None,
) -> Any:
    sink = KafkaDatasink(...)
    return dataset.write_datasink(sink)
This allows one-line writes:
write_kafka(ds, "my-topic", "localhost:9092", key_field="user_id")
Docker-Based Testing
Testing Kafka integrations traditionally requires complex local setup. The included Docker Compose configuration solves this:
services:
  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_KRAFT_MODE: "true"
      KAFKA_PROCESS_ROLES: controller,broker
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9093"
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: "Mk3OEYBSD34fcwNTJENDM2Qk"
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
      interval: 10s
      timeout: 10s
      retries: 10

  unit-tests:
    build:
      context: .
      dockerfile: Dockerfile.test
    depends_on:
      kafka:
        condition: service_healthy
    command: pytest tests/unit/ -v --color=yes
    profiles:
      - test

  integration-tests:
    build:
      context: .
      dockerfile: Dockerfile.test
    depends_on:
      kafka:
        condition: service_healthy
    environment:
      KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
    command: python integration_tests.py
    profiles:
      - integration
Two-Tier Testing Strategy
Unit Tests (Mocked): Fast tests using mocked producers. No actual Kafka needed:
@patch("src.kafka_datasink.Producer")
@patch("src.kafka_datasink.BlockAccessor")
def test_write_with_keys(self, mock_accessor_class, mock_producer_class):
    mock_producer = MagicMock()
    mock_producer.flush.return_value = 0
    mock_producer_class.return_value = mock_producer

    mock_accessor = MagicMock()
    mock_accessor.iter_rows.return_value = [
        {"id": 1, "value": "foo"},
        {"id": 2, "value": "bar"},
    ]
    mock_accessor_class.for_block.return_value = mock_accessor

    sink = KafkaDatasink(
        topic="test-topic",
        bootstrap_servers="localhost:9092",
        key_field="id",
    )
    _ = sink.write([Mock()], Mock())

    # Verify keys were passed
    for call_args in mock_producer.produce.call_args_list:
        assert call_args[1]["key"] is not None
        assert isinstance(call_args[1]["key"], bytes)
Integration Tests (Real Kafka): End-to-end validation with actual broker:
def test_basic_write():
    bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
    topic = "test-basic-write"
    setup_topic(topic, bootstrap_servers)

    ray.init(ignore_reinit_error=True)
    try:
        data = [
            {"id": 1, "name": "Alice", "score": 95},
            {"id": 2, "name": "Bob", "score": 87},
            {"id": 3, "name": "Charlie", "score": 92},
        ]
        ds = ray.data.from_items(data)

        write_kafka(
            dataset=ds,
            topic=topic,
            bootstrap_servers=bootstrap_servers,
            value_serializer="json",
        )

        # Consume and verify
        time.sleep(2)
        messages = consume_messages(topic, bootstrap_servers, len(data))
        assert len(messages) == len(data)
    finally:
        delete_topic(topic, bootstrap_servers)
The integration tests use confluent-kafka’s Consumer for verification:
def consume_messages(
    topic: str,
    bootstrap_servers: str,
    expected_count: int,
    timeout: int = 30,
    value_deserializer: str = "json",
) -> list[dict[str, Any]]:
    """Consume messages from Kafka topic."""
    conf = {
        "bootstrap.servers": bootstrap_servers,
        "group.id": f"test-consumer-{int(time.time())}",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    }
    consumer = Consumer(conf)
    consumer.subscribe([topic])

    messages = []
    start_time = time.time()
    try:
        while len(messages) < expected_count:
            if time.time() - start_time > timeout:
                break
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    break
            if value_deserializer == "json":
                value = json.loads(msg.value().decode("utf-8"))
            else:
                value = msg.value().decode("utf-8")
            messages.append({
                "value": value,
                "key": msg.key().decode("utf-8") if msg.key() else None,
                "partition": msg.partition(),
                "offset": msg.offset(),
            })
    finally:
        consumer.close()
    return messages
Makefile Convenience
# Run unit tests
make unit
# Run integration tests
make integration
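The Makefile itself isn't shown in the post; a plausible sketch of the two targets, assuming the Compose profiles defined above (the target names and flags are an assumption):

```makefile
.PHONY: unit integration

unit:
	docker compose --profile test up --build --abort-on-container-exit unit-tests

integration:
	docker compose --profile integration up --build --abort-on-container-exit integration-tests
```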
Usage Examples
Basic Write with JSON Serialization
import ray

ds = ray.data.range(1000).map(lambda x: {
    "id": x["id"],
    "value": x["id"] * 2,
    "category": "even" if x["id"] % 2 == 0 else "odd",
})

write_kafka(
    dataset=ds,
    topic="processed-data",
    bootstrap_servers="localhost:9092",
)
Write with Partitioning Keys
# User events with consistent routing per user
user_events = ray.data.from_items([
{"user_id": "alice", "action": "login", "timestamp": 1234567890},
{"user_id": "bob", "action": "purchase", "timestamp": 1234567891},
{"user_id": "alice", "action": "logout", "timestamp": 1234567892},
])
write_kafka(
dataset=user_events,
topic="user-events",
bootstrap_servers="kafka-broker:9092",
key_field="user_id", # Alice's events → same partition
)
Custom Delivery Callback
Track successful and failed deliveries:
delivery_stats = {"success": 0, "failed": 0}

def track_delivery(err, msg):
    if err:
        delivery_stats["failed"] += 1
        print(f"Delivery failed: {err}")
    else:
        delivery_stats["success"] += 1

write_kafka(
    dataset=ds,
    topic="tracked-events",
    bootstrap_servers="localhost:9092",
    delivery_callback=track_delivery,
)

print(f"Delivered: {delivery_stats['success']}, Failed: {delivery_stats['failed']}")
One caveat: Ray executes write tasks in separate worker processes, so the callback increments each worker's copy of the dict, not the driver's. For cluster-wide totals, aggregate the per-task statistics returned by the sink or route counts through a shared Ray actor.
High-Throughput Configuration
write_kafka(
    dataset=large_dataset,
    topic="high-volume",
    bootstrap_servers="broker1:9092,broker2:9092,broker3:9092",
    key_field="entity_id",
    batch_size=500,
    producer_config={
        "acks": "1",                # Leader acknowledgment only
        "compression.type": "lz4",  # Fast compression
        "linger.ms": 100,           # Batch for up to 100ms
        "batch.size": 1000000,      # 1MB batches
        "queue.buffering.max.messages": 100000,
    },
)
High-Reliability Configuration
write_kafka(
    dataset=critical_data,
    topic="critical-events",
    bootstrap_servers="broker1:9092,broker2:9092,broker3:9092",
    producer_config={
        "acks": "all",               # Wait for all in-sync replicas
        "enable.idempotence": True,  # No duplicates from producer retries
        "max.in.flight.requests.per.connection": 1,
        "retries": 10,
        "retry.backoff.ms": 100,
    },
)
Production Considerations
Resource Configuration
ds.write_datasink(
    sink,
    ray_remote_args={
        "num_cpus": 0.5,              # Half a CPU per task
        "memory": 100 * 1024 * 1024,  # 100MB per task
    },
    concurrency=10,  # 10 parallel writers
)
Concurrency tuning:
- Start with 5-10 concurrent writers
- Monitor broker CPU and network
- Scale up until broker saturation
Monitoring Metrics
Track write statistics returned from the sink:
result = write_kafka(ds, topic, bootstrap_servers)
# {'total_records': 50000, 'failed_messages': 0}
Key librdkafka metrics (via statistics callback):
def stats_callback(stats_json):
    stats = json.loads(stats_json)
    print(f"Messages buffered: {stats['msg_cnt']}")
    print(f"Messages sent: {stats['txmsgs']}")
    print(f"Errors: {stats['txerrs']}")

producer_config={
    "statistics.interval.ms": 5000,
    "stats_cb": stats_callback,
}
Broker-side metrics (JMX):
- MessagesInPerSec: Incoming message rate
- BytesInPerSec: Incoming byte rate
- RequestsPerSec: Request rate
- ProduceRequestsPerSec: Producer-specific requests
Error Handling Strategies
Retry with backoff:
producer_config={
    "retries": 10,
    "retry.backoff.ms": 100,
    "retry.backoff.max.ms": 1000,
}
Dead Letter Queue: Route failed messages to a separate topic:
def dlq_callback(err, msg):
    if err:
        # Write to DLQ topic (dlq_producer must be a separate Producer
        # created in the same worker process as the callback)
        dlq_producer.produce("dlq-topic", value=msg.value(), key=msg.key())

write_kafka(ds, "primary-topic", bootstrap_servers, delivery_callback=dlq_callback)
Idempotent production (exactly-once):
producer_config={
    "enable.idempotence": True,
    "max.in.flight.requests.per.connection": 5,
    "acks": "all",
}
This ensures duplicate messages aren’t written even with retries.
Security Configuration
TLS/SSL encryption:
producer_config={
    "security.protocol": "SSL",
    "ssl.ca.location": "/path/to/ca-cert.pem",
    "ssl.certificate.location": "/path/to/client-cert.pem",
    "ssl.key.location": "/path/to/client-key.pem",
}
SASL authentication:
producer_config={
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "your-username",
    "sasl.password": "your-password",
}
Key Takeaways
Building Ray integrations for Kafka with confluent-kafka requires:
- Asynchronous Design: Leverage non-blocking produce() calls with delivery callbacks for maximum throughput
- Buffer Management: Handle BufferError gracefully with polling and retry logic
- Explicit Polling: Call poll() periodically to trigger callbacks and process delivery reports
- Smart Batching: Balance throughput and latency with configurable batch sizes and linger.ms
- Key Management: Extract message keys from data for proper partitioning and compaction
- Robust Testing: Use Docker Compose for reproducible integration tests alongside fast unit tests
- Production Configuration: Tune librdkafka parameters for reliability vs. performance trade-offs
- Native Performance: Achieve better throughput using librdkafka's optimized C implementation vs. pure-Python implementations
The combination of confluent-kafka’s high-performance client, careful buffer management, and proper asynchronous handling enables reliable, efficient Kafka integration with Ray Data at production scale.