博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot集成kafka(consumer篇)
阅读量:4227 次
发布时间:2019-05-26

本文共 10920 字,大约阅读时间需要 36 分钟。

本篇文章为springboot集成kafka的消费者篇

 连接工厂和监听容器工厂

package com.te.factory;import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.common.serialization.StringDeserializer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.listener.AbstractMessageListenerContainer;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;import org.springframework.kafka.listener.KafkaMessageListenerContainer;import org.springframework.kafka.listener.ListenerExecutionFailedException;import org.springframework.kafka.listener.MessageListener;import org.springframework.kafka.listener.adapter.RecordFilterStrategy;import org.springframework.kafka.listener.config.ContainerProperties;import org.springframework.kafka.support.TopicPartitionInitialOffset;import org.springframework.messaging.Message;import com.te.controller.Listener;import com.te.handler.KafkaRecordFilterStrategy;import java.util.HashMap;import java.util.Map;@Configuration@EnableKafkapublic class KafkaConsumerConfig {    @Value("${kafka.consumer.servers}")    private String servers;    @Value("${kafka.consumer.enable.auto.commit}")//是否自动提交,建议为false    private boolean enableAutoCommit;    @Value("${kafka.consumer.session.timeout}")    private String sessionTimeout;    @Value("${kafka.consumer.auto.commit.interval}")    private String autoCommitInterval;    @Value("${kafka.consumer.group.id}")    private String groupId;    @Value("${kafka.consumer.auto.offset.reset}")    private String autoOffsetReset;    @Value("${kafka.consumer.concurrency}")    private int concurrency;        @Autowired    private KafkaRecordFilterStrategy kafkaRecordFilterStrategy;        /**     * 下面的工厂类可扩展的东西是真的多,spring也为rabbitMQ准备了一套     * @return     */    //此bean主要用于扩展监听器使用(监听容器工厂)\    //使用的时候可以直接在@KafkaListener(containerFactory	="bean的名字")即可和rabbiMQ一样    @Bean(name="kafkaListenerContainerFactory")    public KafkaListenerContainerFactory
> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory
factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); //设置拉取等待时间(也可间接的理解为延时消费) factory.getContainerProperties().setPollTimeout(1500); //设置并发量,小于或等于Topic的分区数,并且要在consumerFactory设置一次拉取的数量 factory.setConcurrency(concurrency); //设置为批量监听 factory.setBatchListener(true); //指定使用此bean工厂的监听方法,消费确认为方式为用户指定aks,可以用下面的配置也可以直接使用enableAutoCommit参数 factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); //设置回复模板,类似于rabbitMQ的死信交换机,但是还有区别, // factory.setReplyTemplate(kafkaTemplate());//发送消息的模板,这里只是消费者的类,所以木有 //禁止自动启动,用于持久化操作,可先将消息都发送至broker,然后在固定的时间内进行持久化,有丢失消息的风险 factory.setAutoStartup(false); //使用过滤器 //配合RecordFilterStrategy使用,被过滤的信息将被丢弃 factory.setAckDiscarded(true); factory.setRecordFilterStrategy(kafkaRecordFilterStrategy); return factory; } @Bean //消费者工厂 public ConsumerFactory
consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map
consumerConfigs() { Map
propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); //一次拉取消息数量 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50"); return propsMap; } @Bean public Listener listener() { return new com.te.controller.Listener(); } /** * 自定义消费者,不用在去写方法,直接以bean的模式就行创建 * @return */ @Bean//此bean的作用相当于 @KafkaListener(id="consumerId", // topicPartitions={@TopicPartition(topic="test",partitions={"1"})}),不用再另起监听注解了 public KafkaMessageListenerContainer demoListenerContainer() { TopicPartitionInitialOffset topicPartitionInitialOffset =new TopicPartitionInitialOffset("topic.quick.bean",1); ContainerProperties properties = new ContainerProperties(topicPartitionInitialOffset); properties.setGroupId("bean"); properties.setMessageListener(new MessageListener
() { private Logger log = LoggerFactory.getLogger(this.getClass()); @Override public void onMessage(ConsumerRecord
record) { log.info("topic.quick.bean receive : " + record.toString()); } }); return new KafkaMessageListenerContainer(consumerFactory(), properties); } }

 

consumer异常处理器

