国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 開發 > Java > 正文

spring boot 與kafka集成的示例代碼

2024-07-14 08:40:18
字體:
來源:轉載
供稿:網友

新建spring boot項目

這里使用intellij IDEA

spring,boot,集成,kafka

spring,boot,集成,kafka

spring,boot,集成,kafka

spring,boot,集成,kafka

添加kafka集成maven

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>  <groupId>com.example</groupId>  <artifactId>demo</artifactId>  <version>0.0.1-SNAPSHOT</version>  <packaging>jar</packaging>  <name>demo</name>  <description>Demo project for Spring Boot</description>  <parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>1.5.8.RELEASE</version>    <relativePath/> <!-- lookup parent from repository -->  </parent>  <properties>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>    <java.version>1.8</java.version>  </properties>  <dependencies>    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <dependency>      <groupId>org.springframework.kafka</groupId>      <artifactId>spring-kafka</artifactId>    </dependency>    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-test</artifactId>      <scope>test</scope>    </dependency>  </dependencies>  <build>    <plugins>      <plugin>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-maven-plugin</artifactId>      </plugin>    </plugins>  </build></project>

項目中application.properties 添加

spring.kafka.bootstrap-servers=vm208:9092,vm:9092,vm50:9092spring.kafka.consumer.auto-offset-reset=latestspring.kafka.consumer.group-id=local_testspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.acks=1

新建KafkaConsumer消費類

package com.example.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class KafkaConsumer {  private Logger logger = LoggerFactory.getLogger(this.getClass());  @KafkaListener(topics = {"test"})  public void listen(ConsumerRecord<?, ?> record) {    System.out.printf("offset = %d,key =%s,value=%s/n", record.offset(), record.key(), record.value());  }}

啟動spring-boot程序,在kafka集群,模擬發送topic,檢驗接收

 

復制代碼 代碼如下:
bin/kafka-console-producer.sh --broker-list    vm208:9092,vm210:9092,vm50:9092  --topic  test

 

編寫producer代碼

package com.example.demo.producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class KafkaProducer {  @Autowired  private KafkaTemplate kafkaTemplate;  String topic="test";  public void sendMessage(String key,String data){    kafkaTemplate.send(new ProducerRecord(topic,key,data));  }}

建立一個restful模擬發送( //http://localhost:8080/kafka/send.do?key=2&data=allen-test-message)

package com.example.demo.controller;import com.example.demo.producer.KafkaProducer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class ProducerController {  @Autowired  private KafkaProducer kafkaProducer;  @RequestMapping(value = "/kafka/send.do", method = RequestMethod.GET)  public String sendMessage(@RequestParam(value = "key") String key, @RequestParam(value = "data") String data) {    kafkaProducer.sendMessage(key, data);    return "sucess";  }}

可以發現 spring-kafka大大減少了代碼工作量.

官方文檔: https://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VeVb武林網。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 高碑店市| 万源市| 贵州省| 旬邑县| 星子县| 桦南县| 启东市| 上饶县| 长宁区| 乳山市| 马关县| 五原县| 丁青县| 潮州市| 江北区| 大方县| 万宁市| 涪陵区| 同江市| 阆中市| 铁力市| 宜兰市| 江油市| 海晏县| 浦江县| 栾城县| 阳春市| 启东市| 大洼县| 大英县| 黄石市| 成都市| 博白县| 北票市| 基隆市| 潼南县| 渭源县| 奎屯市| 老河口市| 河间市| 东安县|