Building a Ray Data Integration for Apache Kafka with confluent-kafka

Streaming data to Kafka from distributed data processing pipelines is a common pattern, but the integration details matter. This post examines a Ray Data sink for Kafka built on confluent-kafka, leveraging the high-performance librdkafka library for efficient distributed writes.

The Problem

Writing Ray Datasets to Kafka presents several challenges that don’t align cleanly with typical batch processing patterns:

Asynchronous delivery: produce() returns immediately, so failures surface only through callbacks

Back-pressure: the client’s internal queue can fill faster than the broker acknowledges messages

Distributed execution: each Ray write task needs its own producer, with no shared connection state

Explicit serialization: confluent-kafka expects raw bytes, so every row must be serialized by hand

Without careful design, you end up with inefficient writes, memory issues, or silent data loss.

Why confluent-kafka?

The integration uses Confluent’s official Python client, which wraps the high-performance librdkafka C library. This provides significant advantages over pure-Python implementations:

Performance: librdkafka achieves significantly higher throughput than pure-Python clients

Efficient Compression: native C implementations of gzip, snappy, lz4, and zstd with low CPU overhead

Production-Ready: Battle-tested in high-scale deployments with robust error handling

Architecture: Datasink Pattern

The integration implements a custom KafkaDatasink that bridges Ray’s distributed execution model with Kafka’s producer API:

Ray Dataset → KafkaDatasink → confluent-kafka Producer → Kafka Broker

Each Ray task gets its own Kafka producer instance to avoid shared state and connection pool contention.

KafkaDatasink: Core Implementation

The datasink handles configuration, serialization, and asynchronous writes:

class KafkaDatasink(Datasink):
    def __init__(
        self,
        topic: str,
        bootstrap_servers: str,
        key_field: str | None = None,
        value_serializer: str = "json",
        producer_config: dict[str, Any] | None = None,
        batch_size: int = 100,
        delivery_callback: Callable | None = None,
    ):
        self.topic = topic
        self.bootstrap_servers = bootstrap_servers
        self.key_field = key_field
        self.value_serializer = value_serializer
        self.producer_config = producer_config or {}
        self.batch_size = batch_size
        self.delivery_callback = delivery_callback

Key Implementation Details

Per-Task Producer Initialization: Each Ray task creates its own Producer with librdkafka configuration:

def write(
    self,
    blocks: Iterable[Block],
    ctx: TaskContext,
) -> Any:
    # Configure producer with librdkafka settings
    config = {"bootstrap.servers": self.bootstrap_servers, **self.producer_config}

    producer = Producer(config)
    total_records = 0
    failed_messages = 0
    batch_count = 0

Note the dotted-key format (bootstrap.servers) that librdkafka uses, in contrast to the underscore-style names common in Python clients such as kafka-python.
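As an illustration, a small helper can translate underscore-style option names into the dotted keys librdkafka expects. The mapping table and the underscore-to-dot fallback here are assumptions for illustration, not part of either client's API:

```python
# Illustrative translation table for the options used in this post.
KAFKA_PYTHON_TO_LIBRDKAFKA = {
    "bootstrap_servers": "bootstrap.servers",
    "compression_type": "compression.type",
    "linger_ms": "linger.ms",
    "batch_size": "batch.size",
    "retry_backoff_ms": "retry.backoff.ms",
}

def to_librdkafka_config(kwargs: dict) -> dict:
    """Translate underscore-style option names to librdkafka dotted keys.

    Falls back to a simple underscore-to-dot heuristic for names not in
    the table; real configs should be checked against librdkafka's docs.
    """
    return {
        KAFKA_PYTHON_TO_LIBRDKAFKA.get(k, k.replace("_", ".")): v
        for k, v in kwargs.items()
    }
```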

Explicit Serialization: Unlike kafka-python’s built-in serializers, confluent-kafka expects raw bytes:

def _serialize_value(self, value: Any) -> bytes:
    """Serialize value based on configured format."""
    # Convert ArrowRow to dict first
    value = self._row_to_dict(value)

    if self.value_serializer == "json":
        return json.dumps(value).encode("utf-8")
    elif self.value_serializer == "string":
        return str(value).encode("utf-8")
    else:  # bytes
        return value if isinstance(value, bytes) else str(value).encode("utf-8")

This approach gives you full control over serialization logic while keeping the producer lightweight.
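The _row_to_dict helper referenced above isn't shown in the snippet. A minimal sketch of what it is assumed to do (normalize Ray's mapping-like rows, such as ArrowRow, into plain dicts) might look like this; the name and exact behavior are illustrative:

```python
from typing import Any

def row_to_dict(row: Any) -> Any:
    """Normalize a block row to a plain dict (sketch of the assumed
    _row_to_dict helper)."""
    if isinstance(row, dict):
        return row
    # Ray's ArrowRow behaves like a mapping and exposes items()
    if hasattr(row, "items"):
        return dict(row.items())
    # Fall back to the raw value (e.g. already-serialized bytes)
    return row
```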

Asynchronous Delivery with Callbacks: The produce() method is asynchronous and non-blocking:

# Use provided callback or default
callback = self.delivery_callback or self._default_delivery_callback

for block in blocks:
    block_accessor = BlockAccessor.for_block(block)

    # Iterate through rows in block
    for row in block_accessor.iter_rows(public_row_format=False):
        # Extract key if specified
        key = self._extract_key(row)

        # Serialize value
        value = self._serialize_value(row)

        # Produce to Kafka
        producer.produce(topic=self.topic, value=value, key=key, callback=callback)

The default callback raises exceptions on delivery failure:

def _default_delivery_callback(self, err: KafkaError | None, msg: Any) -> None:
    """Default delivery report callback."""
    if err is not None:
        raise KafkaException(f"Message delivery failed: {err}")

This ensures you catch failures rather than silently dropping messages.

Buffer Management: When the internal queue fills, confluent-kafka raises BufferError. The sink handles this gracefully:

try:
    producer.produce(topic=self.topic, value=value, key=key, callback=callback)
    total_records += 1
    batch_count += 1

except BufferError:
    # Queue is full, wait for messages to be delivered
    producer.poll(1.0)
    # Retry
    producer.produce(topic=self.topic, value=value, key=key, callback=callback)
    total_records += 1
    batch_count += 1

This back-pressure mechanism prevents memory overflow while maintaining throughput.
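The retry-once pattern above still fails if the queue is full twice in a row. A bounded retry loop generalizes it; this helper is a sketch, not part of the sink's API, and works with any producer exposing confluent-kafka's produce()/poll() shape:

```python
def produce_with_backpressure(producer, topic, value, key=None,
                              callback=None, max_retries=5, poll_timeout=1.0):
    """Call produce(); on BufferError, poll() to drain delivery reports
    and free queue space, then retry up to max_retries times."""
    for attempt in range(max_retries + 1):
        try:
            producer.produce(topic=topic, value=value, key=key, callback=callback)
            return
        except BufferError:
            if attempt == max_retries:
                raise
            # Block briefly so queued messages are delivered
            producer.poll(poll_timeout)
```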

Polling Strategy: Unlike kafka-python’s automatic background thread, confluent-kafka requires explicit polling to trigger callbacks:

# Poll periodically for delivery reports
if batch_count >= self.batch_size:
    producer.poll(0)  # Non-blocking poll
    batch_count = 0

# Final flush ensures all messages are sent
remaining = producer.flush(timeout=30.0)
if remaining > 0:
    failed_messages = remaining

Periodic polling (every N messages) triggers delivery callbacks without blocking on I/O, keeping the pipeline efficient.

Key Extraction for Partitioning: Kafka uses message keys for partitioning and compaction:

def _extract_key(self, row: Any) -> bytes | None:
    """Extract and encode message key from row."""
    # Convert ArrowRow to dict first
    row_dict = self._row_to_dict(row)

    key = None
    if self.key_field and isinstance(row_dict, dict):
        key_value = row_dict.get(self.key_field)
        if key_value is not None:
            key = str(key_value).encode("utf-8")
    return key

This enables:

Ordering guarantees: messages with the same key always land on the same partition, preserving per-key order

Log compaction: compacted topics retain the latest value for each key

Consumer affinity: related events (e.g. all of one user’s actions) are read by the same consumer

Error Handling and Cleanup: The implementation ensures cleanup even on failure:

try:
    # Write logic...
except KafkaException as e:
    raise RuntimeError(f"Failed to write to Kafka: {e}") from e
finally:
    # Flush any remaining messages
    producer.flush()

return {"total_records": total_records, "failed_messages": failed_messages}

The final flush is critical as it blocks until all queued messages are acknowledged.
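For pipelines where unsent messages should abort the write rather than just be counted, a stricter flush variant can raise instead. This is an alternative to the failed_messages accounting above, not the sink's default behavior:

```python
def flush_or_raise(producer, timeout: float = 30.0) -> None:
    """flush() blocks until queued messages are delivered or the timeout
    expires; raise if any remain instead of recording them as failures."""
    remaining = producer.flush(timeout)
    if remaining > 0:
        raise RuntimeError(
            f"{remaining} message(s) still queued after {timeout}s flush"
        )
```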

Convenience Helper

For simpler usage, a helper function wraps the datasink:

def write_kafka(
    dataset: Dataset,
    topic: str,
    bootstrap_servers: str,
    key_field: str | None = None,
    value_serializer: str = "json",
    producer_config: dict[str, Any] | None = None,
    batch_size: int = 100,
    delivery_callback: Callable | None = None,
) -> Any:
    sink = KafkaDatasink(...)
    return dataset.write_datasink(sink)

This allows one-line writes:

write_kafka(ds, "my-topic", "localhost:9092", key_field="user_id")

Docker-Based Testing

Testing Kafka integrations traditionally requires complex local setup. The included Docker Compose configuration solves this:

services:
  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_KRAFT_MODE: "true"
      KAFKA_PROCESS_ROLES: controller,broker
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9093"
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: "Mk3OEYBSD34fcwNTJENDM2Qk"
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
      interval: 10s
      timeout: 10s
      retries: 10

  unit-tests:
    build:
      context: .
      dockerfile: Dockerfile.test
    depends_on:
      kafka:
        condition: service_healthy
    command: pytest tests/unit/ -v --color=yes
    profiles:
      - test

  integration-tests:
    build:
      context: .
      dockerfile: Dockerfile.test
    depends_on:
      kafka:
        condition: service_healthy
    environment:
      KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
    command: python integration_tests.py
    profiles:
      - integration

Two-Tier Testing Strategy

Unit Tests (Mocked): Fast tests using mocked producers. No actual Kafka needed:

@patch("src.kafka_datasink.Producer")
@patch("src.kafka_datasink.BlockAccessor")
def test_write_with_keys(self, mock_accessor_class, mock_producer_class):
    mock_producer = MagicMock()
    mock_producer.flush.return_value = 0
    mock_producer_class.return_value = mock_producer

    mock_accessor = MagicMock()
    mock_accessor.iter_rows.return_value = [
        {"id": 1, "value": "foo"},
        {"id": 2, "value": "bar"},
    ]
    mock_accessor_class.for_block.return_value = mock_accessor

    sink = KafkaDatasink(
        topic="test-topic",
        bootstrap_servers="localhost:9092",
        key_field="id",
    )

    _ = sink.write([Mock()], Mock())

    # Verify keys were passed
    for call_args in mock_producer.produce.call_args_list:
        assert call_args[1]["key"] is not None
        assert isinstance(call_args[1]["key"], bytes)

Integration Tests (Real Kafka): End-to-end validation with actual broker:

def test_basic_write():
    bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
    topic = "test-basic-write"

    setup_topic(topic, bootstrap_servers)
    ray.init(ignore_reinit_error=True)

    try:
        data = [
            {"id": 1, "name": "Alice", "score": 95},
            {"id": 2, "name": "Bob", "score": 87},
            {"id": 3, "name": "Charlie", "score": 92},
        ]
        ds = ray.data.from_items(data)

        write_kafka(
            dataset=ds,
            topic=topic,
            bootstrap_servers=bootstrap_servers,
            value_serializer="json",
        )

        # Consume and verify
        time.sleep(2)
        messages = consume_messages(topic, bootstrap_servers, len(data))
        assert len(messages) == len(data)

    finally:
        delete_topic(topic, bootstrap_servers)

The integration tests use confluent-kafka’s Consumer for verification:

def consume_messages(
    topic: str,
    bootstrap_servers: str,
    expected_count: int,
    timeout: int = 30,
    value_deserializer: str = "json",
) -> list[dict[str, Any]]:
    """Consume messages from Kafka topic."""
    conf = {
        "bootstrap.servers": bootstrap_servers,
        "group.id": f"test-consumer-{int(time.time())}",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    }

    consumer = Consumer(conf)
    consumer.subscribe([topic])

    messages = []
    start_time = time.time()

    try:
        while len(messages) < expected_count:
            if time.time() - start_time > timeout:
                break

            msg = consumer.poll(timeout=1.0)

            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    break

            if value_deserializer == "json":
                value = json.loads(msg.value().decode("utf-8"))
            else:
                value = msg.value().decode("utf-8")

            messages.append({
                "value": value,
                "key": msg.key().decode("utf-8") if msg.key() else None,
                "partition": msg.partition(),
                "offset": msg.offset(),
            })

    finally:
        consumer.close()

    return messages

Makefile Convenience

# Run unit tests
make unit

# Run integration tests
make integration

Usage Examples

Basic Write with JSON Serialization

import ray

ds = ray.data.range(1000).map(lambda x: {
    "id": x["id"],
    "value": x["id"] * 2,
    "category": "even" if x["id"] % 2 == 0 else "odd"
})

write_kafka(
    dataset=ds,
    topic="processed-data",
    bootstrap_servers="localhost:9092",
)

Write with Partitioning Keys

# User events with consistent routing per user
user_events = ray.data.from_items([
    {"user_id": "alice", "action": "login", "timestamp": 1234567890},
    {"user_id": "bob", "action": "purchase", "timestamp": 1234567891},
    {"user_id": "alice", "action": "logout", "timestamp": 1234567892},
])

write_kafka(
    dataset=user_events,
    topic="user-events",
    bootstrap_servers="kafka-broker:9092",
    key_field="user_id",  # Alice's events → same partition
)

Custom Delivery Callback

Track successful and failed deliveries. Note that with distributed writes the callback runs inside each Ray worker process, so counters like these accumulate per worker; use the statistics the sink returns for global totals:

delivery_stats = {"success": 0, "failed": 0}

def track_delivery(err, msg):
    if err:
        delivery_stats["failed"] += 1
        print(f"Delivery failed: {err}")
    else:
        delivery_stats["success"] += 1

write_kafka(
    dataset=ds,
    topic="tracked-events",
    bootstrap_servers="localhost:9092",
    delivery_callback=track_delivery,
)

print(f"Delivered: {delivery_stats['success']}, Failed: {delivery_stats['failed']}")

High-Throughput Configuration

write_kafka(
    dataset=large_dataset,
    topic="high-volume",
    bootstrap_servers="broker1:9092,broker2:9092,broker3:9092",
    key_field="entity_id",
    batch_size=500,
    producer_config={
        "acks": "1",                    # Leader acknowledgment only
        "compression.type": "lz4",      # Fast compression
        "linger.ms": 100,               # Batch for 100ms
        "batch.size": 1000000,          # 1MB batches
        "queue.buffering.max.messages": 100000,
    },
)

High-Reliability Configuration

write_kafka(
    dataset=critical_data,
    topic="critical-events",
    bootstrap_servers="broker1:9092,broker2:9092,broker3:9092",
    producer_config={
        "acks": "all",                  # Wait for all replicas
        "enable.idempotence": True,     # Exactly-once semantics
        "max.in.flight.requests.per.connection": 1,
        "retries": 10,
        "retry.backoff.ms": 100,
    },
)

Production Considerations

Resource Configuration

ds.write_datasink(
    sink,
    ray_remote_args={
        "num_cpus": 0.5,              # Half CPU per task
        "memory": 100 * 1024 * 1024,  # 100MB per task
    },
    concurrency=10,  # 10 parallel writers
)

Concurrency tuning:

Match concurrency to the topic’s partition count; extra writers just contend for the same partitions

Each writer opens its own producer and broker connections, so high concurrency multiplies connection overhead

Size per-task memory to leave headroom for the producer’s in-memory queue
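One way to pick the concurrency value is a simple heuristic that caps writers at the topic's partition count and at what the cluster can schedule. This helper is an illustration of the trade-off, not a Ray API:

```python
def recommended_write_concurrency(num_partitions: int, available_cpus: int,
                                  cpus_per_task: float = 0.5) -> int:
    """Cap parallel writers at the topic's partition count (extra writers
    contend on the same partitions) and at what the cluster can schedule."""
    schedulable = int(available_cpus / cpus_per_task)
    return max(1, min(num_partitions, schedulable))
```

For example, a 12-partition topic on a 4-CPU cluster at 0.5 CPUs per task suggests concurrency=8.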

Monitoring Metrics

Track write statistics returned from the sink:

result = write_kafka(ds, topic, bootstrap_servers)
# {'total_records': 50000, 'failed_messages': 0}

Key librdkafka metrics (via statistics callback):

def stats_callback(stats_json):
    stats = json.loads(stats_json)
    print(f"Messages buffered: {stats['msg_cnt']}")
    print(f"Messages sent: {stats['txmsgs']}")
    print(f"Errors: {stats['txerrs']}")

producer_config={
    "statistics.interval.ms": 5000,
    "stats_cb": stats_callback,
}

Broker-side metrics (JMX):

MessagesInPerSec and BytesInPerSec per topic to confirm write throughput

Produce request latency and RequestQueueSize to spot broker-side bottlenecks

UnderReplicatedPartitions when writing with acks=all

Error Handling Strategies

Retry with backoff:

producer_config={
    "retries": 10,
    "retry.backoff.ms": 100,
    "retry.backoff.max.ms": 1000,
}

Dead Letter Queue: Route failed messages to a separate topic:

# Create the DLQ producer once per process, not per message
dlq_producer = Producer({"bootstrap.servers": bootstrap_servers})

def dlq_callback(err, msg):
    if err:
        # Route the failed message to a dead-letter topic
        dlq_producer.produce("dlq-topic", value=msg.value(), key=msg.key())

write_kafka(ds, "primary-topic", bootstrap_servers, delivery_callback=dlq_callback)

Idempotent production (exactly-once):

producer_config={
    "enable.idempotence": True,
    "max.in.flight.requests.per.connection": 5,
    "acks": "all",
}

This ensures retries within a producer session don’t write duplicate messages (exactly-once per partition).

Security Configuration

TLS/SSL encryption:

producer_config={
    "security.protocol": "SSL",
    "ssl.ca.location": "/path/to/ca-cert.pem",
    "ssl.certificate.location": "/path/to/client-cert.pem",
    "ssl.key.location": "/path/to/client-key.pem",
}

SASL authentication:

producer_config={
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "your-username",
    "sasl.password": "your-password",
}

Key Takeaways

Building Ray integrations for Kafka with confluent-kafka requires:

  1. Asynchronous Design: Leverage produce() non-blocking calls with delivery callbacks for maximum throughput
  2. Buffer Management: Handle BufferError gracefully with polling and retry logic
  3. Explicit Polling: Call poll() periodically to trigger callbacks and process delivery reports
  4. Smart Batching: Balance throughput and latency with configurable batch sizes and linger.ms
  5. Key Management: Extract message keys from data for proper partitioning and compaction
  6. Robust Testing: Use Docker Compose for reproducible integration tests alongside fast unit tests
  7. Production Configuration: Tune librdkafka parameters for reliability vs. performance trade-offs
  8. Native Performance: Achieve higher throughput with librdkafka’s optimized C implementation than with pure-Python clients

The combination of confluent-kafka’s high-performance client, careful buffer management, and proper asynchronous handling enables reliable, efficient Kafka integration with Ray Data at production scale.