博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot集成kafka(producer篇)
阅读量:4226 次
发布时间:2019-05-26

本文共 12218 字,大约阅读时间需要 40 分钟。

前面两篇文章大致上了讲了kafka的一些原理 topic,producer,consumer offset,partition,consumer group的一些概念。下面直接上代码 springboot和kafka的集成

maven配置类

 <parent>

        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/>
    </parent>

  

  <dependencies>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.6</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->

  <dependency>

            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>

            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>     
  </dependencies>

application.properties

#============== kafka ===================
kafka.consumer.zookeeper.connect=127.0.0.1:2181
kafka.consumer.servers=127.0.0.1:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
#表示自动从最新的开始消费
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

kafka.producer.servers=127.0.0.1:9092

kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

topic和分区创建类(主要用来创建topic和定义kafkaAdmin)

package com.te.topic;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.admin.AdminClient;import org.apache.kafka.clients.admin.AdminClientConfig;import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.core.KafkaAdmin;import org.springframework.kafka.core.KafkaTemplate;/** * 创建自定义topic加上分区的 * 因为使用模板类.send()的方法只会创建一个分区,无法达到最优的吞吐量  * @author 刘新杨 *   菩提本无树, *   明镜亦非台。 */@Configurationpublic class KafkaInitialConfiguration {    //创建TopicName为topic.quick.initial的Topic并设置分区数为8以及副本数为1    @Bean//通过bean创建(bean的名字为initialTopic)    public NewTopic initialTopic() {        return new NewTopic("topic.quick.initial",8, (short) 1 );    }    /**     * 此种@Bean的方式,如果topic的名字相同,那么会覆盖以前的那个     * @return     *///    //修改后|分区数量会变成11个 注意分区数量只能增加不能减少    @Bean    public NewTopic initialTopic2() {        return new NewTopic("topic.quick.initial",11, (short) 1 );    }    @Bean //创建一个kafka管理类,相当于rabbitMQ的管理类rabbitAdmin,没有此bean无法自定义的使用adminClient创建topic    public KafkaAdmin kafkaAdmin() {        Map
props = new HashMap<>(); //配置Kafka实例的连接地址 //kafka的地址,不是zookeeper props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); KafkaAdmin admin = new KafkaAdmin(props); return admin; } @Bean //kafka客户端,在spring中创建这个bean之后可以注入并且创建topic public AdminClient adminClient() { return AdminClient.create(kafkaAdmin().getConfig()); } }

 下面的类需要依赖于上面的adminClient实现手动创建topic

public class Snippet {	@Autowired // adminClien需要自己生成配置bean	private AdminClient adminClient;		    @Resource    private KafkaTemplate defaultKafkaTemplate;	@Test//自定义手动创建topic和分区	public void testCreateTopic() throws InterruptedException {		// 这种是手动创建 //10个分区,一个副本		// 分区多的好处是能快速的处理并发量,但是也要根据机器的配置		NewTopic topic = new NewTopic("topic.quick.initial2", 10, (short) 1);		adminClient.createTopics(Arrays.asList(topic));		Thread.sleep(1000);	}		@Test//和rabbitMQ的类似	public void testDefaultKafka(){		//前提是要在创建模板类的时候指定topic,否则回报找不到topic		defaultKafkaTemplate.setDefaultTopic("这里发送的消息");					}

生产者拦截器

package com.te.handler;import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaProducerInterceptor  implements ProducerInterceptor
{ @Override public void configure(Map
configs) { // TODO Auto-generated method stub } @Override public ProducerRecord
onSend(ProducerRecord
record) { // TODO Auto-generated method stub /** * 可以在此方法中定义对消息做一些特殊处理 */ return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // TODO Auto-generated method stub /** * 该方法会在消息被应答之前或消息发送失败的时候调用,也可以在此做重试操作 * */ metadata.partition(); metadata.topic(); metadata.serializedKeySize(); metadata.serializedValueSize(); } @Override public void close() { // TODO Auto-generated method stub }}

发送回调类可以实现和类似rabbitMQ的确认到达机制

package com.te.handler;import org.apache.kafka.clients.producer.RecordMetadata;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.support.ProducerListener;import org.springframework.stereotype.Service;/** * kafka的回调类,可以在此类中定义producer发送消息失败时候的回调方法 * @author 刘新杨 *   菩提本无树, *   明镜亦非台。 */@Servicepublic class KafkaSendResultHandler implements ProducerListener {    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);	@Override	public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {		// TODO Auto-generated method stub		log.info("消息发送成功");	}	@Override	public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {		// TODO Auto-generated method stub		//可重试		System.out.println("消息发送失败");			}	@Override	public boolean isInterestedInSuccess() {		// TODO Auto-generated method stub		return false;	}}

kafka生成工厂类(我是自己创建的,当前也可以使用springboot自动配置类。KafkaProperties.class)

package com.te.factory;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringSerializer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;@Configuration@EnableKafkapublic class KafkaProducerConfig {    @Value("${kafka.producer.servers}")    private String servers;    @Value("${kafka.producer.retries}")    private int retries;    @Value("${kafka.producer.batch.size}")    private int batchSize;    @Value("${kafka.producer.linger}")    private int linger;    @Value("${kafka.producer.buffer.memory}")    private int bufferMemory;    public Map
producerConfigs() { Map
props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); //设置重试次数 props.put(ProducerConfig.RETRIES_CONFIG, retries); //达到batchSize大小的时候会发送消息 props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); //延时时间,延时时间到达之后计算批量发送的大小没达到也发送消息 props.put(ProducerConfig.LINGER_MS_CONFIG, linger); //缓冲区的值 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); //序列化手段 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //producer端的消息确认机制,-1和all都表示消息不仅要写入本地的leader中还要写入对应的副本中 props.put(ProducerConfig.ACKS_CONFIG, -1); //单条消息的最大值以字节为单位,默认值为1048576 props.put(ProducerConfig.LINGER_MS_CONFIG, 10485760); //设置broker响应时间,如果broker在60秒之内还是没有返回给producer确认消息,则认为发送失败 props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); //指定拦截器(value为对应的class) props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.te.handler.KafkaProducerInterceptor"); //设置压缩算法(默认是木有压缩算法的) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "LZ4"); return props; } @Bean //生产者工厂 public ProducerFactory
producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean //模板发送消息 public KafkaTemplate
kafkaTemplate() { return new KafkaTemplate
(producerFactory()); } /** * 此模板已经设置了topic的名称,使用的时候可以直接注入此bean然后调用setDefaultTopic方法 * @return */ @Bean("defaultKafkaTemplate") public KafkaTemplate
defaultKafkaTemplate() { KafkaTemplate template = new KafkaTemplate
(producerFactory()); template.setDefaultTopic("topic.quick.default"); return template; }}

测试发送类,代码中有详细解释这里不做介绍

package com.te.controller;import org.apache.kafka.clients.producer.ProducerRecord;import org.junit.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.support.GenericMessage;import org.springframework.web.bind.annotation.*;import com.te.handler.KafkaSendResultHandler;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ExecutionException;import javax.annotation.Resource;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;@RestController@RequestMapping("/kafka")public class CollectController {    protected final Logger logger = LoggerFactory.getLogger(this.getClass());    @Autowired    private KafkaTemplate kafkaTemplate;    @Autowired    private KafkaSendResultHandler producerListener;       @Test    public void testProducerListen() throws InterruptedException {    	//设置发送此消息的 时候使用监听器,使用了监听器,那么就需要使用线程睡眠(原因如下)    	//否则发送时间较长的时候会导致进程提前关闭导致无法调用回调时间    	//因为KafkaTemplate发送消息是采取异步方式发送的,        kafkaTemplate.setProducerListener(producerListener);        kafkaTemplate.send("topic.quick.demo", "test producer listen");        Thread.sleep(1000);    }    @Test//如果你不想异步发送消息,那么可以使用下面的方法来同步发送    //直接获取结果不进入等待    public void syncSendMessage() throws InterruptedException, ExecutionException{    	  kafkaTemplate.setProducerListener(producerListener);    	  //.get()方法提供了两个参数   long timeout, TimeUnit unit    	  //但是当发送的时间,大于超时时间的时候,就会报异常          kafkaTemplate.send("topic.quick.demo", "test producer listen").get();    }    @RequestMapping(value = "/send", method = RequestMethod.GET)    public void sendKafka(HttpServletRequest request, HttpServletResponse response) {        try {            String message = request.getParameter("message");            logger.info("kafka的消息={}", message);            //此处也可以发送到指定分区            kafkaTemplate.send("test", "key", message);                        //发送带有时间戳的消息//            kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp");////            //使用ProducerRecord发送消息(此处也可以指定topic,partition,key , value)            //反正无论你传你什么参数,底层都是用ProducerRecord参数来进行发送的            ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");//          ProducerRecord(String topic, Integer partition, K key, V value) {            kafkaTemplate.send(record);                      //使用Message发送消息,这里自定义一些其他参数,发送的时候使用的是MessageHeaders            Map map = new HashMap();            map.put(KafkaHeaders.TOPIC, "topic.quick.demo");            map.put(KafkaHeaders.PARTITION_ID, 0);            map.put(KafkaHeaders.MESSAGE_KEY, 0);            GenericMessage message3 = new GenericMessage("use Message to send message",new MessageHeaders(map));            kafkaTemplate.send(message3);                       logger.info("发送kafka成功.");            //return new Response(ResultCode.SUCCESS, "发送kafka成功", null);        } catch (Exception e) {            logger.error("发送kafka失败", e);           // return new Response(ResultCode.EXCEPTION, "发送kafka失败", null);        }    }}

                              源码地址

                                        

转载地址:http://fxbqi.baihongyu.com/

你可能感兴趣的文章
Linux驱动开发学习的简单步骤
查看>>
LINUX下编程工具常见用法小结
查看>>
驱动中位域操作简析
查看>>
Linux 引导过程内幕
查看>>
十年学会程序设计
查看>>
GPS NMEA-0183协议详解
查看>>
Rational 最新软件试用下载地址
查看>>
正则表达式30分钟入门教程
查看>>
Build the Hack CPU with Verilog -- 陈硕
查看>>
《计算机系统要素》配套软件和资料下载
查看>>
u-boot 启动过程 —— 基于S3C2410 --转载自周明
查看>>
U-boot大全
查看>>
怎样开公司
查看>>
灵活使用Linux下的glimpse工具和tee命令
查看>>
介绍Linux下经典的文件传输工具: lrzsz
查看>>
活用AXD Debugger调试器的命令行,实现u-boot的源代码级调试
查看>>
程序员的一份礼物-常用工具集
查看>>
uClinux 启动过程详细分析
查看>>
嵌入式系统 Boot Loader 技术内幕
查看>>
通信协议的一些技巧
查看>>