本文共 10920 字,大约阅读时间需要 36 分钟。
本篇文章为springboot集成kafka的消费者篇
连接工厂和监听容器工厂
package com.te.factory;import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.common.serialization.StringDeserializer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.listener.AbstractMessageListenerContainer;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;import org.springframework.kafka.listener.KafkaMessageListenerContainer;import org.springframework.kafka.listener.ListenerExecutionFailedException;import org.springframework.kafka.listener.MessageListener;import org.springframework.kafka.listener.adapter.RecordFilterStrategy;import org.springframework.kafka.listener.config.ContainerProperties;import org.springframework.kafka.support.TopicPartitionInitialOffset;import org.springframework.messaging.Message;import com.te.controller.Listener;import com.te.handler.KafkaRecordFilterStrategy;import java.util.HashMap;import java.util.Map;@Configuration@EnableKafkapublic class KafkaConsumerConfig { @Value("${kafka.consumer.servers}") private String servers; @Value("${kafka.consumer.enable.auto.commit}")//是否自动提交,建议为false private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto.commit.interval}") private String autoCommitInterval; @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; @Autowired private KafkaRecordFilterStrategy kafkaRecordFilterStrategy; /** * 下面的工厂类可扩展的东西是真的多,spring也为rabbitMQ准备了一套 * @return */ //此bean主要用于扩展监听器使用(监听容器工厂)\ //使用的时候可以直接在@KafkaListener(containerFactory ="bean的名字")即可和rabbiMQ一样 @Bean(name="kafkaListenerContainerFactory") public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); //设置拉取等待时间(也可间接的理解为延时消费) factory.getContainerProperties().setPollTimeout(1500); //设置并发量,小于或等于Topic的分区数,并且要在consumerFactory设置一次拉取的数量 factory.setConcurrency(concurrency); //设置为批量监听 factory.setBatchListener(true); //指定使用此bean工厂的监听方法,消费确认为方式为用户指定aks,可以用下面的配置也可以直接使用enableAutoCommit参数 factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); //设置回复模板,类似于rabbitMQ的死信交换机,但是还有区别, // factory.setReplyTemplate(kafkaTemplate());//发送消息的模板,这里只是消费者的类,所以木有 //禁止自动启动,用于持久化操作,可先将消息都发送至broker,然后在固定的时间内进行持久化,有丢失消息的风险 factory.setAutoStartup(false); //使用过滤器 //配合RecordFilterStrategy使用,被过滤的信息将被丢弃 factory.setAckDiscarded(true); factory.setRecordFilterStrategy(kafkaRecordFilterStrategy); return factory; } @Bean //消费者工厂 public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map consumerConfigs() { Map propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); //一次拉取消息数量 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50"); return propsMap; } @Bean public Listener listener() { return new com.te.controller.Listener(); } /** * 自定义消费者,不用在去写方法,直接以bean的模式就行创建 * @return */ @Bean//此bean的作用相当于 @KafkaListener(id="consumerId", // topicPartitions={@TopicPartition(topic="test",partitions={"1"})}),不用再另起监听注解了 public KafkaMessageListenerContainer demoListenerContainer() { TopicPartitionInitialOffset topicPartitionInitialOffset =new TopicPartitionInitialOffset("topic.quick.bean",1); ContainerProperties properties = new ContainerProperties(topicPartitionInitialOffset); properties.setGroupId("bean"); properties.setMessageListener(new MessageListener () { private Logger log = LoggerFactory.getLogger(this.getClass()); @Override public void onMessage(ConsumerRecord record) { log.info("topic.quick.bean receive : " + record.toString()); } }); return new KafkaMessageListenerContainer(consumerFactory(), properties); } }
consumer异常处理器
package com.te.handler;import org.apache.kafka.clients.consumer.Consumer;import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;import org.springframework.kafka.listener.ListenerExecutionFailedException;import org.springframework.messaging.Message;/** * 消费者异常处理器(和rabbitMQ中的一样) * @author Administrator * */public class ConsumerAwareErrorHandler implements ConsumerAwareListenerErrorHandler{ @Override public Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer) { //异常处理类,此类可以做重试策略,当消费者出现异常的时候发送给其它topic下的分区 message.getHeaders();//可以得到你想要信息 System.out.println(message.getPayload()); return null; }}
消息过滤器
package com.te.handler;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.listener.adapter.RecordFilterStrategy;import org.springframework.stereotype.Component;@Componentpublic class KafkaRecordFilterStrategy implements RecordFilterStrategy { @Override public boolean filter(ConsumerRecord consumerRecord) { //此类可以对即将消费的信息进行一些列的过滤 //比如写日志的时候,过滤掉一些日志不消费,也是可以的,但是不消费,那条消息就会被丢弃 //为true则丢弃消息 return false; }}
消费者核心代码
package com.te.controller;import java.util.List;import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.kafka.config.KafkaListenerEndpointRegistry;import org.springframework.kafka.support.Acknowledgment;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.messaging.handler.annotation.SendTo;import lombok.extern.slf4j.Slf4j;/** * 同一个消费者组中的消费者不能指定同一分区 * @author 刘新杨 * 菩提本无树, * 明镜亦非台。 */@Slf4jpublic class Listener { @Autowired private KafkaListenerEndpointRegistry registry; private Logger log = LoggerFactory.getLogger(this.getClass()); @KafkaListener(topics = {"test"},id="consumerId",errorHandler="consumerAwareErrorHandler")//id的值可为消费者组id public void listen(ConsumerRecord record) { //errorHandler异常处理器注解里面使用的也是bean的名称// Log.info("kafka的key: " + record.key());0// Log.info("kafka的value: " + record.value().toString()); } //消费名称为test的topic中的第0个分区 @KafkaListener(id="consumerId", topicPartitions={@TopicPartition(topic="test",partitions={"0"}) // @TopicPartition(topic = "topic.quick.batch.partition",partitions = {"0","4"}也可指定多个分区 }) public void listen2(ConsumerRecord record,Acknowledgment ack) { // Log.info("kafka的key: " + record.key());// Log.info("kafka的value: " + record.value().toString()); ack.acknowledge(); } /** * 下面消费者监听器的详细解释 * 可批量消费的,使用的时候要注意containerFactory */ @KafkaListener(id = "consumerId",topics = {"test.batch"},containerFactory = "kafkaListenerContainerFactory") public void batchListener(Listdata,Acknowledgment ack,Consumer consumer) { log.info("topic.quick.batch receive : "); for (String s : data) { log.info( s); } ack.acknowledge();//调用了则表示确认消费,某个偏移量的消息 /** * 使用Consumer.seek方法,重新回到该未ack消息偏移量的位置重新消费, * 这种可能会导致死循环,原因出现于业务一直没办法处理这条数据, * 但还是不停的重新定位到该数据的偏移量上。 */ // consumer.seek(arg0, arg1); } /** * 获取消息的生产者中配置消息头参数 */ @KafkaListener(id = "anno", topics = "topic.quick.anno") public void annoListener(@Payload String data, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) { log.info("topic.quick.anno receive : \n"+ "data : "+data+"\n"+ "key : "+key+"\n"+ "partitionId : "+partition+"\n"+ "topic : "+topic+"\n"+ "timestamp : "+ts+"\n" ); } /** * 下面监听器的意思为topic.quick.target中的消息,消息完成之后,会把return里面的内容发送给topic.quick.real(topic) * 可以被其它消费者进行消费。可用于A执行完流程,调用B * @param data * @return */ @KafkaListener(id = "forward", topics = "topic.quick.target") @SendTo("topic.quick.real") public String forward(String data) { log.info("topic.quick.target forward "+data+" to topic.quick.real"); return "topic.quick.target send msg : " + data; } }
老规矩关于 @KafkaListener注解我们详细解析一下这个注解
String id() 消费者唯一标识符id
String containerFactory() 容器工厂类代码有详细解释特别流弊
String[] topics() 需要监听topic以一个数组的方式来展示{"0","4"}
topicPattern 所属topic的分区需要提前设置topic,否则读默认topic可能会出现不存在分区异常·
TopicPartition[] topicPartitions() 内部可写 topic ,partition,offset。
String containerGroup() 参数为监听容器(实现此类编写自己逻辑MessageListenerContainer.class)
String groupId() default ""; 消费者组id,如果给值了,会覆盖创建工厂时候给的值。
源码地址
转载地址:http://bxbqi.baihongyu.com/