
How to Use Kafka Streams API: Simple Guide with Example

Define your processing topology with a StreamsBuilder, pass the built Topology and your configuration to a KafkaStreams instance, then call start() to begin processing. Set properties such as application.id and bootstrap.servers so the application can connect to your Kafka cluster and process records in real time.

Syntax

The Kafka Streams API uses a StreamsBuilder to define the processing logic; calling build() on it produces a Topology. A KafkaStreams instance then runs this topology with the given configuration.

  • StreamsBuilder: Defines the stream processing steps.
  • Topology: The graph of processing steps produced by builder.build().
  • KafkaStreams: Runs the topology with the given configuration.
  • Properties: Configuration such as the application ID and the Kafka bootstrap servers.
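java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Default SerDes so the String-typed stream below can be read and written
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

StreamsBuilder builder = new StreamsBuilder();

// Define the stream processing topology: read from the source topic
KStream<String, String> stream = builder.stream("input-topic");

// Example processing: transform each value, then write to the sink topic
stream.mapValues(value -> value.toUpperCase())
      .to("output-topic");

// Build the topology and run it with the configuration
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();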

Example

This example reads messages from input-topic, converts the values to uppercase, and writes them to output-topic. It shows how to configure, build, and start a Kafka Streams application.

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class SimpleKafkaStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        KStream<String, String> uppercased = input.mapValues(value -> value.toUpperCase());
        uppercased.to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the hook before starting so the app closes cleanly on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
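
Output
While the application runs, records from 'input-topic' are read, their values are converted to uppercase, and the results are written to 'output-topic'. The program prints nothing itself, so apart from log messages and errors you will only see the results by consuming 'output-topic'.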

Common Pitfalls

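  • Not setting application.id makes the application fail with a configuration error when the KafkaStreams instance is created.
  • Forgetting to call streams.start() means no processing happens.
  • Not closing the streams instance on shutdown can cause resource leaks.
  • Using SerDes that do not match the actual key and value types leads to serialization or deserialization errors.
  • An unhandled exception inside the processing logic can kill the stream thread and stop the app.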
java
/* Wrong: application.id is missing, so creating KafkaStreams with these props fails */
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

/* Right: also set application.id before creating KafkaStreams */
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
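
The last pitfall in the list above can be illustrated without a broker. A record with a null value would make a lambda like value -> value.toUpperCase() throw a NullPointerException. The following is a minimal sketch in plain Java, not a Kafka Streams mechanism: SafeValueMappers and safeUpperCase are hypothetical names, and passing null values through unchanged is an assumption about what the application wants.

```java
import java.util.Locale;

// Hypothetical helper class for illustration; not part of the Kafka Streams API.
final class SafeValueMappers {
    private SafeValueMappers() {}

    // Upper-cases a record value, passing null through unchanged
    // so a null value cannot trigger a NullPointerException.
    static String safeUpperCase(String value) {
        if (value == null) {
            return null;
        }
        // Locale.ROOT keeps the result independent of the JVM's default locale.
        return value.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(safeUpperCase("hello")); // prints HELLO
        System.out.println(safeUpperCase(null));    // prints null
    }
}
```

Because mapValues accepts a function from the value type to the result type, a method reference such as SafeValueMappers::safeUpperCase should be usable in place of the lambda in the example application.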

Quick Reference

Remember these key points when using Kafka Streams API:

  • StreamsBuilder: Define your processing steps.
  • KafkaStreams: Run your topology with configs.
  • Properties: Must include application.id and bootstrap.servers.
  • SerDes: Use correct serializers/deserializers for keys and values.
  • Shutdown: Always close streams gracefully.
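
The point about required properties can be checked before any Kafka class is involved. The sketch below is one possible pre-flight check, assuming the configuration is held in a java.util.Properties with string values; RequiredConfigCheck and missingKeys are hypothetical names, and the two key strings are the literal values behind StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for illustration; not part of the Kafka Streams API.
final class RequiredConfigCheck {
    // Literal keys behind StreamsConfig.APPLICATION_ID_CONFIG
    // and StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.
    static final String[] REQUIRED = {"application.id", "bootstrap.servers"};

    private RequiredConfigCheck() {}

    // Returns the required keys that are absent or blank, in declaration order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // application.id was never set, so it is reported as missing.
        System.out.println("Missing: " + missingKeys(props));
    }
}
```

Failing fast with a clear message from a check like this is a design choice; the library also reports a missing required setting on its own when the configuration is parsed.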

Key Takeaways

  • Always set application.id and bootstrap.servers in your configuration.
  • Use StreamsBuilder to define your processing topology before starting KafkaStreams.
  • Call streams.start() to begin processing and close streams on shutdown.
  • Use correct SerDes to avoid serialization errors.
  • Handle exceptions inside your stream processing logic to keep the app stable.