Kafka vs. Kinesis: A Hands-On Comparison for Data Engineers
Real-time data engineering on AWS presents an early fork in the road: Apache Kafka or Amazon Kinesis? Both are durable, ordered, partitioned log systems, and both handle high-throughput streaming workloads. But their operational models, pricing structures, and integration stories differ substantially. This tutorial goes beyond theory: it shows producer and consumer code for both, compares their architectures concretely, and provides a decision matrix to help you choose.
Architecture Primer
Amazon Kinesis Data Streams is a fully managed AWS service. You provision shards; each shard handles 1 MB/s (or 1,000 records/s) of write throughput and 2 MB/s of read throughput. Scaling means adding or removing shards via the API. Retention is configurable from 24 hours to 365 days. AWS handles replication, durability, and availability.
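The per-shard limits above translate directly into a sizing calculation. The sketch below is a rough estimate only: it uses the 1 MB/s write and 2 MB/s read figures from the paragraph, adds the 1,000 records/s per-shard write limit from the Kinesis service quotas, and assumes all consumers share the standard read throughput. The function name and the sample workload are hypothetical.

```python
import math

# Per-shard limits: 1 MB/s write, 2 MB/s read, and 1,000 records/s write.
WRITE_MB_PER_SHARD = 1.0
READ_MB_PER_SHARD = 2.0
WRITE_RECORDS_PER_SHARD = 1000


def shards_needed(write_mb_s: float, records_s: float, read_mb_s: float) -> int:
    """Return the smallest shard count that satisfies all three limits."""
    by_write_bytes = math.ceil(write_mb_s / WRITE_MB_PER_SHARD)
    by_write_records = math.ceil(records_s / WRITE_RECORDS_PER_SHARD)
    by_read_bytes = math.ceil(read_mb_s / READ_MB_PER_SHARD)
    return max(1, by_write_bytes, by_write_records, by_read_bytes)


# Hypothetical workload: 3.5 MB/s in, 6,000 records/s,
# and two consumers that each read the full stream (7 MB/s total).
print(shards_needed(write_mb_s=3.5, records_s=6000, read_mb_s=7.0))
```

In this made-up workload the record rate, not the byte rate, is the binding limit, which is common for small events such as clickstream records.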
Apache Kafka is an open-source distributed log. On AWS you run it on Amazon MSK (Managed Streaming for Apache Kafka), which handles broker provisioning and ZooKeeper/KRaft coordination, but you still manage broker sizing, storage scaling, and consumer group lag monitoring. Kafka uses partitions (analogous to shards) and supports longer retention and more flexible consumer semantics than Kinesis.
Kinesis: Producer and Consumer Code
Install the AWS SDK:
pip install boto3
Kinesis Producer:
# kinesis_producer.py
import boto3
import json
import time
import uuid
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

kinesis = boto3.client("kinesis", region_name="ca-central-1")
STREAM_NAME = "clickstream-events"


def put_record(event: dict) -> dict:
    """Put a single record to Kinesis. Partition key determines shard assignment."""
    response = kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey=event.get("user_id", str(uuid.uuid4())),  # route by user_id
    )
    return response


def put_records_batch(events: list[dict]) -> dict:
    """
    Put up to 500 records in a single API call (max 5 MB total).
    More efficient than individual put_record calls.
    """
    records = [
        {
            "Data": json.dumps(event).encode("utf-8"),
            "PartitionKey": event.get("user_id", str(uuid.uuid4())),
        }
        for event in events
    ]
    response = kinesis.put_records(StreamName=STREAM_NAME, Records=records)
    failed = response.get("FailedRecordCount", 0)
    if failed > 0:
        logger.warning(f"{failed} records failed to write; consider retry logic")
    return response


def simulate_clickstream():
    """Simulate a stream of e-commerce clickstream events."""
    pages = ["/home", "/products", "/cart", "/checkout", "/confirmation"]
    user_ids = [f"user_{i:04d}" for i in range(1, 201)]
    batch = []
    for i in range(10_000):
        event = {
            "event_id": str(uuid.uuid4()),
            "user_id": user_ids[i % len(user_ids)],
            "session_id": f"sess_{i // 5:04d}",
            "page": pages[i % len(pages)],
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": "page_view",
        }
        batch.append(event)
        if len(batch) == 500:
            result = put_records_batch(batch)
            logger.info(f"Put batch of 500, failed: {result['FailedRecordCount']}")
            batch = []
            time.sleep(0.1)  # brief pause to stay within shard limits
    # Send any remaining partial batch
    if batch:
        put_records_batch(batch)


if __name__ == "__main__":
    simulate_clickstream()
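The producer above only logs a warning when PutRecords partially fails. One way to add the retry logic it hints at is sketched below. This is a minimal illustration, not the article's implementation: the function name, the backoff values, and the `client` parameter (anything exposing boto3's `put_records`) are assumptions. It relies on the documented behaviour that PutRecords returns one result per request entry, in request order, with an `ErrorCode` on the entries that failed.

```python
import time


def put_records_with_retry(client, stream_name, records, max_attempts=5, base_delay=0.1):
    """Send records via PutRecords, resending only the entries that failed.

    Returns the records still unsent after `max_attempts` (empty list on success).
    """
    pending = list(records)
    for attempt in range(max_attempts):
        response = client.put_records(StreamName=stream_name, Records=pending)
        if response.get("FailedRecordCount", 0) == 0:
            return []
        # Results come back in request order; failed entries carry an ErrorCode.
        pending = [
            record
            for record, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        if attempt < max_attempts - 1:
            time.sleep(base_delay * (2 ** attempt))  # exponential backoff
    return pending
```

Retrying a subset can change the relative order of records within a shard, so consumers that depend on strict per-key ordering may need a sequence field in the payload.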
Kinesis Consumer (shared-throughput polling with GetRecords):
# kinesis_consumer.py
import boto3
import json
import logging
import time
from datetime import datetime, timezone

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

kinesis = boto3.client("kinesis", region_name="ca-central-1")
STREAM_NAME = "clickstream-events"


def get_shard_ids(stream_name: str) -> list[str]:
    """List every shard ID in the stream, following pagination."""
    paginator = kinesis.get_paginator("list_shards")
    shard_ids = []
    for page in paginator.paginate(StreamName=stream_name):
        shard_ids.extend(s["ShardId"] for s in page["Shards"])
    return shard_ids


def consume_shard(shard_id: str, iterator_type: str = "TRIM_HORIZON"):
    """
    Consume records from a single shard.
    iterator_type: TRIM_HORIZON (from beginning), LATEST (new records only),
    AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER, AT_TIMESTAMP
    """
    # Get initial shard iterator
    iterator_response = kinesis.get_shard_iterator(
        StreamName=STREAM_NAME,
        ShardId=shard_id,
        ShardIteratorType=iterator_type,
    )
    shard_iterator = iterator_response["ShardIterator"]
    records_processed = 0
    while shard_iterator:
        response = kinesis.get_records(
            ShardIterator=shard_iterator,
            Limit=100,  # cap this call at 100 records (the API allows up to 10,000)
        )
        records = response.get("Records", [])
        for record in records:
            # boto3 has already base64-decoded Data into bytes
            data = json.loads(record["Data"].decode("utf-8"))
            sequence_number = record["SequenceNumber"]
            arrival_time = record["ApproximateArrivalTimestamp"]
            # Calculate ingestion latency (producer timestamp to Kinesis arrival).
            # Both datetimes are timezone-aware, so they can be subtracted directly.
            event_time = datetime.fromisoformat(data["timestamp"].replace("Z", "+00:00"))
            latency_ms = (arrival_time - event_time).total_seconds() * 1000
            logger.info(
                f"Shard {shard_id} | seq={sequence_number[:20]}... | "
                f"user={data['user_id']} | latency={latency_ms:.0f}ms"
            )
            records_processed += 1
        shard_iterator = response.get("NextShardIterator")
        if not records:
            # No new records: Kinesis returns empty batches when caught up
            time.sleep(1)  # poll every second when idle
        else:
            time.sleep(0.2)  # GetRecords allows 5 calls per second per shard
    # NextShardIterator is absent only once the shard has been closed
    logger.info(f"Shard {shard_id} closed. Processed {records_processed} records.")


if __name__ == "__main__":
    import threading

    shard_ids = get_shard_ids(STREAM_NAME)
    logger.info(f"Consuming from {len(shard_ids)} shards")
    # Consume each shard in a separate thread
    threads = [
        threading.Thread(target=consume_shard, args=(shard_id,))
        for shard_id in shard_ids
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
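The consumer above always starts from TRIM_HORIZON, so a restart reprocesses the whole retention window. A common refinement is to save the last processed sequence number per shard and resume with AFTER_SEQUENCE_NUMBER. The sketch below only builds the arguments for boto3's `get_shard_iterator`; the helper name, the in-memory `checkpoints` dict, and the sample sequence number are hypothetical placeholders, and a real consumer would persist checkpoints durably (the KCL uses DynamoDB for this).

```python
from typing import Optional


def shard_iterator_args(stream_name: str, shard_id: str, checkpoint: Optional[str]) -> dict:
    """Build keyword arguments for boto3's get_shard_iterator.

    With a saved sequence number, resume just after it; otherwise start
    from the oldest record still retained in the shard.
    """
    args = {"StreamName": stream_name, "ShardId": shard_id}
    if checkpoint:
        args["ShardIteratorType"] = "AFTER_SEQUENCE_NUMBER"
        args["StartingSequenceNumber"] = checkpoint
    else:
        args["ShardIteratorType"] = "TRIM_HORIZON"
    return args


# Hypothetical in-memory checkpoint store keyed by shard ID.
checkpoints: dict[str, str] = {"shardId-000000000000": "1234567890"}  # placeholder value

for shard in ("shardId-000000000000", "shardId-000000000001"):
    print(shard_iterator_args("clickstream-events", shard, checkpoints.get(shard)))
```

The returned dict can be passed as `kinesis.get_shard_iterator(**args)`. Checkpointing after processing, rather than before, keeps the consumer at-least-once.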
Kafka on MSK: Producer and Consumer Code
Install the Confluent Kafka Python client:
pip install confluent-kafka
Kafka Producer:
# kafka_producer.py
import json
import uuid
import logging
from datetime import datetime, timezone
from confluent_kafka import Producer

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# MSK broker endpoint (from the MSK console). Port 9092 is the plaintext
# listener; use the TLS endpoint (port 9094) for production.
BOOTSTRAP_SERVERS = "b-1.mycluster.abc123.c1.kafka.ca-central-1.amazonaws.com:9092"
TOPIC_NAME = "clickstream-events"


def create_producer() -> Producer:
    config = {
        "bootstrap.servers": BOOTSTRAP_SERVERS,
        "client.id": "clickstream-producer",
        "acks": "all",  # wait for all in-sync replicas
        "retries": 5,
        "retry.backoff.ms": 300,
        "linger.ms": 10,  # batch records for up to 10ms
        "batch.size": 65536,  # 64 KB batch size
        "compression.type": "snappy",
        # Prevents duplicates caused by producer retries (per partition).
        # End-to-end exactly-once additionally needs the transactional API.
        "enable.idempotence": True,
    }
    return Producer(config)


def delivery_callback(err, msg):
    """Called by Kafka for each produced message (async confirmation)."""
    if err:
        logger.error(f"Delivery failed: {err}")
    else:
        logger.debug(
            f"Delivered to {msg.topic()} [{msg.partition()}] "
            f"offset {msg.offset()}"
        )


def produce_events(producer: Producer, num_events: int = 10_000):
    pages = ["/home", "/products", "/cart", "/checkout"]
    user_ids = [f"user_{i:04d}" for i in range(1, 201)]
    for i in range(num_events):
        event = {
            "event_id": str(uuid.uuid4()),
            "user_id": user_ids[i % len(user_ids)],
            "page": pages[i % len(pages)],
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": "page_view",
        }
        producer.produce(
            topic=TOPIC_NAME,
            key=event["user_id"].encode("utf-8"),  # partition key
            value=json.dumps(event).encode("utf-8"),
            callback=delivery_callback,
        )
        # poll() triggers delivery callbacks; call it periodically
        producer.poll(0)
        if (i + 1) % 1000 == 0:
            producer.flush()  # wait for all queued messages to be delivered
            logger.info(f"Produced {i + 1} events")
    producer.flush()
    logger.info(f"Done. Produced {num_events} events total.")


if __name__ == "__main__":
    p = create_producer()
    produce_events(p)
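The producer above relies on periodic `flush()` calls to keep its local queue from filling up. With confluent-kafka, `produce()` raises `BufferError` when that queue is full, and calling `poll()` serves delivery callbacks and frees space. The sketch below shows one way to apply backpressure instead of flushing on a fixed schedule; the wrapper name and the timeout values are assumptions, and `producer` is expected to be a `confluent_kafka.Producer` or anything with the same `produce`/`poll` methods.

```python
import time


def produce_with_backpressure(producer, topic, key, value, on_delivery=None, max_wait_s=10.0):
    """Call producer.produce(), waiting for queue space if the local buffer is full."""
    deadline = time.monotonic() + max_wait_s
    while True:
        try:
            producer.produce(topic, key=key, value=value, on_delivery=on_delivery)
            return
        except BufferError:
            # Local queue is full: give up after max_wait_s, otherwise
            # serve delivery callbacks so queued messages can drain.
            if time.monotonic() >= deadline:
                raise
            producer.poll(0.1)
```

A final `producer.flush()` is still needed before the process exits so that queued messages are delivered.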
Kafka Consumer:
# kafka_consumer.py
import json
import logging
import signal
from confluent_kafka import Consumer, KafkaError, KafkaException

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

BOOTSTRAP_SERVERS = "b-1.mycluster.abc123.c1.kafka.ca-central-1.amazonaws.com:9092"
TOPIC_NAME = "clickstream-events"
CONSUMER_GROUP = "clickstream-processor-v1"


def create_consumer() -> Consumer:
    config = {
        "bootstrap.servers": BOOTSTRAP_SERVERS,
        "group.id": CONSUMER_GROUP,
        "auto.offset.reset": "earliest",  # start from beginning if no committed offset
        "enable.auto.commit": False,  # manual commit for at-least-once semantics
        "max.poll.interval.ms": 300_000,
        "session.timeout.ms": 30_000,
        "fetch.min.bytes": 1,
        "fetch.wait.max.ms": 500,  # librdkafka's name for the Java client's fetch.max.wait.ms
    }
    return Consumer(config)


def process_record(record: dict) -> None:
    """Business logic: extend this with your actual processing."""
    logger.info(
        f"Processing: user={record['user_id']} "
        f"page={record['page']} ts={record['timestamp']}"
    )


def consume_loop(consumer: Consumer, running: list):
    consumer.subscribe([TOPIC_NAME])
    logger.info(f"Subscribed to topic '{TOPIC_NAME}' as group '{CONSUMER_GROUP}'")
    try:
        while running[0]:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue  # no new messages
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    logger.debug(f"Reached end of partition {msg.partition()}")
                else:
                    raise KafkaException(msg.error())
            else:
                record = json.loads(msg.value().decode("utf-8"))
                process_record(record)
                # Commit offset after successful processing
                consumer.commit(message=msg, asynchronous=False)
    finally:
        consumer.close()
        logger.info("Consumer closed cleanly.")


if __name__ == "__main__":
    running = [True]

    def shutdown(signum, frame):
        logger.info("Shutting down...")
        running[0] = False

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)
    c = create_consumer()
    consume_loop(c, running)
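The consumer above commits synchronously after every message, which is safe but adds a broker round trip per record. A common compromise is to commit once per batch of processed messages. The helper below is a sketch under that assumption; the class name and default batch size are hypothetical, and `consumer` is expected to be a `confluent_kafka.Consumer` (or any object with a compatible `commit` method) configured with `enable.auto.commit` set to False.

```python
class BatchCommitter:
    """Commit consumed offsets synchronously once every `batch_size` messages."""

    def __init__(self, consumer, batch_size: int = 100):
        self.consumer = consumer
        self.batch_size = batch_size
        self.pending = 0

    def processed(self) -> None:
        """Call after each message has been processed successfully."""
        self.pending += 1
        if self.pending >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        """Commit whatever is pending; call this before closing the consumer."""
        if self.pending:
            # Without a message or offsets argument, commit() commits the
            # current offsets for the consumer's assigned partitions.
            self.consumer.commit(asynchronous=False)
            self.pending = 0
```

In the loop above, `committer.processed()` would replace the per-message `consumer.commit(...)` call, with `committer.flush()` placed just before `consumer.close()`. Delivery stays at-least-once: a crash between commits means up to `batch_size` messages are processed again on restart.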
Architecture Comparison
| Dimension | Amazon Kinesis Data Streams | Apache Kafka on MSK |
|---|---|---|
| Unit of parallelism | Shard (1 MB/s write, 2 MB/s read) | Partition (no fixed throughput limit) |
| Scaling | Add/remove shards via API | Add brokers or partitions; partitions cannot be reduced |
| Default retention | 24 hours (up to 365 days with extra cost) | 7 days by default; unlimited with `retention.ms=-1`, bounded by broker storage |
| Consumer model | Pull via GetRecords or KCL; push via enhanced fan-out; or Lambda trigger | Pull via consumer group; flexible offset management |
| Replication | 3x across AZs, managed by AWS | Configurable replication factor; managed by MSK |
| Ordering guarantee | Within a shard (by partition key) | Within a partition (by key) |
| Exactly-once | Not built in; KCL + DynamoDB checkpointing gives at-least-once, so processing must be idempotent | With idempotent producer + transactional API |
| AWS integrations | Native: Lambda, Kinesis Firehose, KDA, Glue | Lambda event source mapping, MSK Connect, or custom consumers |
| Operational overhead | Very low; fully managed | Low on MSK, but schema registry and monitoring add work |
| Pricing model | Per shard-hour + PUT payload units | Per broker-hour + storage |
| Typical cost (moderate load) | ~$150-400/month for 10 shards | ~$500-1,500/month for a 3-broker MSK cluster |
| Message size limit | 1 MB per record | 1 MB default; configurable up to ~10 MB |
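The ordering row in the table depends on how a partition key is routed. Kinesis hashes the partition key with MD5 into a 128-bit integer and sends the record to the shard whose hash key range contains that value, so all records with the same key land on the same shard. The sketch below illustrates the mapping. `even_hash_ranges` is a hypothetical helper that assumes the hash space is split evenly; a real stream reports each shard's actual range in the `HashKeyRange` field returned by ListShards.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def even_hash_ranges(shard_count: int) -> list[tuple[str, int, int]]:
    """Hypothetical helper: split the hash key space evenly across shards."""
    step = HASH_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * step
        end = HASH_SPACE - 1 if i == shard_count - 1 else (i + 1) * step - 1
        ranges.append((f"shardId-{i:012d}", start, end))
    return ranges


def shard_for_key(partition_key: str, ranges: list[tuple[str, int, int]]) -> str:
    """Return the shard whose hash key range contains MD5(partition_key)."""
    hash_value = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    for shard_id, start, end in ranges:
        if start <= hash_value <= end:
            return shard_id
    raise ValueError("hash ranges do not cover the key space")


ranges = even_hash_ranges(4)
for user in ("user_0001", "user_0002", "user_0003"):
    print(user, "->", shard_for_key(user, ranges))
```

Kafka clients follow the same idea by hashing the message key over the topic's partition count, although the specific hash function depends on the client library's partitioner.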
Decision Matrix: When to Choose Each
Choose Kinesis when:
- Your team is AWS-native and wants minimal operational overhead
- You need native, low-configuration integration with Lambda, Kinesis Firehose, or Glue Streaming
- Throughput is predictable and can be right-sized in shards
- You want pay-per-use economics without managing brokers
- The project is new and you want to avoid Kafka expertise requirements
Choose Kafka (MSK) when:
- You have existing Kafka producers/consumers (on-premises or in other clouds) and need compatibility
- You need Kafka's rich ecosystem: Schema Registry, Kafka Streams, ksqlDB, Kafka Connect
- You need longer retention periods (weeks or months) without significant extra cost
- Your consumers require complex consumer group semantics or transactions
- You need to replay from arbitrary offsets across multiple independent consumer groups
- Throughput patterns are bursty and partition-level flexibility is important
For most AWS-native greenfield data engineering projects at small-to-medium scale, Kinesis is the pragmatic choice. It requires less expertise to operate, integrates directly with Kinesis Firehose for S3 delivery, and connects to Lambda for serverless processing. For more on this, see our tutorial on Real-Time Streaming with Kinesis.
For organizations with existing Kafka infrastructure, multi-cloud requirements, or teams that already have Kafka expertise, MSK is the better long-term foundation, particularly when you need Kafka Connect for pulling data from operational databases or external SaaS systems.
Whatever streaming platform you choose, pairing it with a proper monitoring strategy is essential. See Monitoring Data Pipelines with CloudWatch for setting up lag alerts and throughput dashboards for both Kinesis and MSK.
Conclusion
Kafka and Kinesis solve the same core problem (durable, ordered, distributed event streaming) with different trade-offs. Kinesis is the simpler operational choice for AWS-native teams; Kafka offers more flexibility, a richer ecosystem, and better economics at very high throughput. The right choice depends on your team's existing expertise, your integration requirements, and your throughput patterns.
If you are designing a real-time data architecture and want an expert assessment of which streaming platform fits your use case, contact Infra IT Consulting. We help Canadian organisations build streaming data infrastructure that is reliable, cost-efficient, and ready to scale.