xiaoku
2023-03-13

Spring Boot Kafka Integration

# Project Setup

Kafka batch consumption.

# pom.xml

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

# application.yml

server:
  servlet:
    context-path: /imdemosc
  port: 8610
spring:
  application:
    name: imdemosc
  kafka:
    bootstrap-servers: 192.168.213.215:9092,192.168.213.215:9091,192.168.213.215:9093
    consumer:
      enable-auto-commit: true
      group-id: dsaim105
      # maximum number of records pulled in one poll (the batch size)
      max-poll-records: 100
      auto-commit-interval: 100
      auto-offset-reset: latest
      bootstrap-servers: 192.168.213.215:9092,192.168.213.215:9091,192.168.213.215:9093
    producer:
      # number of retries
      retries: 3
      # number of messages sent per batch
      batch-size: 100
      # 32 MB batch buffer
      buffer-memory: 33554432
      bootstrap-servers: 192.168.213.215:9092,192.168.213.215:9091,192.168.213.215:9093

# Java Code

# Kafka consumer configuration class

KafkaConsumerConfig

package ok96.cn;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String consumerBootstrapServers;

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String producerBootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    /**
     *  Producer configuration properties
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // acks=0 is fire-and-forget: the producer gets no broker acknowledgment,
        // so the retries setting below rarely takes effect
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     *  Producer factory
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     *  Producer template
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     *  Consumer configuration properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // apply the auto-commit settings injected from application.yml
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     *  Batch consumer listener container factory
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setConcurrency(1);
        // enable batch consumption; the batch size is capped by
        // ConsumerConfig.MAX_POLL_RECORDS_CONFIG above
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1000);

        return factory;
    }

//    @Bean
//    public KafkaConsumerListener listener(){
//        return new KafkaConsumerListener();
//    }
}

# Kafka listener entry point

package ok96.cn;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaBatchListener {

    @KafkaListener(topics = "imtest6", containerFactory = "batchFactory")
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        // each call delivers up to max-poll-records messages as one batch
        System.out.println("received a batch of " + records.size() + " records");
    }

}
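To see the listener fire, you can push a few messages through the KafkaTemplate bean defined above. A minimal sketch; the helper class and its name are hypothetical, not part of the original project:

package ok96.cn;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// hypothetical helper for smoke-testing the batch listener
@Component
public class KafkaSendTester {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendTestMessages() {
        // with max-poll-records: 100, these ten messages will typically
        // reach the listener as a single batch
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("imtest6", "test-" + i);
        }
    }
}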

# Integrating Kafka Authentication

This setup uses the SASL_PLAINTEXT security protocol.

# Add settings to the Kafka consumer configuration class

Add the following to both the producerConfigs and consumerConfigs methods:

props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");

# Add kafka_client_jaas.conf under resources

The username and password in the file are the credentials configured on the Kafka broker:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="ok96"
  password="ok96-password";
};

# Set the system property in the startup class

import java.io.IOException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.io.ClassPathResource;

@SpringBootApplication
public class SpringbootApp {
    public static void main(String[] args) throws IOException {
        // java.security.auth.login.config needs a real file path (the JVM does not
        // resolve "classpath:"), so resolve the resource to an absolute path first
        String jaasPath = new ClassPathResource("kafka_client_jaas.conf").getFile().getAbsolutePath();
        System.setProperty("java.security.auth.login.config", jaasPath);
        SpringApplication.run(SpringbootApp.class, args);
    }
}
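As an alternative to the external JAAS file plus system property, Kafka clients from 0.10.2 onward also accept the JAAS configuration inline through the sasl.jaas.config client property. A sketch of the equivalent settings, added to producerConfigs and consumerConfigs:

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
// inline JAAS configuration: no kafka_client_jaas.conf or system property needed
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"ok96\" password=\"ok96-password\";");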

# Common Code Snippets

# Sending an entity

That is, first serialize the XXXX entity into a String:

ObjectMapper mapper2 = new ObjectMapper();
// writeValueAsString throws JsonProcessingException; declare or catch it
kafkaTemplate.send(topic, mapper2.writeValueAsString(XXXXX));
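For repeated sends it can be cleaner to wrap the template and one shared ObjectMapper (which is thread-safe) behind a small service. A sketch; the class name and error handling are illustrative:

package ok96.cn;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaSendService {

    // ObjectMapper is thread-safe; create it once and reuse it
    private final ObjectMapper mapper = new ObjectMapper();

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, Object entity) {
        try {
            kafkaTemplate.send(topic, mapper.writeValueAsString(entity));
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("failed to serialize entity for topic " + topic, e);
        }
    }
}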

# Consuming an entity

That is, deserialize the String back into the XXXX entity. The getTypeReferenceTemp method can be made static by adding the static keyword if desired.
Modify KafkaBatchListener:

package ok96.cn;

import java.util.List;
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class KafkaBatchListener {

    // ObjectMapper is thread-safe; one shared instance is enough
    private final ObjectMapper mapper2 = new ObjectMapper();

    // can be made static by adding the static keyword
    public TypeReference<XXXX> getTypeReferenceTemp() {
        return new TypeReference<XXXX>() {
        };
    }

    @KafkaListener(topics = "imtest6", containerFactory = "batchFactory")
    public void listenPartition1(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            Optional<String> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                try {
                    // message received normally; deserialize back into the entity
                    XXXX xxxx = mapper2.readValue(record.value(), getTypeReferenceTemp());
                } catch (JsonProcessingException e) {
                    // skip malformed payloads instead of failing the whole batch
                    e.printStackTrace();
                }
            }
        }
    }

}

# Reading the listener topic from configuration

# application.yml

kafkaSelf:
  topic: test

# Java code

@KafkaListener(topics = "#{'${kafkaSelf.topic}'}", containerFactory = "batchFactory")
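The same pattern extends to several topics: keep a comma-separated list under a hypothetical kafkaSelf.topics key and split it in the SpEL expression, since the topics attribute also accepts a String array:

// application.yml (hypothetical key): kafkaSelf.topics: topicA,topicB
@KafkaListener(topics = "#{'${kafkaSelf.topics}'.split(',')}", containerFactory = "batchFactory")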

# Deploying Kafka

Server IP: 192.168.213.200

# Docker (single node)

docker-compose.yml, connection address 192.168.213.200:19092:

version: '3'
services:
  kafka-zookeeper:
    image: openzipkin/zipkin-kafka:2.11.12
    restart: always
    container_name: kafka-zookeeper
    ports:
      - 2181:2181
      - 9092:9092
      - 19092:19092
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=192.168.213.200

Command line:

docker run -d --restart=always --name kafka-zookeeper -p 2181:2181 -p 9092:9092 -p 19092:19092  --env KAFKA_ADVERTISED_HOST_NAME=192.168.213.200  openzipkin/zipkin-kafka:2.11.12

Verify the deployment:

1. Open two terminals and enter the kafka container in each:
docker exec -it kafka-zookeeper /bin/sh
2. Run the following inside the container, one command per terminal.
Terminal 1:
unset JMX_PORT;bin/kafka-console-producer.sh --broker-list 192.168.213.200:19092 --topic test9999
Terminal 2:
unset JMX_PORT;bin/kafka-console-consumer.sh --bootstrap-server 192.168.213.200:19092 --topic test9999 --from-beginning

# Host machine (single node)

Server IP: 192.168.213.203
Connection address: 192.168.213.203:9092

# Upload and extract the archive

Upload kafka_2.12-2.3.0.tgz to the /data/ directory, then extract it:

tar -zvxf kafka_2.12-2.3.0.tgz

# Edit the configuration and start

Edit server.properties:

vi /data/kafka_2.12-2.3.0/config/server.properties

Add the following, then save and exit:

listeners=PLAINTEXT://192.168.213.203:9092


Start:

cd /data/kafka_2.12-2.3.0
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon  config/server.properties

Check the logs:

tail -f /data/kafka_2.12-2.3.0/logs/zookeeper.out
tail -f /data/kafka_2.12-2.3.0/logs/kafkaServer.out

# Create a topic

Create the topic tpc.test001:

/data/kafka_2.12-2.3.0/bin/kafka-topics.sh --create --zookeeper 192.168.213.203:2181 --replication-factor 1 --partitions 1 --topic tpc.test001

# Configure SASL

# Server-side configuration

# Edit server.properties
vi /data/kafka_2.12-2.3.0/config/server.properties

Modify/add the following, commenting out the existing listeners line with #:

listeners=SASL_PLAINTEXT://192.168.213.203:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:admin


# Create accounts
vi /data/kafka_2.12-2.3.0/config/kafka_server_jaas.conf

Add the following, then save and exit:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-pwd"
  user_admin="admin-pwd"
  user_wuxiaoku="xiaoku-lzx22247a";
};

Administrator account: admin, password admin-pwd

Regular user account: wuxiaoku, password xiaoku-lzx22247a

# Edit kafka-run-class.sh

Add the java.security.auth.login.config system property:

vi /data/kafka_2.12-2.3.0/bin/kafka-run-class.sh

Near the top of the script, add:

KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/data/kafka_2.12-2.3.0/config/kafka_server_jaas.conf'

Then append the variable to the java launch command at the end of the script:

$KAFKA_SASL_OPTS


# Client-side account configuration

# Create a new file
vi /data/kafka_2.12-2.3.0/config/kafka_client_jaas.conf

Add the following content:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="wuxiaoku"
  password="xiaoku-lzx22247a";
};


# Edit the client configuration files

Consumer:

vi /data/kafka_2.12-2.3.0/config/consumer.properties

Append the following at the end of the file:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

Producer:

vi /data/kafka_2.12-2.3.0/config/producer.properties

Append the following at the end of the file:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN


# Add the user configuration to the console scripts

Consumer:

vi /data/kafka_2.12-2.3.0/bin/kafka-console-consumer.sh

Add the following:

export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka_2.12-2.3.0/config/kafka_client_jaas.conf"


Producer:

vi /data/kafka_2.12-2.3.0/bin/kafka-console-producer.sh

Add the following:

export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka_2.12-2.3.0/config/kafka_client_jaas.conf"


# Restart

Stop:

cd /data/kafka_2.12-2.3.0
bin/kafka-server-stop.sh config/server.properties
bin/zookeeper-server-stop.sh config/zookeeper.properties

Start:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon  config/server.properties
# Set user permissions (ACLs)

Search the web for "kafka ACL" for more details.

Grant: --add; revoke: --remove; read permission: Read; write permission: Write.

Host whitelist: --allow-host 192.168.213.200

Group-level permissions: --group; topic-level permissions: --topic

Grant the wuxiaoku user all permissions on the consumer group test-consumer-group:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.213.203:2181 --add --allow-principal User:wuxiaoku --operation All --group test-consumer-group

Grant the wuxiaoku user read permission on the topic:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.213.203:2181 --add  --allow-principal User:wuxiaoku  --allow-host '*' --operation Read --topic tpc.test001

Revoke the wuxiaoku user's read permission on the topic:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.213.203:2181 --remove  --allow-principal User:wuxiaoku  --allow-host '*' --operation Read --topic tpc.test001

Grant the wuxiaoku user write permission on the topic:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.213.203:2181 --add  --allow-principal User:wuxiaoku  --allow-host '*' --operation Write --topic tpc.test001

Revoke the wuxiaoku user's write permission on the topic:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.213.203:2181 --remove  --allow-principal User:wuxiaoku  --allow-host '*' --operation Write --topic tpc.test001

List permissions:

bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=192.168.213.203:2181
# Test

Open two terminals.

Terminal 1, run the consumer:

/data/kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.213.203:9092 --topic  tpc.test001  --from-beginning --consumer.config /data/kafka_2.12-2.3.0/config/consumer.properties

Terminal 2, run the producer:

/data/kafka_2.12-2.3.0/bin/kafka-console-producer.sh --broker-list 192.168.213.203:9092 --topic tpc.test001  --producer.config /data/kafka_2.12-2.3.0/config/producer.properties

Send arbitrary strings from the producer terminal; if they appear in the consumer terminal, everything is working.

# Purging data from a topic

Example topic: cardata

1. Set the retention time to 10 seconds:
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name cardata --alter --add-config retention.ms=10000
2. Check the topic's policy:
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --describe --entity-type topics --entity-name cardata
3. Once the data has been cleaned up, delete the policy:
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name cardata --alter --delete-config retention.ms
4. Check the policy again (same command as step 2).
Last updated: 2023/06/17, 09:00:35