Apache Kafka & Message Queues

Distributed streaming, message queue architectures, and real-world patterns.

Apache Kafka — Core Architecture
What is Kafka?
Apache Kafka is a distributed event streaming platform. It acts as a highly scalable, fault-tolerant message bus. Think of it as a durable commit log where producers append events and consumers read them at their own pace.

Kafka Architecture Overview

Core
KAFKA CLUSTER — Topic: "user-events" (3 Partitions, RF=2)

  Partition 0: [msg0][msg1][msg4][msg7] → Broker 1 (Leader), Broker 2 (Replica)
  Partition 1: [msg2][msg5][msg8]       → Broker 2 (Leader), Broker 3 (Replica)
  Partition 2: [msg3][msg6][msg9]       → Broker 3 (Leader), Broker 1 (Replica)

  Producers → write to partitions (key-based or round-robin)
  Consumers (in a Consumer Group) → each partition read by ONE consumer in the group
  ZooKeeper / KRaft → cluster metadata, leader election

Key Components

  • Broker: Kafka server that stores and serves messages
  • Topic: Named stream of records (like a DB table)
  • Partition: Ordered, immutable log — unit of parallelism
  • Offset: Position of a message in partition (0-indexed)
  • Producer: Publishes records to topics
  • Consumer: Reads records from topics
  • Consumer Group: Set of consumers sharing load
  • Replication Factor: Number of partition copies

Key Properties

  • Durability: Messages persisted to disk (configurable retention)
  • High Throughput: Millions of msgs/sec via sequential I/O, batching
  • Fault Tolerant: Replication across brokers
  • Scalable: Add partitions/brokers independently
  • Ordered: Per-partition ordering guaranteed
  • Replay: Consumers can re-read from any offset
  • Decoupled: Producer/consumer don't know each other

Topics, Partitions & Offsets

Core
Concept | Detail
Topic | Logical category of messages, e.g., "order-events", "user-activity"
Partition | Physical split of a topic. Enables parallel consumption. Messages are ordered within a partition.
Partition Key | hash(key) % numPartitions. Same key → same partition → per-key ordering guarantee
Offset | Sequential ID of a message in its partition. Each consumer tracks its own offset and can replay from any offset.
Log Retention | Configurable by time (e.g., 7 days) or size (e.g., 1 GB). Can be unlimited for event sourcing.
Replication Factor | RF=3 means 3 copies of each partition. The leader handles reads/writes; replicas are standby.
ISR | In-Sync Replicas — replicas that are caught up with the leader. A message is acked once all ISR confirm (with acks=all).
How Many Partitions?
Start with: max(target throughput / per-partition producer throughput, target throughput / per-partition consumer throughput). You can add partitions later but never remove them, and adding partitions changes the key → partition mapping. Common choices: 3, 6, 12, 24 per topic.
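For intuition on the key → partition mapping described above, here is a minimal sketch. Kafka's default partitioner applies murmur2 to the serialized key bytes and takes it modulo the partition count; the plain hashCode below and the partition count of 6 are simplified stand-ins for illustration.

public class PartitionForKey {
    public static void main(String[] args) {
        int numPartitions = 6;              // assumed partition count for this topic
        String key = "table:users";

        // Simplified stand-in for Kafka's default partitioner (which uses murmur2
        // on the serialized key bytes, modulo the number of partitions).
        int partition = Math.floorMod(key.hashCode(), numPartitions);

        System.out.println("key=" + key + " → partition " + partition);
        // The same key always lands on the same partition, which is what gives
        // per-key ordering. Adding partitions changes this mapping.
    }
}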

Consumer Groups & Rebalancing

Core
Topic "orders" — 4 Partitions Consumer Group A (Order Processor): ├── Consumer 1 → Partition 0, 1 └── Consumer 2 → Partition 2, 3 Consumer Group B (Analytics): ├── Consumer 1 → Partition 0, 1 ├── Consumer 2 → Partition 2 └── Consumer 3 → Partition 3 Adding Consumer to Group A: ├── Consumer 1 → Partition 0 ├── Consumer 2 → Partition 1 ├── Consumer 3 → Partition 2 ← new └── [Partition 3 unassigned until Consumer 4 joins] If consumers > partitions → some consumers are idle!

Offset Management

  • enable.auto.commit=true — offsets committed automatically on a timer; duplicates or loss are possible depending on when the commit fires relative to processing
  • Manual commit: commitSync() after processing → at-least-once (a crash before the commit causes reprocessing, so consumers should be idempotent)
  • Offsets stored in the __consumer_offsets internal topic
  • auto.offset.reset=earliest — start from the beginning of the topic when no committed offset exists
  • auto.offset.reset=latest — start from new messages only
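A minimal sketch of replaying from an arbitrary offset by assigning a partition directly and seeking. The topic name and the offset 42 are placeholders; with assign() there is no consumer group, so the position is entirely under our control.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

public class ReplayFromOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition p0 = new TopicPartition("metadata-events", 0);

            consumer.assign(List.of(p0));   // assign() instead of subscribe(): no group coordination
            consumer.seek(p0, 42L);         // jump to an arbitrary offset (placeholder value)

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
        }
    }
}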

Delivery Semantics

Critical
Semantic | Behavior | Config | Use Case
At Most Once | Messages may be lost, never duplicated | acks=0, auto-commit before processing | Metrics, logs where loss is OK
At Least Once | No loss, but may be duplicated | acks=all, manual commit after processing | Most use cases, with idempotent consumers
Exactly Once | No loss, no duplication | enable.idempotence=true + transactions | Financial, order processing
Producer acks Setting
acks=0: fire and forget (fastest, no delivery guarantee) | acks=1: leader ack only (fast, but loss possible if the leader fails before replication) | acks=all: all ISR ack (slowest, no loss)
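A hedged sketch of an exactly-once producer combining idempotence with transactions. The transactional.id and topic names are placeholders, and downstream consumers would need isolation.level=read_committed to see only committed records.

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        props.put("transactional.id", "order-processor-tx-1");   // placeholder; unique per producer instance

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();

        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("orders", "order-123", "{\"status\":\"placed\"}"));
            producer.send(new ProducerRecord<>("order-audit", "order-123", "{\"event\":\"ORDER_PLACED\"}"));
            producer.commitTransaction();    // both records become visible atomically
        } catch (Exception e) {
            producer.abortTransaction();     // neither record is exposed to read_committed consumers
        } finally {
            producer.close();
        }
    }
}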

Kafka — Code Examples (Java)

Code
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class MetadataEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");              // strongest durability
        props.put("retries", 3);
        props.put("enable.idempotence", "true");  // exactly-once

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // tableId as key → same table's events go to same partition → ordered
        String key   = "table:users";
        String value = "{\"event\":\"schema_change\",\"table\":\"users\",\"ts\":1234567890}";

        ProducerRecord<String, String> record =
            new ProducerRecord<>("metadata-events", key, value);

        // Async with callback
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send failed: " + exception.getMessage());
            } else {
                System.out.printf("Sent to partition=%d offset=%d%n",
                    metadata.partition(), metadata.offset());
            }
        });

        producer.flush();
        producer.close();
    }
}
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

public class MetadataEventConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "search-indexer-group");
        props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");    // start from beginning if no offset
        props.put("enable.auto.commit", "false");       // manual commit for at-least-once

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("metadata-events"));

        try {
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value());

                    // Process: update search index
                    processMetadataEvent(record.value());
                }

                // Commit after processing all records in batch
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }

    static void processMetadataEvent(String event) {
        // Update Elasticsearch index, notify watchers, etc.
        System.out.println("Indexing: " + event);
    }
}
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;
import java.util.Properties;

// Kafka Streams: process and transform streams within Kafka
public class EventAggregator {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,    "event-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,   Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read from input topic
        KStream<String, String> events =
            builder.stream("metadata-events");

        // Filter only schema change events
        KStream<String, String> schemaChanges =
            events.filter((key, value) -> value.contains("schema_change"));

        // Count events per table (tumbling window: 1 hour)
        KTable<Windowed<String>, Long> eventCounts = schemaChanges
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count();

        // Write results to output topic
        eventCounts.toStream()
            .map((key, count) -> KeyValue.pair(key.key(), count.toString()))
            .to("schema-change-stats");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Kafka Use Cases at Alation-like Systems

Important
Use Case | Kafka Role | Topics
Metadata Change Propagation | When a table schema changes in a source DB, publish an event → all consumers (search index, lineage graph, notifications) update | metadata-changes
Audit Log | Every user action (view, edit, comment on data) streamed to Kafka → audit trail, analytics | user-activity
Search Index Updates | Incremental search index updates via change events (CDC from DB → Kafka → Elasticsearch) | cdc-events
Data Quality Alerts | Data quality checks run, publish violations → notification workers send alerts to data owners | quality-alerts
Connector Status | Metadata crawlers publish status → monitoring dashboards | connector-status
Alation Metadata Pipeline:
Source DB (Snowflake/Redshift)
        ↓ CDC (Debezium)
Kafka Topic: "source-metadata"
        ↓
Kafka Consumers:
├── Search Indexer      → Elasticsearch
├── Lineage Updater     → Graph DB (Neo4j)
├── Cache Invalidator   → Redis
└── Notification Worker → Email/Slack alerts
Message Queue Patterns

Kafka vs RabbitMQ vs SQS

Comparison
Feature | Kafka | RabbitMQ | Amazon SQS
Type | Event streaming log | Traditional message broker | Managed queue (AWS)
Message Retention | Days/weeks (replayable) | Until consumed | Up to 14 days
Throughput | Millions/sec | Thousands/sec | Managed scaling
Ordering | Per-partition | Per-queue (FIFO) | SQS FIFO queues
Routing | Topic → partitions | Exchanges (direct, fanout, topic) | Simple queues
Consumer Groups | Yes (competing consumers within a group) | Competing consumers | Multiple consumers
Replay | Yes (by offset) | No | No
Best For | Event streaming, analytics, audit log | Task queues, RPC, complex routing | Simple decoupling, AWS native
Interview Answer
Use Kafka when you need: high throughput, replay, multiple consumers reading same data, event sourcing. Use RabbitMQ for: complex routing, request-reply, smaller scale. Use SQS for: AWS-native, serverless, simple queuing.

Dead Letter Queue (DLQ)

Reliability

What is a DLQ?

A special queue that receives messages that cannot be processed after N retries. Prevents poison messages from blocking the queue.

Normal Flow: Producer → Queue → Consumer → Success → Ack
                                    ↓ Processing fails (retry 1, 2, 3)
                                    ↓ Max retries exceeded
                          Dead Letter Queue → Alert / Manual Review

DLQ Best Practices

  • Alert on messages entering DLQ (metrics, PagerDuty)
  • Include original message + metadata (retry count, error reason)
  • Tools to replay DLQ messages after fixing root cause
  • Kafka has no built-in DLQ: create a separate topic such as "events-dlq" and have the consumer publish failed messages there, as sketched below
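A hedged sketch of the consumer-side DLQ pattern, assuming topics named "events" and "events-dlq" and an application-defined process() step; a real implementation would also carry the retry count and error reason as record headers.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import java.time.Duration;
import java.util.*;

public class DlqConsumerSketch {
    static final int MAX_RETRIES = 3;

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps());
        KafkaProducer<String, String> dlqProducer = new KafkaProducer<>(producerProps());
        consumer.subscribe(List.of("events"));

        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                try {
                    processWithRetries(record.value());
                } catch (Exception e) {
                    // Poison message: park it on the DLQ topic instead of blocking the partition
                    dlqProducer.send(new ProducerRecord<>("events-dlq", record.key(),
                        record.value() + " | error=" + e.getMessage()));
                }
            }
            consumer.commitSync();   // commit either way so the partition keeps moving
        }
    }

    static void processWithRetries(String value) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try { process(value); return; } catch (Exception e) { last = e; }
        }
        throw last;
    }

    static void process(String value) { /* application logic, e.g., update the search index */ }

    static Properties consumerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("group.id", "events-processor");
        p.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("enable.auto.commit", "false");
        return p;
    }

    static Properties producerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }
}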

Transactional Outbox Pattern

Reliability · Exactly-Once
Problem
How do you atomically update the DB AND publish a Kafka message? If DB write succeeds but Kafka publish fails → inconsistency!
Solution: Transactional Outbox

1. Write the business row and the event in ONE local DB transaction:
   BEGIN TRANSACTION
     INSERT INTO orders (id, status) VALUES (123, 'placed');
     INSERT INTO outbox (event_type, payload) VALUES ('ORDER_PLACED', '...');
   COMMIT

2. Outbox Relay process (polls the outbox table):
   SELECT * FROM outbox WHERE sent = false
   → publish to Kafka
   → UPDATE outbox SET sent = true

OR: use Debezium (CDC) to read the DB transaction log and publish to Kafka
    (no polling needed, near real-time)
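A minimal sketch of the outbox relay loop using JDBC plus a Kafka producer. The table and column names (outbox, id, event_type, payload, sent), the JDBC URL, and the target topic are assumptions; note the relay delivers at-least-once, so consumers should deduplicate by event ID.

import org.apache.kafka.clients.producer.*;
import java.sql.*;
import java.util.Properties;

public class OutboxRelay {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/app")) {  // assumed DB
            while (true) {
                try (Statement st = conn.createStatement();
                     ResultSet rs = st.executeQuery(
                         "SELECT id, event_type, payload FROM outbox WHERE sent = false ORDER BY id")) {
                    while (rs.next()) {
                        long id = rs.getLong("id");
                        // Publish synchronously, then mark the row as sent. A crash between the two
                        // steps re-sends the event → at-least-once, so consumers must deduplicate.
                        producer.send(new ProducerRecord<>("order-events",
                            rs.getString("event_type"), rs.getString("payload"))).get();
                        try (PreparedStatement upd =
                                 conn.prepareStatement("UPDATE outbox SET sent = true WHERE id = ?")) {
                            upd.setLong(1, id);
                            upd.executeUpdate();
                        }
                    }
                }
                Thread.sleep(1000);   // simple polling interval
            }
        }
    }
}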

Message Ordering Guarantees

Design
Scope | Kafka Guarantee
Within a partition | Strictly ordered (messages appended sequentially)
Across partitions | No global ordering
For a specific key | All messages with the same key go to the same partition → ordered
Pattern: Ordering by Entity
Use entity ID (user_id, order_id) as partition key. All events for a given entity are ordered. This is the Kafka best practice for event-driven systems.

Schema Registry & Avro

Production

Why Schema Registry?

  • Producer and consumer must agree on message format
  • Schema Registry stores Avro/Protobuf/JSON schemas
  • Producer registers schema, consumer fetches it by ID
  • Schema evolution with compatibility checks: backward compatibility means consumers on the new schema can read old data (e.g., add fields with defaults); forward compatibility means consumers on the old schema can read new data
// Avro Schema for Metadata Event
{
  "type": "record",
  "name": "MetadataEvent",
  "namespace": "com.alation.events",
  "fields": [
    {"name": "eventId",    "type": "string"},
    {"name": "eventType",  "type": "string"},
    {"name": "tableId",    "type": "long"},
    {"name": "tableName",  "type": "string"},
    {"name": "timestamp",  "type": "long"},
    {"name": "userId",     "type": ["null", "string"], "default": null}
  ]
}
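A hedged sketch of producing with this schema through Confluent's Avro serializer and Schema Registry; the registry URL, topic, and field values are placeholders, and the example assumes the Confluent serializer dependency is on the classpath.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class AvroMetadataProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
        // Confluent's serializer registers (or looks up) the schema and embeds its ID in each record
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");   // assumed registry address

        // Same schema as above, inlined so the example is self-contained
        Schema schema = new Schema.Parser().parse("""
            {"type":"record","name":"MetadataEvent","namespace":"com.alation.events","fields":[
              {"name":"eventId","type":"string"},{"name":"eventType","type":"string"},
              {"name":"tableId","type":"long"},{"name":"tableName","type":"string"},
              {"name":"timestamp","type":"long"},
              {"name":"userId","type":["null","string"],"default":null}]}""");

        GenericRecord event = new GenericData.Record(schema);
        event.put("eventId",   "evt-1");
        event.put("eventType", "schema_change");
        event.put("tableId",   42L);
        event.put("tableName", "users");
        event.put("timestamp", System.currentTimeMillis());
        // userId is nullable with default null, so it can stay unset

        try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("metadata-events", "table:42", event)).get();
        }
    }
}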

Interview Q: Design a Real-time Metadata Change System

Must Know
Scenario
"Design a system where when a table's schema changes in a data warehouse, all downstream consumers (search index, lineage graph, data owners) are notified in real-time."
CDC Layer: Debezium reads database transaction log → publishes to Kafka topic "schema-changes"
Kafka Topics: "schema-changes" (3 partitions, RF=3, key=table_id for ordering)
Consumer Group 1 - Search Indexer: Updates Elasticsearch with new schema → search reflects changes within seconds
Consumer Group 2 - Lineage Updater: Updates graph DB with new column relationships
Consumer Group 3 - Notifier: Sends email/Slack to table owners and stewards
DLQ: Failed events go to "schema-changes-dlq" → alert on-call team
Exactly-Once: Transactional outbox pattern ensures the schema change is written to the DB and published atomically; consumers deduplicate by event ID so processing is effectively exactly-once
Snowflake Schema Change
        ↓ Debezium (CDC)
Kafka: "schema-changes" (key=table_id)
        ↓
├── Consumer Group: search-indexer       → Elasticsearch
├── Consumer Group: lineage-updater      → Neo4j
└── Consumer Group: notification-service → Email/Slack
        ↓ (failures)
Kafka: "schema-changes-dlq" → Alert + Manual Review