
Kafka vs. Kinesis: A Hands-On Comparison for Data Engineers

By Infra IT Consulting · 11 min read

Real-time data engineering on AWS presents an early fork in the road: Apache Kafka or Amazon Kinesis? Both are durable, ordered, partitioned log systems. Both handle high-throughput streaming workloads. But their operational models, pricing structures, and integration stories are genuinely different. This tutorial goes beyond theory: it shows producer and consumer code for both, compares their architectures concretely, and provides a decision matrix to help you choose.

Architecture Primer

Amazon Kinesis Data Streams is a fully managed AWS service. You provision shards; each shard accepts up to 1 MB/s (or 1,000 records per second) of writes and serves up to 2 MB/s of reads. Scaling means adding or removing shards via the API. Retention is configurable from 24 hours to 365 days. AWS handles replication, durability, and availability.
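
To make the per-shard limits concrete, here is a minimal sizing sketch. It assumes provisioned mode and the limits quoted above; the function name and the example workload are illustrative, not part of any AWS API.

```python
import math

WRITE_MB_PER_SHARD = 1.0         # MB/s of writes per shard
WRITE_RECORDS_PER_SHARD = 1000   # records/s of writes per shard
READ_MB_PER_SHARD = 2.0          # MB/s of shared reads per shard


def shards_needed(write_mb_s: float, records_s: float, read_mb_s: float) -> int:
    """Return the smallest shard count that satisfies all three limits."""
    by_write = math.ceil(write_mb_s / WRITE_MB_PER_SHARD)
    by_records = math.ceil(records_s / WRITE_RECORDS_PER_SHARD)
    by_read = math.ceil(read_mb_s / READ_MB_PER_SHARD)
    # The tightest constraint wins; a stream always has at least one shard.
    return max(1, by_write, by_records, by_read)


if __name__ == "__main__":
    # Hypothetical workload: 4.5 MB/s in, 6,000 small records/s, 9 MB/s out
    print(shards_needed(write_mb_s=4.5, records_s=6000, read_mb_s=9.0))
```

In this hypothetical workload the record-count limit, not the byte limit, sets the shard count, which is common for streams of many small events.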

Apache Kafka is an open-source distributed log. On AWS you run it on Amazon MSK (Managed Streaming for Apache Kafka), which handles broker provisioning and ZooKeeper/KRaft coordination, but you still manage broker sizing, storage scaling, and consumer group lag monitoring. Kafka uses partitions (analogous to shards) and supports longer retention and more flexible consumer semantics than Kinesis.
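
Both systems decide where a record lands by hashing its key. Kinesis maps the MD5 hash of the partition key onto each shard's hash key range. The sketch below illustrates that idea under a simplifying assumption: the shards split the 128-bit hash space evenly, which may not hold after resharding. Kafka clients use their own hash functions, so the exact placement differs, but the principle (same key, same partition) is the same. The helper names are hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def hash_key(partition_key: str) -> int:
    """Interpret the MD5 digest of the partition key as a 128-bit integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Index of the shard that owns this key, assuming evenly split ranges."""
    return hash_key(partition_key) * shard_count // HASH_SPACE


if __name__ == "__main__":
    for user in ("user_0001", "user_0002", "user_0003"):
        print(user, "-> shard", shard_index(user, shard_count=4))
```

Because the mapping is deterministic, all events for one user arrive in order on one shard, and a single very active key can overload that shard.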

Kinesis: Producer and Consumer Code

Install the AWS SDK:

pip install boto3

Kinesis Producer:

# kinesis_producer.py
import boto3
import json
import time
import uuid
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

kinesis = boto3.client("kinesis", region_name="ca-central-1")
STREAM_NAME = "clickstream-events"


def put_record(event: dict) -> dict:
    """Put a single record to Kinesis. Partition key determines shard assignment."""
    response = kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey=event.get("user_id", str(uuid.uuid4())),  # route by user_id
    )
    return response


def put_records_batch(events: list[dict]) -> dict:
    """
    Put up to 500 records in a single API call (max 5 MB total).
    More efficient than individual put_record calls.
    """
    records = [
        {
            "Data": json.dumps(event).encode("utf-8"),
            "PartitionKey": event.get("user_id", str(uuid.uuid4())),
        }
        for event in events
    ]

    response = kinesis.put_records(StreamName=STREAM_NAME, Records=records)

    failed = response.get("FailedRecordCount", 0)
    if failed > 0:
        logger.warning(f"{failed} records failed to write; consider retry logic")

    return response


def simulate_clickstream():
    """Simulate a stream of e-commerce clickstream events."""
    pages = ["/home", "/products", "/cart", "/checkout", "/confirmation"]
    user_ids = [f"user_{i:04d}" for i in range(1, 201)]

    batch = []
    for i in range(10_000):
        event = {
            "event_id": str(uuid.uuid4()),
            "user_id": user_ids[i % len(user_ids)],
            "session_id": f"sess_{i // 5:04d}",
            "page": pages[i % len(pages)],
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": "page_view",
        }
        batch.append(event)

        if len(batch) == 500:
            result = put_records_batch(batch)
            logger.info(f"Put batch of 500, failed: {result['FailedRecordCount']}")
            batch = []
            time.sleep(0.1)  # brief pause to stay within shard limits

    if batch:
        put_records_batch(batch)


if __name__ == "__main__":
    simulate_clickstream()
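
The producer above only logs a warning when some records in a batch fail. One possible shape for the retry logic it mentions is sketched here. It relies on the PutRecords response listing one result per request entry, in order, with failed entries carrying an `ErrorCode`. The function name, attempt count, and backoff values are assumptions.

```python
import time


def put_records_with_retry(client, stream_name, records,
                           max_attempts=5, base_delay=0.1):
    """Send records, resending only the entries Kinesis rejected.

    Returns the records that are still unsent after max_attempts.
    """
    pending = list(records)
    for attempt in range(max_attempts):
        response = client.put_records(StreamName=stream_name, Records=pending)
        if response.get("FailedRecordCount", 0) == 0:
            return []
        # Results line up with the request by index; failures carry ErrorCode.
        pending = [
            record
            for record, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        if attempt < max_attempts - 1:
            time.sleep(base_delay * (2 ** attempt))  # exponential backoff
    return pending
```

Passing the boto3 client in as an argument keeps the helper easy to test with a stub. Note that resending can reorder records for a key and can produce duplicates, so downstream consumers should tolerate both.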

Kinesis Consumer (shared-throughput polling with GetRecords):

# kinesis_consumer.py
import boto3
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

kinesis = boto3.client("kinesis", region_name="ca-central-1")
STREAM_NAME = "clickstream-events"


def get_shard_ids(stream_name: str) -> list[str]:
    """Return the IDs of all shards in the stream, following pagination."""
    paginator = kinesis.get_paginator("list_shards")
    shard_ids = []
    for page in paginator.paginate(StreamName=stream_name):
        shard_ids.extend(s["ShardId"] for s in page["Shards"])
    return shard_ids


def consume_shard(shard_id: str, iterator_type: str = "TRIM_HORIZON"):
    """
    Consume records from a single shard.
    iterator_type: TRIM_HORIZON (from beginning), LATEST (new records only),
                   AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER, AT_TIMESTAMP
    """
    # Get initial shard iterator
    iterator_response = kinesis.get_shard_iterator(
        StreamName=STREAM_NAME,
        ShardId=shard_id,
        ShardIteratorType=iterator_type,
    )
    shard_iterator = iterator_response["ShardIterator"]

    records_processed = 0
    while shard_iterator:
        response = kinesis.get_records(
            ShardIterator=shard_iterator,
            Limit=100,  # cap each call at 100 records (the API allows up to 10,000)
        )

        records = response.get("Records", [])
        for record in records:
            data = json.loads(record["Data"].decode("utf-8"))
            sequence_number = record["SequenceNumber"]
            arrival_time = record["ApproximateArrivalTimestamp"]

            # Ingestion latency: time from event creation to arrival at Kinesis
            event_time = datetime.fromisoformat(data["timestamp"].replace("Z", "+00:00"))
            # boto3 returns a timezone-aware datetime, so subtract directly
            latency_ms = (arrival_time - event_time).total_seconds() * 1000

            logger.info(
                f"Shard {shard_id} | seq={sequence_number[:20]}... | "
                f"user={data['user_id']} | latency={latency_ms:.0f}ms"
            )
            records_processed += 1

        shard_iterator = response.get("NextShardIterator")

        if not records:
            # No new records: Kinesis returns empty batches when caught up
            import time
            time.sleep(1)  # poll every second when idle

    logger.info(f"Shard {shard_id} closed. Processed {records_processed} records.")


if __name__ == "__main__":
    import threading
    shard_ids = get_shard_ids(STREAM_NAME)
    logger.info(f"Consuming from {len(shard_ids)} shards")

    # Consume each shard in a separate thread
    threads = [
        threading.Thread(target=consume_shard, args=(shard_id,))
        for shard_id in shard_ids
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
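
The consumer above always starts from TRIM_HORIZON, so a restart reprocesses the whole shard. A minimal sketch of resuming from a saved position follows. It builds the arguments for `get_shard_iterator` from a checkpoint, using the AFTER_SEQUENCE_NUMBER iterator type listed in the docstring. Where the checkpoint is stored (a file, DynamoDB, and so on) is left open; the `checkpoint` argument and the function name are assumptions.

```python
from typing import Optional


def iterator_args(stream_name: str, shard_id: str,
                  checkpoint: Optional[str] = None) -> dict:
    """Build get_shard_iterator keyword arguments.

    checkpoint is the sequence number of the last record that was fully
    processed for this shard, or None on the first run.
    """
    args = {"StreamName": stream_name, "ShardId": shard_id}
    if checkpoint:
        # Resume with the record after the last one processed
        args["ShardIteratorType"] = "AFTER_SEQUENCE_NUMBER"
        args["StartingSequenceNumber"] = checkpoint
    else:
        # No saved position: read from the oldest retained record
        args["ShardIteratorType"] = "TRIM_HORIZON"
    return args
```

In `consume_shard` this would be used as `kinesis.get_shard_iterator(**iterator_args(STREAM_NAME, shard_id, saved))`, with `record["SequenceNumber"]` written to the checkpoint store after each record or batch is processed. The Kinesis Client Library handles this bookkeeping for you if you prefer not to build it.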

Kafka on MSK: Producer and Consumer Code

Install the Confluent Kafka Python client:

pip install confluent-kafka

Kafka Producer:

# kafka_producer.py
import json
import uuid
import time
import logging
from datetime import datetime, timezone
from confluent_kafka import Producer

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# MSK broker endpoint (from the MSK console). Port 9092 is plaintext; use the TLS endpoint (port 9094) in production.
BOOTSTRAP_SERVERS = "b-1.mycluster.abc123.c1.kafka.ca-central-1.amazonaws.com:9092"
TOPIC_NAME = "clickstream-events"


def create_producer() -> Producer:
    config = {
        "bootstrap.servers": BOOTSTRAP_SERVERS,
        "client.id": "clickstream-producer",
        "acks": "all",                      # wait for all in-sync replicas
        "retries": 5,
        "retry.backoff.ms": 300,
        "linger.ms": 10,                    # batch records for up to 10ms
        "batch.size": 65536,                # 64 KB batch size
        "compression.type": "snappy",
        "enable.idempotence": True,         # broker drops duplicates caused by producer retries
    }
    return Producer(config)


def delivery_callback(err, msg):
    """Called by Kafka for each produced message (async confirmation)."""
    if err:
        logger.error(f"Delivery failed: {err}")
    else:
        logger.debug(
            f"Delivered to {msg.topic()} [{msg.partition()}] "
            f"offset {msg.offset()}"
        )


def produce_events(producer: Producer, num_events: int = 10_000):
    pages = ["/home", "/products", "/cart", "/checkout"]
    user_ids = [f"user_{i:04d}" for i in range(1, 201)]

    for i in range(num_events):
        event = {
            "event_id": str(uuid.uuid4()),
            "user_id": user_ids[i % len(user_ids)],
            "page": pages[i % len(pages)],
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": "page_view",
        }

        producer.produce(
            topic=TOPIC_NAME,
            key=event["user_id"].encode("utf-8"),   # partition key
            value=json.dumps(event).encode("utf-8"),
            callback=delivery_callback,
        )

        # poll() triggers delivery callbacks, so call it periodically
        producer.poll(0)

        if (i + 1) % 1000 == 0:
            producer.flush()  # wait for all queued messages to be delivered
            logger.info(f"Produced {i + 1} events")

    producer.flush()
    logger.info(f"Done. Produced {num_events} events total.")


if __name__ == "__main__":
    p = create_producer()
    produce_events(p)
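
One case the producer above does not handle: `produce()` raises `BufferError` when the client's local queue is full, which can happen if events are generated faster than the brokers acknowledge them. A common pattern is to call `poll()` to drain delivery reports and then try again. The wrapper below is a sketch of that pattern; its name and the wait limits are assumptions.

```python
def produce_with_backpressure(producer, topic, key, value,
                              callback=None, max_wait_s=10.0, step_s=0.5):
    """Call producer.produce(), waiting for queue space if the buffer is full."""
    kwargs = {"topic": topic, "key": key, "value": value}
    if callback is not None:
        kwargs["callback"] = callback

    waited = 0.0
    while True:
        try:
            producer.produce(**kwargs)
            return
        except BufferError:
            if waited >= max_wait_s:
                raise  # still full after max_wait_s: surface the error
            # Serve delivery callbacks, which frees slots in the local queue
            producer.poll(step_s)
            waited += step_s
```

In `produce_events` this would replace the direct `producer.produce(...)` call. Raising after a bounded wait keeps a broker outage from turning into a silent, indefinite stall.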

Kafka Consumer:

# kafka_consumer.py
import json
import logging
import signal
import sys
from datetime import datetime, timezone
from confluent_kafka import Consumer, KafkaError, KafkaException

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

BOOTSTRAP_SERVERS = "b-1.mycluster.abc123.c1.kafka.ca-central-1.amazonaws.com:9092"
TOPIC_NAME = "clickstream-events"
CONSUMER_GROUP = "clickstream-processor-v1"


def create_consumer() -> Consumer:
    config = {
        "bootstrap.servers": BOOTSTRAP_SERVERS,
        "group.id": CONSUMER_GROUP,
        "auto.offset.reset": "earliest",    # start from beginning if no committed offset
        "enable.auto.commit": False,        # manual commit for at-least-once semantics
        "max.poll.interval.ms": 300_000,
        "session.timeout.ms": 30_000,
        "fetch.min.bytes": 1,
        "fetch.max.wait.ms": 500,
    }
    return Consumer(config)


def process_record(record: dict) -> None:
    """Business logic: extend this with your actual processing."""
    logger.info(
        f"Processing: user={record['user_id']} "
        f"page={record['page']} ts={record['timestamp']}"
    )


def consume_loop(consumer: Consumer, running: list):
    consumer.subscribe([TOPIC_NAME])
    logger.info(f"Subscribed to topic '{TOPIC_NAME}' as group '{CONSUMER_GROUP}'")

    try:
        while running[0]:
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                continue  # no new messages

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    logger.debug(f"Reached end of partition {msg.partition()}")
                else:
                    raise KafkaException(msg.error())
            else:
                record = json.loads(msg.value().decode("utf-8"))
                process_record(record)

                # Commit offset after successful processing
                consumer.commit(message=msg, asynchronous=False)

    finally:
        consumer.close()
        logger.info("Consumer closed cleanly.")


if __name__ == "__main__":
    running = [True]

    def shutdown(signum, frame):
        logger.info("Shutting down...")
        running[0] = False

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    c = create_consumer()
    consume_loop(c, running)
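
The consumer above commits synchronously after every message, which is safe but adds a broker round trip per record. A common alternative is to commit once every N processed messages. The helper below is a sketch of that idea; the class name and batch size are assumptions. It calls `commit()` without a message, which commits the consumer's current position on all assigned partitions, so it should only be called after everything returned by `poll()` so far has been processed.

```python
class CommitBatcher:
    """Commit consumer offsets once every `every` processed messages."""

    def __init__(self, consumer, every: int = 100):
        self.consumer = consumer
        self.every = every
        self.pending = 0

    def processed(self) -> None:
        """Record that one more message has been fully processed."""
        self.pending += 1
        if self.pending >= self.every:
            self.flush()

    def flush(self) -> None:
        """Commit now if anything is uncommitted (call this before close())."""
        if self.pending:
            self.consumer.commit(asynchronous=False)
            self.pending = 0
```

In `consume_loop`, `batcher.processed()` would replace the per-message `consumer.commit(...)`, with `batcher.flush()` in the `finally` block before `consumer.close()`. The trade-off is that a crash can replay up to N messages, which is still at-least-once delivery.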

Architecture Comparison

| Dimension | Amazon Kinesis Data Streams | Apache Kafka on MSK |
| --- | --- | --- |
| Unit of parallelism | Shard (1 MB/s write, 2 MB/s read) | Partition (no fixed throughput limit) |
| Scaling | Add/remove shards via API | Add brokers or partitions; partitions cannot be reduced |
| Default retention | 24 hours (up to 365 days with extra cost) | 7 days by default; configurable up to unlimited; compacted topics keep the latest record per key |
| Consumer model | Pull via GetRecords; or KCL; or Lambda trigger | Pull via consumer group; flexible offset management |
| Replication | 3x across AZs, managed by AWS | Configurable replication factor; managed by MSK |
| Ordering guarantee | Within a shard (by partition key) | Within a partition (by key) |
| Exactly-once | Not built in; KCL with DynamoDB checkpointing gives at-least-once, so processing should be idempotent | With idempotent producer + transactional API |
| AWS integrations | Native: Lambda, Kinesis Firehose, Kinesis Data Analytics (KDA), Glue | Lambda trigger, MSK Connect, or custom consumers |
| Operational overhead | Very low; fully managed, no brokers to run | Low on MSK, but schema registry and monitoring add work |
| Pricing model | Per shard-hour + PUT payload units | Per broker-hour + storage |
| Typical cost (moderate load) | ~$150-400/month for 10 shards | ~$500-1,500/month for a 3-broker MSK cluster |
| Message size limit | 1 MB per record | 1 MB default; configurable up to ~10 MB |
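
The two pricing models in the table reduce to simple arithmetic. The sketch below shows the shape of each calculation. All rates are parameters because they vary by region and change over time; the values in the example are placeholders, not AWS prices. The only fixed assumptions are a 730-hour month and that one PUT payload unit covers up to 25 KB of a record.

```python
import math

HOURS_PER_MONTH = 730
PUT_UNIT_BYTES = 25 * 1024  # a record is billed in 25 KB payload units


def kinesis_monthly_cost(shards, records_per_s, record_bytes,
                         shard_hour_rate, rate_per_million_put_units):
    """Provisioned-mode estimate: shard-hours plus PUT payload units."""
    units_per_record = max(1, math.ceil(record_bytes / PUT_UNIT_BYTES))
    put_units = records_per_s * units_per_record * 3600 * HOURS_PER_MONTH
    shard_cost = shards * HOURS_PER_MONTH * shard_hour_rate
    put_cost = put_units / 1_000_000 * rate_per_million_put_units
    return shard_cost + put_cost


def msk_monthly_cost(brokers, broker_hour_rate, storage_gb, storage_gb_month_rate):
    """Provisioned MSK estimate: broker-hours plus attached storage."""
    return brokers * HOURS_PER_MONTH * broker_hour_rate + storage_gb * storage_gb_month_rate


if __name__ == "__main__":
    # Placeholder rates for illustration only; look up current regional pricing.
    print(kinesis_monthly_cost(10, 1000, 1024, shard_hour_rate=0.02,
                               rate_per_million_put_units=0.02))
    print(msk_monthly_cost(3, broker_hour_rate=0.25, storage_gb=1000,
                           storage_gb_month_rate=0.10))
```

The estimate leaves out extended retention, enhanced fan-out, and data transfer charges, which can matter at higher volumes.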

Decision Matrix: When to Choose Each

Choose Kinesis when:

  • Your team is AWS-native and wants minimal operational overhead
  • You need native, low-configuration integration with Lambda, Kinesis Firehose, or Glue Streaming
  • Throughput is predictable and can be right-sized in shards
  • You want pay-per-use economics without managing brokers
  • The project is new and you want to avoid Kafka expertise requirements

Choose Kafka (MSK) when:

  • You have existing Kafka producers/consumers (on-premises or in other clouds) and need compatibility
  • You need Kafka's rich ecosystem: Schema Registry, Kafka Streams, ksqlDB, Kafka Connect
  • You need longer retention periods (weeks or months) without significant extra cost
  • Your consumers require complex consumer group semantics or transactions
  • You need to replay from arbitrary offsets across multiple independent consumer groups
  • Throughput patterns are bursty and partition-level flexibility is important

For most AWS-native greenfield data engineering projects at small-to-medium scale, Kinesis is the pragmatic choice. It requires less expertise to operate, integrates directly with Kinesis Firehose for S3 delivery, and connects to Lambda for serverless processing. For more on this, see our tutorial on Real-Time Streaming with Kinesis.

For organizations with existing Kafka infrastructure, multi-cloud requirements, or teams that already have Kafka expertise, MSK is the better long-term foundation, particularly when you need Kafka Connect for pulling data from operational databases or external SaaS systems.

Whatever streaming platform you choose, pairing it with a proper monitoring strategy is essential. See Monitoring Data Pipelines with CloudWatch for setting up lag alerts and throughput dashboards for both Kinesis and MSK.
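
As a small illustration of a lag alert on the Kinesis side, the sketch below builds the arguments for a CloudWatch alarm on `GetRecords.IteratorAgeMilliseconds`, the metric that reports how far behind the stream tip a polling consumer is reading. The alarm name, threshold, and evaluation window are assumptions to tune for your workload.

```python
def iterator_age_alarm(stream_name: str, threshold_ms: int = 60_000) -> dict:
    """Keyword arguments for CloudWatch put_metric_alarm on consumer lag."""
    return {
        "AlarmName": f"{stream_name}-iterator-age",
        "Namespace": "AWS/Kinesis",
        "MetricName": "GetRecords.IteratorAgeMilliseconds",
        "Dimensions": [{"Name": "StreamName", "Value": stream_name}],
        "Statistic": "Maximum",
        "Period": 60,               # evaluate one-minute data points
        "EvaluationPeriods": 5,     # alarm after five consecutive breaches
        "Threshold": float(threshold_ms),
        "ComparisonOperator": "GreaterThanThreshold",
    }
```

It would be applied with `boto3.client("cloudwatch").put_metric_alarm(**iterator_age_alarm("clickstream-events"))`. In practice you would also attach `AlarmActions` (for example an SNS topic) so the alarm notifies someone.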

Conclusion

Kafka and Kinesis solve the same core problem (durable, ordered, distributed event streaming) with different trade-offs. Kinesis is the simpler operational choice for AWS-native teams; Kafka offers more flexibility, a richer ecosystem, and better economics at very high throughput. The right choice depends on your team's existing expertise, your integration requirements, and your throughput patterns.

If you are designing a real-time data architecture and want an expert assessment of which streaming platform fits your use case, contact Infra IT Consulting. We help Canadian organisations build streaming data infrastructure that is reliable, cost-efficient, and ready to scale.
