Apache Kafka & Message Queues
Distributed streaming, message queue architectures, and real-world patterns.
Apache Kafka — Core Architecture
What is Kafka?
Apache Kafka is a distributed event streaming platform. It acts as a highly scalable, fault-tolerant message bus. Think of it as a durable commit log where producers append events and consumers read them at their own pace.
Kafka Architecture Overview
Core
KAFKA CLUSTER
┌──────────────────────────────────────────────────────────┐
│ │
│ Topic: "user-events" (3 Partitions, RF=2) │
│ │
│ Partition 0: [msg0][msg1][msg4][msg7] → Broker 1 (Lead) │
│ Broker 2 (Replica)│
│ Partition 1: [msg2][msg5][msg8] → Broker 2 (Lead) │
│ Broker 3 (Replica)│
│ Partition 2: [msg3][msg6][msg9] → Broker 3 (Lead) │
│ Broker 1 (Replica)│
└──────────────────────────────────────────────────────────┘
Producers → write to partitions (key-based or round-robin)
Consumers (in Consumer Group) → each partition read by ONE consumer
ZooKeeper / KRaft → cluster metadata, leader election
Key Components
- Broker: Kafka server that stores and serves messages
- Topic: Named stream of records (like a DB table)
- Partition: Ordered, immutable log — unit of parallelism
- Offset: Position of a message in partition (0-indexed)
- Producer: Publishes records to topics
- Consumer: Reads records from topics
- Consumer Group: Set of consumers sharing load
- Replication Factor: Number of partition copies
Key Properties
- Durability: Messages persisted to disk (configurable retention)
- High Throughput: Millions of msgs/sec via sequential I/O, batching
- Fault Tolerant: Replication across brokers
- Scalable: Add partitions/brokers independently
- Ordered: Per-partition ordering guaranteed
- Replay: Consumers can re-read from any offset
- Decoupled: Producer/consumer don't know each other
Topics, Partitions & Offsets
Core
| Concept | Detail |
|---|---|
| Topic | Logical category of messages. e.g., "order-events", "user-activity" |
| Partition | Physical split of topic. Enables parallel consumption. Messages ordered within a partition. |
| Partition Key | hash(key) % numPartitions. Same key → same partition → ordering guarantee per key |
| Offset | Sequential ID of message in partition. Consumer tracks its offset. Can replay from any offset. |
| Log Retention | Configurable by time (7 days) or size (1GB). Can be unlimited for event sourcing. |
| Replication Factor | RF=3 means 3 copies of each partition. Leader handles reads/writes, replicas are standby. |
| ISR | In-Sync Replicas — replicas caught up with the leader. With acks=all, a message is acked once all ISR have it. |
How Many Partitions?
Start with: max(target throughput / per-partition producer throughput, target throughput / per-partition consumer throughput). You can increase partitions later but cannot decrease them. Common choices: 3, 6, 12, or 24 per topic.
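As a rough worked example (numbers are hypothetical): for a 100 MB/s target with ~10 MB/s per partition from producers and ~20 MB/s per partition from consumers, max(100/10, 100/20) = 10 partitions; rounding up to 12 leaves headroom. A minimal sketch of creating such a topic with Kafka's AdminClient (broker address and topic name are assumptions):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.List;
import java.util.Properties;

public class TopicCreator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 12 partitions from the sizing estimate above, replication factor 3
            NewTopic topic = new NewTopic("user-events", 12, (short) 3);
            admin.createTopics(List.of(topic)).all().get(); // blocks until the broker confirms
        }
    }
}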
Consumer Groups & Rebalancing
Core
Topic "orders" — 4 Partitions
Consumer Group A (Order Processor):
├── Consumer 1 → Partition 0, 1
└── Consumer 2 → Partition 2, 3
Consumer Group B (Analytics):
├── Consumer 1 → Partition 0, 1
├── Consumer 2 → Partition 2
└── Consumer 3 → Partition 3
Adding a 3rd Consumer to Group A (triggers a rebalance):
├── Consumer 1 → Partition 0, 3
├── Consumer 2 → Partition 1
└── Consumer 3 → Partition 2 ← new
If consumers > partitions → some consumers are idle!
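A minimal sketch of reacting to rebalances with a ConsumerRebalanceListener (topic name taken from the diagram above; everything else is an assumption). onPartitionsRevoked is also the usual place to commit offsets before partitions move to another consumer:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.List;

public class RebalanceAwareSubscriber {
    static void subscribeWithListener(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit what has been processed so the next owner does not reprocess it
                consumer.commitSync();
                System.out.println("Revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Assigned: " + partitions);
            }
        });
    }
}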
Offset Management
- enable.auto.commit=true: commits offsets periodically in the background — can lose or reprocess messages depending on when the commit fires relative to processing
- Manual commit: commitSync() after processing → at-least-once; pair with idempotent processing for effective exactly-once (see the sketch below)
- Offsets stored in the __consumer_offsets internal topic
- auto.offset.reset=earliest: start from the beginning of the topic when no committed offset exists
- auto.offset.reset=latest: start from new messages only
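A short sketch of manual offset control (partition and offset values are hypothetical; the topic name matches the examples below): committing an explicit per-partition offset and rewinding for replay:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;

public class OffsetControl {
    static void commitAndReplay(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("metadata-events", 0);

        // Commit an explicit position: the offset to commit is the NEXT one to read
        long lastProcessed = 41L; // hypothetical last processed offset
        consumer.commitSync(Map.of(tp, new OffsetAndMetadata(lastProcessed + 1)));

        // Replay: rewind to the start of the partition (it must currently be assigned to us)
        consumer.seek(tp, 0L); // the next poll() re-reads from offset 0
    }
}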
Delivery Semantics
Critical
| Semantic | Behavior | Config | Use Case |
|---|---|---|---|
| At Most Once | Messages may be lost, never duplicated | acks=0, auto-commit before processing | Metrics, logs where loss OK |
| At Least Once | No loss, but may be duplicated | acks=all, manual commit after processing | Most use cases with idempotent consumers |
| Exactly Once | No loss, no duplication | enable.idempotence=true + transactions | Financial, order processing |
Producer acks Setting
acks=0: Fire and forget (fastest, no guarantee) | acks=1: Leader ack (fast, leader failure = loss) | acks=all: All ISR ack (slowest, no loss)
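To illustrate the Exactly Once row above, a minimal transactional-producer sketch (topic names and transactional.id are assumptions); downstream consumers would set isolation.level=read_committed so they only see committed messages:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TransactionalProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "order-processor-1"); // implies enable.idempotence=true

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("orders", "order:123", "{\"status\":\"placed\"}"));
            producer.send(new ProducerRecord<>("order-audit", "order:123", "{\"event\":\"ORDER_PLACED\"}"));
            producer.commitTransaction(); // both records become visible atomically
        } catch (Exception e) {
            // On ProducerFencedException the producer should be closed rather than aborted
            producer.abortTransaction(); // neither record is visible to read_committed consumers
        } finally {
            producer.close();
        }
    }
}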
Kafka — Code Examples (Java)
Code
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MetadataEventProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // strongest durability
props.put("retries", 3);
props.put("enable.idempotence", "true"); // exactly-once
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// tableId as key → same table's events go to same partition → ordered
String key = "table:users";
String value = "{\"event\":\"schema_change\",\"table\":\"users\",\"ts\":1234567890}";
ProducerRecord<String, String> record =
new ProducerRecord<>("metadata-events", key, value);
// Async with callback
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Send failed: " + exception.getMessage());
} else {
System.out.printf("Sent to partition=%d offset=%d%n",
metadata.partition(), metadata.offset());
}
});
producer.flush();
producer.close();
}
}
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
public class MetadataEventConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "search-indexer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // start from beginning if no offset
props.put("enable.auto.commit", "false"); // manual commit for at-least-once
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("metadata-events"));
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
// Process: update search index
processMetadataEvent(record.value());
}
// Commit after processing all records in batch
consumer.commitSync();
}
} finally {
consumer.close();
}
}
static void processMetadataEvent(String event) {
// Update Elasticsearch index, notify watchers, etc.
System.out.println("Indexing: " + event);
}
}
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;
import java.util.Properties;
// Kafka Streams: process and transform streams within Kafka
public class EventAggregator {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Read from input topic
KStream<String, String> events =
builder.stream("metadata-events");
// Filter only schema change events
KStream<String, String> schemaChanges =
events.filter((key, value) -> value.contains("schema_change"));
// Count events per table (tumbling window: 1 hour)
KTable<Windowed<String>, Long> eventCounts = schemaChanges
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.count();
// Write results to output topic
eventCounts.toStream()
.map((key, count) -> KeyValue.pair(key.key(), count.toString()))
.to("schema-change-stats");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Kafka Use Cases at Alation-like Systems
Important
| Use Case | Kafka Role | Topics |
|---|---|---|
| Metadata Change Propagation | When a table schema changes in source DB, publish event → all consumers (search index, lineage graph, notifications) update | metadata-changes |
| Audit Log | Every user action (view, edit, comment on data) streamed to Kafka → audit trail, analytics | user-activity |
| Search Index Updates | Incremental search index updates via change events (CDC from DB → Kafka → Elasticsearch) | cdc-events |
| Data Quality Alerts | Data quality checks run, publish violations → notification workers send alerts to data owners | quality-alerts |
| Connector Status | Metadata crawlers publish status → monitoring dashboards | connector-status |
Alation Metadata Pipeline:
Source DB (Snowflake/Redshift)
↓ CDC (Debezium)
Kafka Topic: "source-metadata"
↓
Kafka Consumers:
├── Search Indexer → Elasticsearch
├── Lineage Updater → Graph DB (Neo4j)
├── Cache Invalidator → Redis
└── Notification Worker → Email/Slack alerts
Message Queue Patterns
Kafka vs RabbitMQ vs SQS
Comparison
| Feature | Kafka | RabbitMQ | Amazon SQS |
|---|---|---|---|
| Type | Event streaming log | Traditional message broker | Managed queue (AWS) |
| Message Retention | Days/weeks (replay) | Until consumed | Up to 14 days |
| Throughput | Millions/sec | Thousands/sec | Managed scaling |
| Ordering | Per-partition | Per-queue (FIFO) | SQS FIFO queues |
| Routing | Topic → partitions | Exchanges (direct, fanout, topic) | Simple queues |
| Consumer Groups | Yes (competing consumers) | Competing consumers | Multiple consumers |
| Replay | Yes (by offset) | No | No |
| Best For | Event streaming, analytics, audit log | Task queues, RPC, complex routing | Simple decoupling, AWS native |
Interview Answer
Use Kafka when you need: high throughput, replay, multiple consumers reading the same data, event sourcing. Use RabbitMQ for: complex routing, request-reply, smaller scale. Use SQS for: AWS-native, serverless, simple queuing.
Dead Letter Queue (DLQ)
Reliability
What is a DLQ?
A special queue that receives messages that cannot be processed after N retries. Prevents poison messages from blocking the queue.
Normal Flow:
Producer → Queue → Consumer → Success → Ack
↓
Processing Fails (retry 1, 2, 3)
↓ (max retries exceeded)
Dead Letter Queue → Alert/Manual Review
DLQ Best Practices
- Alert on messages entering DLQ (metrics, PagerDuty)
- Include original message + metadata (retry count, error reason)
- Tools to replay DLQ messages after fixing root cause
- Kafka: create separate topic "events-dlq", consumer publishes failed messages there
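Tying the last bullet together, a hedged sketch of a consumer-side retry loop that forwards a poison message to an "events-dlq" topic with the error reason and retry count attached as headers (topic name and retry limit are assumptions):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;

public class DlqForwarder {
    static final int MAX_RETRIES = 3;

    static void processWithDlq(ConsumerRecord<String, String> record,
                               KafkaProducer<String, String> producer) {
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                process(record.value());
                return; // processed successfully
            } catch (Exception e) {
                if (attempt == MAX_RETRIES) {
                    // Keep the original key/value; add metadata for manual review
                    ProducerRecord<String, String> dlqRecord =
                        new ProducerRecord<>("events-dlq", record.key(), record.value());
                    dlqRecord.headers().add("error-reason",
                        String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8));
                    dlqRecord.headers().add("retry-count",
                        String.valueOf(attempt).getBytes(StandardCharsets.UTF_8));
                    producer.send(dlqRecord);
                }
            }
        }
    }

    static void process(String value) { /* business logic that may throw */ }
}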
Transactional Outbox Pattern
Reliability / Exactly-Once
Problem
How do you atomically update the DB AND publish a Kafka message? If the DB write succeeds but the Kafka publish fails → inconsistency!
Solution: Transactional Outbox
1. BEGIN TRANSACTION
INSERT INTO orders (id, status) VALUES (123, 'placed')
INSERT INTO outbox (event_type, payload) VALUES ('ORDER_PLACED', '...')
COMMIT
2. Outbox Relay Process (polls outbox table):
SELECT * FROM outbox WHERE sent = false
→ Publish to Kafka
→ UPDATE outbox SET sent = true
OR: Use Debezium (CDC) to read DB transaction log → publish to Kafka
(no polling needed, near real-time)
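A minimal sketch of the polling relay in step 2, assuming a JDBC connection and a hypothetical outbox table with id, event_type, payload, and sent columns:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class OutboxRelay {
    static void relayOnce(Connection db, KafkaProducer<String, String> producer) throws Exception {
        try (PreparedStatement select = db.prepareStatement(
                 "SELECT id, event_type, payload FROM outbox WHERE sent = false ORDER BY id");
             ResultSet rs = select.executeQuery()) {
            while (rs.next()) {
                long id = rs.getLong("id");
                String eventType = rs.getString("event_type");
                String payload = rs.getString("payload");

                // Block until Kafka acknowledges, so we never mark an unsent event as sent
                producer.send(new ProducerRecord<>("order-events", eventType, payload)).get();

                try (PreparedStatement update = db.prepareStatement(
                         "UPDATE outbox SET sent = true WHERE id = ?")) {
                    update.setLong(1, id);
                    update.executeUpdate();
                }
            }
        }
    }
}

If the relay crashes between the send and the update, the event is re-published on the next poll, so consumers should be idempotent — this is the at-least-once guarantee the outbox pattern provides.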
Message Ordering Guarantees
Design
| Scope | Kafka Guarantee |
|---|---|
| Within a partition | Strictly ordered (messages appended sequentially) |
| Across partitions | No global ordering |
| For a specific key | All messages with same key go to same partition → ordered |
Pattern: Ordering by Entity
Use the entity ID (user_id, order_id) as the partition key. All events for a given entity are ordered. This is the Kafka best practice for event-driven systems.
Schema Registry & Avro
Production
Why Schema Registry?
- Producer and consumer must agree on message format
- Schema Registry stores Avro/Protobuf/JSON schemas
- Producer registers schema, consumer fetches it by ID
- Schema evolution: backward compatible (new consumers read old data: remove fields, add optional fields), forward compatible (old consumers read new data: add fields, remove optional fields)
// Avro Schema for Metadata Event
{
"type": "record",
"name": "MetadataEvent",
"namespace": "com.alation.events",
"fields": [
{"name": "eventId", "type": "string"},
{"name": "eventType", "type": "string"},
{"name": "tableId", "type": "long"},
{"name": "tableName", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "userId", "type": ["null", "string"], "default": null}
]
}
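A hedged sketch of producing a record with this schema through Confluent's Schema Registry (registry URL, schema file name, and topic are assumptions; requires the kafka-avro-serializer dependency, which is separate from Apache Kafka itself):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.File;
import java.util.Properties;

public class AvroMetadataProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Registers/looks up the schema in Schema Registry and embeds its ID in each record
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        Schema schema = new Schema.Parser().parse(new File("metadata-event.avsc")); // the schema above

        GenericRecord event = new GenericData.Record(schema);
        event.put("eventId", "evt-001");
        event.put("eventType", "schema_change");
        event.put("tableId", 42L);
        event.put("tableName", "users");
        event.put("timestamp", System.currentTimeMillis());
        // userId is nullable and defaults to null

        KafkaProducer<String, Object> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, Object>("metadata-events", "table:42", event));
        producer.close();
    }
}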
Interview Q: Design a Real-time Metadata Change System
Must Know
Scenario
"Design a system where when a table's schema changes in a data warehouse, all downstream consumers (search index, lineage graph, data owners) are notified in real-time."CDC Layer: Debezium reads database transaction log → publishes to Kafka topic "schema-changes"
Kafka Topics: "schema-changes" (3 partitions, RF=3, key=table_id for ordering)
Consumer Group 1 - Search Indexer: Updates Elasticsearch with new schema → search reflects changes within seconds
Consumer Group 2 - Lineage Updater: Updates graph DB with new column relationships
Consumer Group 3 - Notifier: Sends email/Slack to table owners and stewards
DLQ: Failed events go to "schema-changes-dlq" → alert on-call team
Exactly-Once: Transactional outbox pattern ensures schema change written to DB and published atomically
Snowflake Schema Change
↓
Debezium (CDC) → Kafka: "schema-changes" (key=table_id)
↓
┌────────────────────────────────────────┐
│ Consumer Group: search-indexer │ → Elasticsearch
│ Consumer Group: lineage-updater │ → Neo4j
│ Consumer Group: notification-service │ → Email/Slack
└────────────────────────────────────────┘
↓ (failures)
Kafka: "schema-changes-dlq" → Alert + Manual Review