国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > PHP > 正文

Kafka的介紹以及基于PHP的kafka的安裝和測試

2020-03-22 18:55:41
字體:
供稿:網(wǎng)友
本篇文章給大家分享的內(nèi)容是關于Kafka的介紹以及基于PHP的kafka的安裝和測試,內(nèi)容很詳細,有需要的朋友可以參考一下,希望可以幫助到你們。

簡介

Kafka 是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)

kafka角色必知

producer:生產(chǎn)者。
consumer:消費者。
topic: 消息以topic為類別記錄,Kafka將消息種子(Feed)分類, 每一類的消息稱之為一個主題(Topic)。
broker:以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker;消費者可以訂閱一個或多個主題(topic), 并從Broker拉數(shù)據(jù),從而消費這些已發(fā)布的消息。

經(jīng)典模型

1. 一個主題下的分區(qū)不能小于消費者數(shù)量,即一個主題下消費者數(shù)量不能大于分區(qū)屬,大了就浪費了空閑了
2. 一個主題下的一個分區(qū)可以同時被不同消費組其中某一個消費者消費
3. 一個主題下的一個分區(qū)只能被同一個消費組的一個消費者消費

3315151442-5b5832bb37224_articlex.png

常用參數(shù)說明request.required.acks

Kafka producer的ack有3中機制,初始化producer時的producerconfig可以通過配置request.required.acks不同的值來實現(xiàn)。

0:這意味著生產(chǎn)者producer不等待來自broker同步完成的確認繼續(xù)發(fā)送下一條(批)消息。此選項提供最低的延遲但最弱的耐久性保證(當服務器發(fā)生故障時某些數(shù)據(jù)會丟失,如leader已死,但producer并不知情,發(fā)出去的信息broker就收不到)。

1:這意味著producer在leader已成功收到的數(shù)據(jù)并得到確認后發(fā)送下一條message。此選項提供了更好的耐久性為客戶等待服務器確認請求成功(被寫入死亡leader但尚未復制將失去了唯一的消息)。

-1:這意味著producer在follower副本確認接收到數(shù)據(jù)后才算一次發(fā)送完成。
此選項提供最好的耐久性,我們保證沒有信息將丟失,只要至少一個同步副本保持存活。

三種機制,性能依次遞減 (producer吞吐量降低),數(shù)據(jù)健壯性則依次遞增。

auto.offset.reset

1. earliest:自動將偏移重置為最早的偏移量
2. latest:自動將偏移量重置為最新的偏移量(默認)
3. none:如果consumer group沒有發(fā)現(xiàn)先前的偏移量,則向consumer拋出異常。
4. 其他的參數(shù):向consumer拋出異常(無效參數(shù))