package com.te.handler;import org.apache.kafka.clients.consumer.Consumer;import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;import org.springframework.kafka.listener.ListenerExecutionFailedException;import org.springframework.messaging.Message;/** * 消费者异常处理器(和rabbitMQ中的一样) * @author Administrator * */public class ConsumerAwareErrorHandler implements ConsumerAwareListenerErrorHandler{	@Override	public Object handleError(Message
message, ListenerExecutionFailedException exception, Consumer
consumer) { //异常处理类,此类可以做重试策略,当消费者出现异常的时候发送给其它topic下的分区 message.getHeaders();//可以得到你想要信息 System.out.println(message.getPayload()); return null; }}

消息过滤器

package com.te.handler;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.listener.adapter.RecordFilterStrategy;import org.springframework.stereotype.Component;@Componentpublic class KafkaRecordFilterStrategy implements RecordFilterStrategy {	@Override	public boolean filter(ConsumerRecord consumerRecord) {	//此类可以对即将消费的信息进行一些列的过滤    //比如写日志的时候,过滤掉一些日志不消费,也是可以的,但是不消费,那条消息就会被丢弃	//为true则丢弃消息		return false;	}}

消费者核心代码

package com.te.controller;import java.util.List;import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.kafka.config.KafkaListenerEndpointRegistry;import org.springframework.kafka.support.Acknowledgment;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.messaging.handler.annotation.SendTo;import lombok.extern.slf4j.Slf4j;/** * 同一个消费者组中的消费者不能指定同一分区 * @author 刘新杨 *   菩提本无树, *   明镜亦非台。 */@Slf4jpublic class Listener {	@Autowired    private KafkaListenerEndpointRegistry registry;	   private Logger log = LoggerFactory.getLogger(this.getClass());	 @KafkaListener(topics = {"test"},id="consumerId",errorHandler="consumerAwareErrorHandler")//id的值可为消费者组id	    public void listen(ConsumerRecord
record) { //errorHandler异常处理器注解里面使用的也是bean的名称// Log.info("kafka的key: " + record.key());0// Log.info("kafka的value: " + record.value().toString()); } //消费名称为test的topic中的第0个分区 @KafkaListener(id="consumerId", topicPartitions={@TopicPartition(topic="test",partitions={"0"}) // @TopicPartition(topic = "topic.quick.batch.partition",partitions = {"0","4"}也可指定多个分区 }) public void listen2(ConsumerRecord
record,Acknowledgment ack) { // Log.info("kafka的key: " + record.key());// Log.info("kafka的value: " + record.value().toString()); ack.acknowledge(); } /** * 下面消费者监听器的详细解释 * 可批量消费的,使用的时候要注意containerFactory */ @KafkaListener(id = "consumerId",topics = {"test.batch"},containerFactory = "kafkaListenerContainerFactory") public void batchListener(List
data,Acknowledgment ack,Consumer consumer) { log.info("topic.quick.batch receive : "); for (String s : data) { log.info( s); } ack.acknowledge();//调用了则表示确认消费,某个偏移量的消息 /** * 使用Consumer.seek方法,重新回到该未ack消息偏移量的位置重新消费, * 这种可能会导致死循环,原因出现于业务一直没办法处理这条数据, * 但还是不停的重新定位到该数据的偏移量上。 */ // consumer.seek(arg0, arg1); } /** * 获取消息的生产者中配置消息头参数 */ @KafkaListener(id = "anno", topics = "topic.quick.anno") public void annoListener(@Payload String data, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) { log.info("topic.quick.anno receive : \n"+ "data : "+data+"\n"+ "key : "+key+"\n"+ "partitionId : "+partition+"\n"+ "topic : "+topic+"\n"+ "timestamp : "+ts+"\n" ); } /** * 下面监听器的意思为topic.quick.target中的消息,消息完成之后,会把return里面的内容发送给topic.quick.real(topic) * 可以被其它消费者进行消费。可用于A执行完流程,调用B * @param data * @return */ @KafkaListener(id = "forward", topics = "topic.quick.target") @SendTo("topic.quick.real") public String forward(String data) { log.info("topic.quick.target forward "+data+" to topic.quick.real"); return "topic.quick.target send msg : " + data; } }

  老规矩关于  @KafkaListener注解我们详细解析一下这个注解

String id()  消费者唯一标识符id

String containerFactory()   容器工厂类代码有详细解释特别流弊

String[] topics()  需要监听topic以一个数组的方式来展示{"0","4"}

topicPattern  所属topic的分区需要提前设置topic,否则读默认topic可能会出现不存在分区异常·

TopicPartition[] topicPartitions() 内部可写 topic ,partition,offset。

String containerGroup()  参数为监听容器(实现此类编写自己逻辑MessageListenerContainer.class)

String groupId() default ""; 消费者组id,如果给值了,会覆盖创建工厂时候给的值。

 

        源码地址

                               

转载地址:http://bxbqi.baihongyu.com/

你可能感兴趣的文章
并查集——好朋友
查看>>
关键路径
查看>>
Web前端学习笔记——JavaScript之事件详解
查看>>
Web前端学习笔记——JavaScript之事件、创建元素、节点操作
查看>>
Web前端学习笔记——JavaScript之正则表达式、伪数组、垃圾回收
查看>>
Web前端学习笔记——JavaScript 之继承、函数进阶
查看>>
Web前端学习笔记——JavaScript之面向对象游戏案例:贪吃蛇
查看>>
不做单元测试?小心得不偿失!嵌入式系统单元测试工具,自动生成测试用例
查看>>
一种实用的联网汽车无线攻击方法及车载安全协议
查看>>
光靠欺骗检测是不够的:对抗多目标跟踪的攻击
查看>>
基于微区块链的V2X地理动态入侵检测
查看>>
面向V2C场景的ADAS数字孪生模型构建方法
查看>>
Comma2k19数据集使用
查看>>
面向自动驾驶车辆验证的抽象仿真场景生成
查看>>
一种应用于GPS反欺骗的基于MLE的RAIM改进方法
查看>>
自动驾驶汽车GPS系统数字孪生建模(一)
查看>>
自动驾驶汽车GPS系统数字孪生建模(二)
查看>>
CUDA 学习(五)、线程块
查看>>
CUDA 学习(八)、线程块调度
查看>>
CUDA 学习(九)、CUDA 内存
查看>>