1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Slf4j @Component public class CustomKafkaListener implements ConsumerSeekAware {
public Boolean seekFlag = false;
private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
@KafkaListener(topics = "topic", groupId = "group", concurrency = "${consumer-concurrency:2}") public void listenTestTopic(ConsumerRecord<String, String> record) { log.error(record.key() + " " + record.value() + " " + record.partition() + " " + record.offset()); }
@KafkaListener(topics = "topic", groupId = "group", concurrency = "${consumer-concurrency:2}") public void listenTestTopicAndCommit(ConsumerRecord<String, String> record, Acknowledgment ack) { log.error(record.key() + " " + record.value() + " " + record.partition() + " " + record.offset()); ack.acknowledge(); }
@KafkaListener(topics = "topic", groupId = "#{T(java.util.UUID).randomUUID().toString()}") public void listenBroadcast(ConsumerRecord<String, String> record, Acknowledgment ack) { log.error(record.key() + " " + record.value() + " " + record.partition() + " " + record.offset()); ack.acknowledge(); }
@KafkaListener(topics = "topic", groupId = "group") public void seekListener(ConsumerRecord<String, String> record) { if (seekFlag) { seekToOffset("topic", null, 0L); this.seekFlag = false; } }
@Override public void registerSeekCallback(ConsumerSeekCallback callback) { this.seekCallBack.set(callback); } public void seekToOffset(String topic, Integer partition, Long offset) { if (partition == null) { Map<String, TopicDescription> result = kafkaAdmin.describeTopics(topic); TopicDescription topicDescription = result.get(topic); List<TopicPartitionInfo> partitions = topicDescription.partitions(); for (TopicPartitionInfo topicPartitionInfo : partitions) { this.seekCallBack.get().seek(topic, topicPartitionInfo.partition(), offset); } } else { this.seekCallBack.get().seek(topic, partition, offset); } }
}
|