Consider the following Kafka producer code snippet configured for exactly-once semantics (EOS). What will be the output printed after sending the message?
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("transactional.id", "txn-1");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("my-topic", "key1", "value1"));
    producer.commitTransaction();
    System.out.println("Message sent successfully");
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close();
    System.out.println("Fatal error, producer closed");
} catch (KafkaException e) {
    producer.abortTransaction();
    System.out.println("Transaction aborted");
}
Think about what happens when the transaction commits successfully without exceptions.
The code initializes a transactional producer and sends a message within a transaction. Since no exceptions occur, the transaction commits successfully and prints "Message sent successfully".
Given the following Kafka consumer code snippet, which is intended to read only committed messages but has its isolation level set incorrectly, what error, if any, will it raise?
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group1");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("isolation.level", "read_uncommitted");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}
Consider what happens when the isolation level is set to read_uncommitted.
Setting isolation.level to read_uncommitted lets the consumer read all messages, including those from open or later-aborted transactions. No error is raised, but exactly-once semantics are not preserved end-to-end; to honor transaction boundaries, the consumer must use read_committed.
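For contrast, a minimal sketch of the configuration that does respect transaction boundaries (property names are the standard Kafka consumer config keys; broker address and group id are illustrative). With read_committed, poll() only returns non-transactional messages and messages from committed transactions:

```java
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group1");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// The default is read_uncommitted; this is the setting EOS pipelines need.
props.put("isolation.level", "read_committed");
```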
Which Kafka feature is primarily responsible for enabling exactly-once semantics (EOS) in Kafka Streams applications?
Think about how Kafka prevents duplicate writes and partial commits.
Exactly-once semantics in Kafka Streams rely on idempotent producers and transactional writes to atomically commit output and consumer offsets.
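The atomic commit described above can be sketched directly against the producer API: in a consume-transform-produce loop, the consumed offsets are sent to the transaction so that output records and offset commits succeed or fail together. A minimal sketch of that pattern (which Kafka Streams automates), assuming a transactional producer and a read_committed consumer already configured as in the earlier snippets, plus a hypothetical "output-topic"; it requires a running broker:

```java
producer.initTransactions();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;
    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            // Transform and write the output record inside the transaction.
            producer.send(new ProducerRecord<>("output-topic", record.key(),
                                               record.value().toUpperCase()));
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        }
        // Offsets are committed atomically with the output records:
        // either both become visible, or neither does.
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (KafkaException e) {
        producer.abortTransaction();
    }
}
```

Note that the offsets are never committed through the consumer itself; routing them through the producer's transaction is what makes the read-process-write cycle atomic.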
Given the following Kafka Streams code snippet with exactly-once processing enabled, what will be the output printed?
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic");
input.mapValues(value -> value.toUpperCase())
     .to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
System.out.println("Streams started with EOS");
Check the processing guarantee configuration and what the code prints.
The code sets the processing guarantee to EXACTLY_ONCE_V2 and prints "Streams started with EOS". Note that KafkaStreams.start() is asynchronous, so the message prints immediately, before the stream threads have necessarily finished starting or processed any records.
In Kafka exactly-once semantics, what is the consequence if a producer loses or changes its transactional.id between restarts?
Consider how Kafka prevents duplicate transactions from multiple producers with the same transactional.id.
Fencing works per transactional.id: when a producer calls initTransactions() with a given transactional.id, the broker bumps the producer epoch, and any older instance still using that same id is fenced, failing with ProducerFencedException on its next transactional operation. If a producer instead restarts with a different (or lost) transactional.id, the broker has no way to fence its previous incarnation, so zombie writes and duplicate transactions become possible and exactly-once guarantees are lost.
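Fencing can be observed with a short sketch (requires a running broker; topic and id names are illustrative): registering a second producer under the same transactional.id bumps the epoch, so the first instance fails on its next transactional operation.

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("transactional.id", "txn-1");

KafkaProducer<String, String> first = new KafkaProducer<>(props);
first.initTransactions();

// A second instance registering the same transactional.id bumps the epoch...
KafkaProducer<String, String> second = new KafkaProducer<>(props);
second.initTransactions();

// ...so the older instance is now a "zombie" and gets rejected by the broker.
try {
    first.beginTransaction();
    first.send(new ProducerRecord<>("my-topic", "key1", "value1"));
    first.commitTransaction();
} catch (ProducerFencedException e) {
    first.close();  // fatal for this instance; its duplicate writes are prevented
}
```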