Kafka docker

For Kafka we are going to use Confluent's docker images

edit docker-compose.yml

---
version: '2'
services:
  zookeeper:
    image: "confluentinc/cp-zookeeper:4.0.0"
    hostname: zookeeper
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: "confluentinc/cp-kafka:4.0.0"
    hostname: kafka
    ports:
      - '9092:9092'
      - '29092:29092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    extra_hosts:
      - "moby:127.0.0.1"

Start the containers

docker-compose -p jv_ up

By default docker-compose prefixes the container name with folder name. I am using the prefix 'jv_' to define the desired prefix

Testing Kafka

kafkacat is an easy to use command line utility which will allows us to write, read and inspect Kafka topics.

To install

apt-get install kafkacat

To write to a topic

echo 'some data' | kafkacat -b localhost -t some-topic

To read from a topic

kafkacat -b localhost -t some-topic

% Auto-selecting Consumer mode (use -P or -C to override)
some data

To inspect a topic

kafkacat -L -b localhost -t some-topic

Metadata for some-topic (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 1 at localhost:9092
 1 topics:
  topic "some-topic" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

Spring Boot Kafka

Create a Spring Boot project with a Kafka dependency

spring init -d=kafka -artifactId=spring-kafka -name=sentenceStats spring-kafka

edit ./src/test/java/com/example/springkafka/SpringKafkaApplicationTests.java

package com.example.springkafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static Logger logger = LoggerFactory.getLogger(SpringKafkaApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    private final CountDownLatch latch = new CountDownLatch(3);
    private final String topicName = "jvtest";


    @Override
    public void run(String... args) throws Exception {
        this.template.send(topicName, "foo1");
        this.template.send(topicName, "foo2");
        this.template.send(topicName, "foo3");
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All received");
    }

    @KafkaListener(topics = topicName)
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
        latch.countDown();
    }
}

edit ./src/main/resources/application.properties

spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=earliest

When executed

mvn spring-boot:run

We should see the following log entries in the console log

Kafka version : 0.10.1.1
Kafka commitId : f10ef2720b03b247
Discovered coordinator localhost:9092 (id: 2147483646 rack: null) for group foo.
Revoking previously assigned partitions [] for group foo
partitions revoked:[]
(Re-)joining group foo
Successfully joined group foo with generation 1
Setting newly assigned partitions [jvtest-0] for group foo
partitions assigned:[jvtest-0]
ConsumerRecord(topic = jvtest, partition = 0, offset = 0, CreateTime = 1515831249483, checksum = 167230666, serialized key size = -1, serialized value size = 4, key = null, value = foo1)
ConsumerRecord(topic = jvtest, partition = 0, offset = 1, CreateTime = 1515831249491, checksum = 2130352327, serialized key size = -1, serialized value size = 4, key = null, value = foo2)
ConsumerRecord(topic = jvtest, partition = 0, offset = 2, CreateTime = 1515831249492, checksum = 1955507721, serialized key size = -1, serialized value size = 4, key = null, value = foo3)
All received

results matching ""

    No results matching ""