springboot + kafka 入门实例 入门demo

java1234

共 4031字,需浏览 9分钟

 ·

2020-08-25 19:43

点击上方蓝色字体,选择“标星公众号”

优质文章,第一时间送达

  作者 |   jelly_oy 

来源 |  urlify.cn/NVJFva        

66套java从入门到精通实战课程分享 

版本说明

  • springboot版本:2.3.3.RELEASE

  • kafka服务端版本:kafka_2.12-2.6.0.tgz

  • zookeeper服务端版本:apache-zookeeper-3.6.1-bin.tar.gz

实例搭建前提条件

1,搭建好zookeeper服务,本实例zookeeper使用单机伪集群模式,

192.168.1.126:2181, 192.168.1.126:2182, 192.168.1.126:2183

2,搭建好kafka服务,本实例kafka使用单机伪集群模式,

192.168.1.126:9092, 192.168.1.126:9093, 192.168.1.126:9094

1. 导入相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the springboot-kafka-demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <!-- Spring Boot parent: supplies managed versions for the starters below. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath/> <!-- empty: resolve the parent from the repository, not a local path -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>springboot-kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>springboot-kafka-demo</name>
    <description>springboot-kafka-demo</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Version is pinned explicitly for this dependency. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.54</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2. yml配置

server:
  port: 8080
  servlet:
    context-path: /
  tomcat:
    uri-encoding: UTF-8

spring:
  kafka:
    # Kafka pseudo-cluster running on a local virtual machine
    bootstrap-servers: 192.168.1.126:9092,192.168.1.126:9093,192.168.1.126:9094
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 65536
      buffer-memory: 524288
      # Custom (application-defined) topic names
      myTopic1: testTopic1
      myTopic2: testTopic2
    consumer:
      group-id: default-group # default group id; multiple consumer groups are configured later
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
      enable-auto-commit: false # auto-commit disabled; offsets are committed by spring-kafka instead
      auto-commit-interval: 100
      max-poll-records: 20      # batch consumption: maximum number of records received at once

3. 部分代码

消息实体类

package com.example.demo.entity;

import java.util.Date;
import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class Message {
    private Long id;
    private String msg;
    private Date sendTime;

}

kafka配置类

package com.example.demo.config;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * Kafka configuration class.
 *
 * <p>Each field is injected from the application properties through the
 * {@code @Value} placeholder declared on it; accessors are generated by
 * Lombok's {@code @Data}.
 */

@Data
@Configuration
public class KafkaConfiguration {
    /**
     * Kafka cluster address list ({@code spring.kafka.bootstrap-servers}).
     */

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
 
    /**
     * Default Kafka consumer group id ({@code spring.kafka.consumer.group-id}).
     */

    @Value("${spring.kafka.consumer.group-id}")
    private String defaultGroupId;
    
    /**
     * Position from which consumption starts
     * ({@code spring.kafka.consumer.auto-offset-reset}).
     */

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
 
    /**
     * Whether offsets are committed automatically
     * ({@code spring.kafka.consumer.enable-auto-commit}); held as the raw string value.
     */

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
 
    /**
     * If 'enable.auto.commit' is true, the frequency (in milliseconds) at which
     * consumer offsets are auto-committed to Kafka; the default value is 5000.
     */

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
 
    /**
     * Maximum number of records returned by a single poll() call; the default value is 500.
     */

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    /**
     * Custom topic 1 ({@code spring.kafka.producer.myTopic1}).
     */

    @Value("${spring.kafka.producer.myTopic1}")
    private String myTopic1;

    /**
     * Custom topic 2 ({@code spring.kafka.producer.myTopic2}).
     */

    @Value("${spring.kafka.producer.myTopic2}")
    private String myTopic2;
 
}

消费者监听类

package com.example.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * 消费者1(监听topic1队列)
 */

@Component
public class ConsumerListener1 {

    @KafkaListener(topics = "${spring.kafka.producer.myTopic1}")
    public void listen(ConsumerRecord record) {
        System.out.println(record);
        String value = record.value();
        System.out.println("消费者1接收到消息:" + value);
    }
}

测试类

package com.example.demo.controller;

import com.alibaba.fastjson.JSON;
import com.example.demo.config.KafkaConfiguration;
import com.example.demo.entity.Message;
import com.example.demo.service.KafkaService;
import com.example.demo.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {
    @Autowired
    private KafkaService kafkaService;

    @Autowired
    private KafkaConfiguration kafkaConfiguration;

    /**
     * 发送文本消息
     * @param msg
     * @return
     */

    @GetMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        kafkaService.send(kafkaConfiguration.getMyTopic1(), msg);
        return "生产者发送消息给topic1:"+msg;
    }

    /**
     * 发送JSON数据
     * @return
     */

    @GetMapping("/send2")
    public String send2() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg("生产者发送消息到topic1: " + UUID.getUUID32());
        message.setSendTime(new Date());

        String value = JSON.toJSONString(message);
        log.info("生产者发送消息到topic1 message = {}", value);

        kafkaService.send(kafkaConfiguration.getMyTopic1(),value);
        return value;
    }

    /**
     * 发送JSON数据
     * @return
     */

    @GetMapping("/send3")
    public String send3() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg("生产者发送消息到topic2: " + UUID.getUUID32());
        message.setSendTime(new Date());

        String value = JSON.toJSONString(message);
        log.info("生产者发送消息到topic2 message = {}", value);

        kafkaService.send(kafkaConfiguration.getMyTopic2(),value);
        return value;
    }

}

4. 实例运行结果



5. 写在最后

本实例源代码:https://gitee.com/jelly_oy/springboot-kafka-demo

本实例采用springboot2.3.3 + zookeeper3.6.1 + kafka2.6.0 进行搭建

如果本项目对你有帮助,欢迎留言评论,欢迎git clone源代码。



粉丝福利:108本java从入门到大神精选电子书领取

???

长按上方锋哥微信二维码 2 秒
备注「1234」即可获取资料以及
可以进入java1234官方微信群



感谢点赞支持下哈 

浏览 34
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报