Spring for Apache Kafka

Spring for Apache Kafka

简介

Spring 官方封装了 Kafka 相关的 API,可以让 Spring 应用程序更方便地使用 Kafka。

使用

引入依赖

maven

1
2
3
4
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

gradle

1
2
3
dependencies {
    // FIX: spring-kafka is needed at runtime, not just at compile time.
    // 'compileOnly' would leave the classes off the runtime classpath and the
    // application would fail with NoClassDefFoundError; use 'implementation'.
    implementation 'org.springframework.kafka:spring-kafka'
}

连接配置

1
2
3
4
5
6
7
spring:
  kafka:
    # FIX: Kafka 的 bootstrap.servers 使用英文逗号分隔多个地址,不能用分号。
    bootstrap-servers: <host_1>:<port_1>,<host_2>:<port_2>
    listener:
      # 手动提交位移时配置;使用默认提交方式则无需此项。
      ack-mode: manual
    producer:
      # 生产者发送失败时的重试次数。
      retries: 6

注:可以根据需求调整 ack-mode,如需手动提交确认则可进行上述配置;默认情况下无需配置此项。producer.retries 代表生产者遇到故障时的重试次数。

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaDemoController {

    @Resource
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Declares the topic "topic1" so the broker auto-creates it on startup.
     * NOTE(review): the send endpoints below publish to topic "wq", not
     * "topic1" — confirm whether that mismatch is intentional.
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    /**
     * Synchronous send: {@code get()} blocks until the broker acknowledges
     * the record or the send fails.
     */
    @GetMapping("/sync")
    public HttpEntity<String> syncSend() {
        try {
            kafkaTemplate.send("wq", "asd", "123").get();
        } catch (InterruptedException e) {
            // FIX: restore the interrupt flag — swallowing it hides the
            // interruption from callers further up the stack.
            Thread.currentThread().interrupt();
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage());
        } catch (ExecutionException e) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage());
        }
        return new HttpEntity<>("success");
    }

    /**
     * Asynchronous send: returns immediately without waiting for broker
     * acknowledgement; failures surface only via the producer listener.
     */
    @GetMapping("/async")
    public HttpEntity<String> asyncSend() {
        kafkaTemplate.send("wq", "asd", "123");
        return new HttpEntity<>("success");
    }

}

注:异步方式可能会导致发送失败,建议在配置当中声明重试次数或者配置回调。

回调配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomProducerListener implements ProducerListener<String, String> {

@Override
public void onSuccess(ProducerRecord<String, String> producerRecord,
RecordMetadata recordMetadata) {
log.error("success callback");
}

@Override
public void onError(ProducerRecord<String, String> producerRecord,
RecordMetadata recordMetadata,
Exception exception) {
log.error("error callback");
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import jakarta.annotation.Resource;

@Slf4j
@Configuration
public class CustomKafkaConf {

    /** Producer factory used to build the customized template. */
    @Resource
    DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory;

    /** Listener notified of every producer success/failure callback. */
    @Resource
    CustomProducerListener customProducerListener;

    /**
     * Replaces the auto-configured {@code KafkaTemplate} with one that
     * reports send results to {@code CustomProducerListener}.
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(defaultKafkaProducerFactory);
        template.setProducerListener(customProducerListener);
        return template;
    }

}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Slf4j
@Component
public class CustomKafkaListener implements ConsumerSeekAware {

    // FIX: the original referenced `kafkaAdmin` without ever declaring it,
    // which cannot compile. Inject Spring's KafkaAdmin here (requires
    // `import org.springframework.kafka.core.KafkaAdmin;`).
    @Resource
    KafkaAdmin kafkaAdmin;

    // FIX: volatile — this flag is flipped from other threads and read by the
    // listener threads; without volatile the update may never become visible.
    public volatile Boolean seekFlag = false;

    // registerSeekCallback is invoked once per consumer thread, so a
    // ThreadLocal keeps each thread's callback separate.
    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    /** Plain listener: logs every record it receives. */
    @KafkaListener(topics = "topic", groupId = "group", concurrency = "${consumer-concurrency:2}")
    public void listenTestTopic(ConsumerRecord<String, String> record) {
        log.error(record.key() + " " + record.value() + " " + record.partition() + " " + record.offset());
    }

    /** Manual-ack listener: requires listener.ack-mode=manual in the config. */
    @KafkaListener(topics = "topic", groupId = "group", concurrency = "${consumer-concurrency:2}")
    public void listenTestTopicAndCommit(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.error(record.key() + " " + record.value() + " " + record.partition() + " " + record.offset());
        ack.acknowledge();
    }

    /**
     * Broadcast-style listener: a random group id per application instance
     * means every instance receives every record.
     */
    @KafkaListener(topics = "topic", groupId = "#{T(java.util.UUID).randomUUID().toString()}")
    public void listenBroadcast(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.error(record.key() + " " + record.value() + " " + record.partition() + " " + record.offset());
        ack.acknowledge();
    }

    /**
     * When seekFlag is set, rewinds this consumer to offset 0. Seeks can only
     * be triggered while records are actively being consumed.
     */
    @KafkaListener(topics = "topic", groupId = "group")
    public void seekListener(ConsumerRecord<String, String> record) {
        if (seekFlag) {
            seekToOffset("topic", null, 0L);
            this.seekFlag = false;
        }
    }

    /** Stores the per-thread seek callback supplied by the listener container. */
    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallBack.set(callback);
    }

    /**
     * Seeks the current consumer thread to {@code offset}; when
     * {@code partition} is null, every partition of the topic is sought.
     */
    public void seekToOffset(String topic, Integer partition, Long offset) {
        if (partition == null) {
            Map<String, TopicDescription> result = kafkaAdmin.describeTopics(topic);
            TopicDescription topicDescription = result.get(topic);
            List<TopicPartitionInfo> partitions = topicDescription.partitions();
            for (TopicPartitionInfo topicPartitionInfo : partitions) {
                this.seekCallBack.get().seek(topic, topicPartitionInfo.partition(), offset);
            }
        } else {
            this.seekCallBack.get().seek(topic, partition, offset);
        }
    }

}

注: consumer-concurrency 可以配置 Consumer 的线程数。且 seek 操作需要在有数据消费时才能触发。

单元测试

在单元测试中可以加入 @EmbeddedKafka 注解进行单元测试,样例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = {
        "listeners=PLAINTEXT://localhost:9092", "port=9092"
})
public class KafkaConsumerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaConsumer kafkaConsumer;

    /**
     * Publishes one message to the embedded broker and verifies the consumer
     * observed it (latch released, payload matches) within 10 seconds.
     */
    @Test
    public void testReceive() throws Exception {
        final String message = "Hello, world!";
        kafkaTemplate.send("test-topic", message);
        kafkaConsumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        assertThat(kafkaConsumer.getLatch().getCount()).isEqualTo(0);
        assertThat(kafkaConsumer.getPayload()).isEqualTo(message);
    }
}

参考资料

官方文档


Spring for Apache Kafka
https://wangqian0306.github.io/2022/spring-for-apache-kafka/
作者
WangQian
发布于
2022年11月9日
许可协议