kafka安裝和簡單測試安裝kafka(不需要安裝,解包即可)
# 官方下載地址:http://kafka.apache.org/downloads# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgztar -xzf kafka_2.12-1.1.1.tgzcd kafka_2.12-1.1.0
啟動kafka server
# 需先啟動zookeeperbin/zookeeper-server-start.sh config/zookeeper.propertiesbin/kafka-server-start.sh config/server.properties
啟動kafka客戶端測試
# 創(chuàng)建一個話題,test話題2個分區(qū)bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic testCreated topic "test".# 顯示所有話題bin/kafka-topics.sh --list --zookeeper localhost:2181test# 顯示話題信息bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic:test    PartitionCount:2    ReplicationFactor:1    Configs:    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0# 啟動一個生產(chǎn)者(輸入消息)bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test[等待輸入自己的內(nèi)容 出現(xiàn)>輸入即可]>i am a new msg !>i am a good msg ?# 啟動一個生產(chǎn)者(等待消息) # 注意這里的--from-beginning,每次都會從頭開始讀取,你可以嘗試去掉和不去掉看下效果bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning[等待消息]i am a new msg !i am a good msg ?
安裝kafka的php擴展
git clone https://github.com/arnaud-lb/php-rdkafka.gitcd php-rdkafkaphpize./configuremake all -j 5sudo make installvim [php]/php.iniextension=rdkafka.so
php代碼實踐生產(chǎn)者
<?php$conf = new RdKafka/Conf();$conf->setDrMsgCb(function ($kafka, $message) {    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);});$conf->setErrorCb(function ($kafka, $err, $reason) {    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);});$rk = new RdKafka/Producer($conf);$rk->setLogLevel(LOG_DEBUG);$rk->addBrokers("127.0.0.1");$cf = new RdKafka/TopicConf();$cf->set('request.required.acks', 0);$topic = $rk->newTopic("test", $cf);$option = 'qkl';for ($i = 0; $i < 20; $i++) {    //RD_KAFKA_PARTITION_UA自動選擇分區(qū)    //$option可選    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);}$len = $rk->getOutQLen();while ($len > 0) {    $len = $rk->getOutQLen();    var_dump($len);    $rk->poll(50);}
運行生產(chǎn)者
php producer.php# outputint(20)int(20)int(20)int(20)int(0)# 你可以查看你剛才上面啟動的消費者shell應該會輸出消息qkl . 0qkl . 1qkl . 2qkl . 3qkl . 4qkl . 5qkl . 6qkl . 7qkl . 8qkl . 9qkl . 10qkl . 11qkl . 12qkl . 13qkl . 14qkl . 15qkl . 16qkl . 17qkl . 18qkl . 19
消費者
<?php$conf = new RdKafka/Conf();$conf->setDrMsgCb(function ($kafka, $message) {    file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);});$conf->setErrorCb(function ($kafka, $err, $reason) {    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);});//設置消費組$conf->set('group.id', 'myConsumerGroup');$rk = new RdKafka/Consumer($conf);$rk->addBrokers("127.0.0.1");$topicConf = new RdKafka/TopicConf();$topicConf->set('request.required.acks', 1);//在interval.ms的時間內(nèi)自動提交確認、建議不要啟動//$topicConf->set('auto.commit.enable', 1);$topicConf->set('auto.commit.enable', 0);$topicConf->set('auto.commit.interval.ms', 100);// 設置offset的存儲為file//$topicConf->set('offset.store.method', 'file');// 設置offset的存儲為broker $topicConf->set('offset.store.method', 'broker');//$topicConf->set('offset.store.path', __DIR__);//smallest:簡單理解為從頭開始消費,其實等價于上面的 earliest//largest:簡單理解為從最新的開始消費,其實等價于上面的 latest//$topicConf->set('auto.offset.reset', 'smallest');$topic = $rk->newTopic("test", $topicConf);// 參數(shù)1消費分區(qū)0// RD_KAFKA_OFFSET_BEGINNING 重頭開始消費// RD_KAFKA_OFFSET_STORED 最后一條消費的offset記錄開始消費// RD_KAFKA_OFFSET_END 最后一條消費$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);//$topic->consumeStart(0, RD_KAFKA_OFFSET_END); ////$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);while (true) {    //參數(shù)1表示消費分區(qū),這里是分區(qū)0    //參數(shù)2表示同步阻塞多久    $message = $topic->consume(0, 12 * 1000);    switch ($message->err) {        case RD_KAFKA_RESP_ERR_NO_ERROR:            var_dump($message);            break;        case RD_KAFKA_RESP_ERR__PARTITION_EOF:            echo "No more messages; will wait for more/n";            break;        case RD_KAFKA_RESP_ERR__TIMED_OUT:            echo "Timed out/n";            break;        default:            throw new /Exception($message->errstr(), $message->err);            break;    }}
查看服務器元數(shù)據(jù)(topic/partition/broker)
<?php$conf = new RdKafka/Conf();$conf->setDrMsgCb(function ($kafka, $message) {    file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);});$conf->setErrorCb(function ($kafka, $err, $reason) {    printf("Kafka error: %s (reason: %s)/n", rd_kafka_err2str($err), $reason);});$conf->set('group.id', 'myConsumerGroup');$rk = new RdKafka/Consumer($conf);$rk->addBrokers("127.0.0.1");$allInfo = $rk->metadata(true, NULL, 60e3);$topics = $allInfo->getTopics();echo rd_kafka_offset_tail(100);echo "--";echo count($topics);echo "--";foreach ($topics as $topic) {    $topicName = $topic->getTopic();    if ($topicName == "__consumer_offsets") {        continue ;    }    $partitions = $topic->getPartitions();    foreach ($partitions as $partition) {//        $rf = new ReflectionClass(get_html' target='_blank'>class($partition));//        foreach ($rf->getMethods() as $f) {//            var_dump($f);//        }//        die();        $topPartition = new RdKafka/TopicPartition($topicName, $partition->getId());        echo  "當前的話題:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";        echo  "offset:" . ($topPartition->getOffset()) . PHP_EOL;    }}

相關推薦:

kafka安裝及Kafka-PHP擴展的使用,kafkakafka-php擴展

kafka裝配及Kafka-PHP擴展的使用

以上就是Kafka的介紹以及基于PHP的kafka的安裝和測試的詳細內(nèi)容,更多請關注 其它相關文章!

鄭重聲明:本文版權歸原作者所有,轉(zhuǎn)載文章僅為傳播更多信息之目的,如作者信息標記有誤,請第一時間聯(lián)系我們修改或刪除,多謝。

發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 年辖:市辖区| 舟曲县| 旅游| 裕民县| 滨州市| 德令哈市| 常熟市| 达日县| 浏阳市| 台东市| 孝感市| 龙江县| 且末县| 镇坪县| 湖州市| 长葛市| 台州市| 茶陵县| 黄梅县| 沂水县| 双辽市| 宁强县| 兖州市| 南平市| 灵台县| 金溪县| 陇川县| 泸西县| 玉屏| 永济市| 同江市| 望江县| 绵竹市| 宁波市| 浮山县| 祥云县| 萨嘎县| 新巴尔虎左旗| 无极县| 蒲江县| 静宁县|