国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 網站 > 幫助中心 > 正文

spring boot整合kafka過程解析

2024-07-09 22:43:20
字體:
來源:轉載
供稿:網友

這篇文章主要介紹了spring boot整合kafka過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

一、啟動kafka

  啟動kafka之前一定要啟動zookeeper,因為要使用kafka必須要使用zookeeper。

  windows環境下啟動,直接使用kafka自帶的zookeeper:

  E:/kafka_2.12-2.4.0/bin/windows zookeeper-server-start.bat ../../config/zookeeper.properties

  接下來啟動kafka

  E:/kafka_2.12-2.4.0/bin/windows kafka-server-start.bat ../../config/server.properties

二、spring boot整合kafka項目實例

1.導入的maven

<dependency>      <groupId>org.springframework.kafka</groupId>      <artifactId>spring-kafka</artifactId>    </dependency>

配置文件:

server.port=80#kafka地址,可以有多個spring.kafka.bootstrap-servers=127.0.0.1:9092#------生產者配置文件---------#指定kafka消息體和key的編碼格式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#設置等待acks返回的機制,有三個值# 0:不等待返回的acks(可能會丟數據,因為發送消息沒有了失敗重試機制,但是這是最低延遲)# 1:消息發送給kafka分區中的leader后就返回(如果follower沒有同步完成leader就宕機了,就會丟數據)# -1(默認):等待所有follower同步完消息后再發送(絕對不會丟數據)spring.kafka.producer.acks=-1# 消息累計到batch-size的值后,才會發送消息,默認為16384spring.kafka.producer.batch-size=16384#如果kafka遲遲不發送消息(這里指的是消息沒堆積到指定數量),那么過了這個時間(單位:毫米)開始發送spring.kafka.producer.properties.linger.ms=1#設置緩沖區大小,默認是33554432#這個緩沖區是kafka中兩個線程里的共享變量#這個兩個線程是main和sender,main負責把消息發送到共享變量,sender從共享變量拉數據spring.kafka.producer.buffer-memory=33554432#失敗重試發送的次數spring.kafka.producer.retries=2#------消費者配置文件---------#指定kafka消息體和key的編碼格式spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#指定消費者組的group_idspring.kafka.consumer.group-id=kafka_test#kafka意外宕機時,再次開啟消息消費的策略,共有三種策略#earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費#latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據、#none:當所有分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常spring.kafka.consumer.auto-offset-reset=earliest#自動提交offsetspring.kafka.consumer.enable-auto-commit=true#提交offset時間間隔spring.kafka.consumer.auto-commit-interval=100#消費監聽接口監聽的主題不存在時,默認會報錯因此要關掉這個spring.kafka.listener.missing-topics-fatal=false

2.創建topic

import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * 使用代碼創建的topic * 三個參數意思:topic的名稱;分區數量,新主題的復制因子;如果指定了副本分配,則為-1。 */@Configurationpublic class KafkaTopic {   @Bean  public NewTopic batchTopic() {    return new NewTopic("testTopic", 8, (short) 1);  }}
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 木兰县| 万安县| 明溪县| 化德县| 育儿| 隆化县| 介休市| 蒙山县| 芦溪县| 沙湾县| 交口县| 浑源县| 浮山县| 长子县| 禄劝| 惠州市| 射阳县| 正蓝旗| 灵台县| 白玉县| 恩平市| 霞浦县| 三河市| 益阳市| 化隆| 新泰市| 眉山市| 奎屯市| 嵊泗县| 朔州市| 乐东| 陆良县| 资源县| 那坡县| 福清市| 三明市| 怀宁县| 常熟市| 桂平市| 南江县| 莎车县|