Consider the following Java Kafka producer code using transactions. What will be printed to the console?
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("transactional.id", "txn-1");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("my-topic", "key1", "value1"));
    producer.commitTransaction();
    System.out.println("Transaction committed");
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close();
    System.out.println("Fatal error, producer closed");
} catch (KafkaException e) {
    producer.abortTransaction();
    System.out.println("Transaction aborted");
}
Think about what happens when no exceptions are thrown during the transaction.
Assuming the broker is reachable and no exception is thrown, the code begins a transaction, sends one message, and commits. Neither catch block runs, so it prints "Transaction committed".
Examine this Kafka producer code snippet. What error will it raise when run?
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Missing transactional.id property

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
Transactional producers require a specific property to be set before calling initTransactions().
Without the "transactional.id" property the producer is not transactional. The constructor still succeeds, but the call to initTransactions() throws IllegalStateException.
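One way to surface this misconfiguration earlier is to check the properties before the producer is constructed. The sketch below uses only java.util.Properties; the class TransactionalConfigCheck and the method requireTransactionalId are hypothetical helpers for illustration, not part of the Kafka client API.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client library.
class TransactionalConfigCheck {

    // Returns the configured transactional.id, or throws if it is missing or blank.
    static String requireTransactionalId(Properties props) {
        String id = props.getProperty("transactional.id");
        if (id == null || id.trim().isEmpty()) {
            throw new IllegalStateException(
                "transactional.id must be set before calling initTransactions()");
        }
        return id;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        // The property is missing here, so the check fails.
        try {
            requireTransactionalId(props);
        } catch (IllegalStateException e) {
            System.out.println("Config check failed: " + e.getMessage());
        }

        // After the property is set, the check passes.
        props.put("transactional.id", "txn-1");
        System.out.println("transactional.id = " + requireTransactionalId(props));
    }
}
```

Calling such a check right after building the properties reports the problem at configuration time instead of at the first transactional call.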
Look at this code snippet. Why does it print "Transaction aborted"?
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("transactional.id", "txn-2");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("my-topic", "key2", "value2"));
    throw new KafkaException("Simulated failure");
    // producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close();
    System.out.println("Fatal error, producer closed");
} catch (KafkaException e) {
    producer.abortTransaction();
    System.out.println("Transaction aborted");
}
Check what happens when an exception occurs after beginTransaction() but before commitTransaction().
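The code throws a plain KafkaException before commitTransaction() is reached. That exception is not one of the fatal types listed in the first catch block, so the second catch block handles it, aborts the transaction, and prints "Transaction aborted". The record sent to my-topic is never committed.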
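The order of the catch blocks matters because, in the Kafka client, ProducerFencedException, OutOfOrderSequenceException, and AuthorizationException are all subclasses of KafkaException. The sketch below reproduces that dispatch with stand-in exception types so it runs without a broker or the Kafka library; BaseException, FatalException, and handle are hypothetical names used only for illustration.

```java
// Stand-in exception types that mirror a base class with a more specific subclass.
// They are hypothetical and are not the Kafka classes themselves.
class CatchOrderDemo {

    static class BaseException extends RuntimeException {
        BaseException(String message) { super(message); }
    }

    static class FatalException extends BaseException {
        FatalException(String message) { super(message); }
    }

    // Runs the action and reports which branch handled the outcome.
    static String handle(Runnable action) {
        try {
            action.run();
            return "Transaction committed";
        } catch (FatalException e) {
            // More specific type first: treated as unrecoverable.
            return "Fatal error, producer closed";
        } catch (BaseException e) {
            // Any other failure of the base type: abort.
            return "Transaction aborted";
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(() -> { throw new BaseException("Simulated failure"); }));
        System.out.println(handle(() -> { throw new FatalException("Simulated fencing"); }));
        System.out.println(handle(() -> { }));
    }
}
```

Run as written, main prints "Transaction aborted", then "Fatal error, producer closed", then "Transaction committed". Java also rejects the reverse ordering at compile time, since a catch for the base type placed first would make the subclass catch unreachable.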
Choose the code snippet that correctly begins and commits a Kafka transaction.
Remember Java method calls require parentheses and semicolons.
Option D correctly uses parentheses and semicolons for method calls. Others have syntax errors like missing parentheses or semicolons.
Given the following code, how many messages will be committed atomically to Kafka?
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("transactional.id", "txn-3");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

producer.beginTransaction();
producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
producer.send(new ProducerRecord<>("topic3", "key3", "value3"));
producer.commitTransaction();
Think about what commitTransaction() does with all messages sent after beginTransaction().
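All messages sent between beginTransaction() and commitTransaction() belong to a single transaction, even when they go to different topics. The 3 messages sent to topic1, topic2, and topic3 are therefore committed together: either all of them are committed or none are.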
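On the reading side, a consumer sees this all-or-nothing behavior only if it is configured with isolation.level set to read_committed; the consumer default is read_uncommitted, which also returns records from aborted or still-open transactions. The sketch below only builds the configuration with java.util.Properties and does not connect to a broker. The class name ReadCommittedConsumerConfig and the group id "txn-readers" are hypothetical.

```java
import java.util.Properties;

// Hypothetical holder for a consumer configuration that reads only committed records.
class ReadCommittedConsumerConfig {

    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "txn-readers"); // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Return only records from committed transactions.
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("isolation.level = " + props.getProperty("isolation.level"));
    }
}
```

These properties would be passed to a KafkaConsumer constructor in the same way the producer properties above are passed to KafkaProducer.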