How to consume Kafka data in Python and bulk insert it into Elasticsearch

1. Bulk inserting into Elasticsearch

To make later configuration changes easier, the logging configuration is kept in logging.conf.
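The article does not include logging.conf itself. Below is a minimal sketch that defines the "msg" and "data" loggers used by the code in this article; the handler and format choices are assumptions, not the original file.

[loggers]
keys=root,msg,data

[handlers]
keys=consoleHandler

[formatters]
keys=simpleFormatter

[logger_root]
level=INFO
handlers=consoleHandler

[logger_msg]
level=INFO
handlers=consoleHandler
qualname=msg
propagate=0

[logger_data]
level=INFO
handlers=consoleHandler
qualname=data
propagate=0

[handler_consoleHandler]
class=StreamHandler
level=INFO
formatter=simpleFormatter
args=(sys.stdout,)

[formatter_simpleFormatter]
format=%(asctime)s %(levelname)s %(message)s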

The elasticsearch client is used for the bulk operation. Install the dependency first: sudo pip install Elasticsearch2
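A quick way to check that the installed client can actually reach the cluster before wiring it into the consumer; the host below is an assumption, replace it with your own node:

from elasticsearch import Elasticsearch

# hypothetical address of a local Elasticsearch node
es = Elasticsearch(hosts=["127.0.0.1:9200"], timeout=5000)
print(es.info())  # prints cluster name and version when the connection works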

import logging
import logging.config

from elasticsearch import Elasticsearch


class ImportEsData:
  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")

  def __init__(self, hosts, index, type):
    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
    self.index = index
    self.type = type

  def set_date(self, data):
    # index one document at a time (the version further below uses the bulk helper)
    # e.g. es.index(index="test-index", doc_type="test-type", id=42, body={"any": "data", "timestamp": datetime.now()})
    self.es.index(index=self.index, doc_type=self.type, body=data)
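A minimal usage sketch for the class above; the host string, index/type names and document body are assumptions:

if __name__ == "__main__":
  # hypothetical connection settings
  importer = ImportEsData("127.0.0.1:9200", "test-index", "test-type")
  importer.set_date({"any": "data", "msg": "hello"})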

2. Consuming Kafka with pykafka

1. Because the Kafka cluster is version 0.8 and pykafka does not support ZooKeeper-based consumers for it, only get_simple_consumer can be used.

2. To let several application instances consume at the same time without consuming the same data twice, each instance consumes a single partition.

3. To make sure that data which has not yet reached the batch size of 10000 still gets inserted into ES within a bounded time, consumer_timeout_ms is set as a wait timeout, after which the consumer stops blocking on the consume loop.

4. Once the consumer has exited the blocking wait it no longer receives data, so the access to self.consumer is wrapped in an outer while True loop to keep consuming.
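The code below imports a ConfigUtil helper that the article never shows. A minimal sketch of what such a helper might look like, assuming the settings live in an INI file named config.ini; the file name and the ConfigParser-based implementation are assumptions, not the author's code:

# ConfigUtil.py -- hypothetical reconstruction, not the original
# -*- coding: UTF-8 -*-
import ConfigParser  # Python 2; use configparser on Python 3


class ConfigUtil:
  def __init__(self, path="config.ini"):
    self.config = ConfigParser.ConfigParser()
    self.config.read(path)

  def get(self, section, key):
    # e.g. get("kafka", "kafka_server") or get("es", "index_name")
    return self.config.get(section, key)

The sections and keys it needs to provide ([kafka] kafka_server, topic, group, partition, consumer_timeout_ms; [es] hosts, index_name, type_name) can be read off the __init__ of KafkaPython below.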

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime


class KafkaPython:
  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")
  logger_data = logging.getLogger("data")

  def __init__(self):
    self.server = ConfigUtil().get("kafka", "kafka_server")
    self.topic = ConfigUtil().get("kafka", "topic")
    self.group = ConfigUtil().get("kafka", "group")
    self.partition_id = int(ConfigUtil().get("kafka", "partition"))
    self.consumer_timeout_ms = int(ConfigUtil().get("kafka", "consumer_timeout_ms"))
    self.consumer = None
    self.hosts = ConfigUtil().get("es", "hosts")
    self.index_name = ConfigUtil().get("es", "index_name")
    self.type_name = ConfigUtil().get("es", "type_name")

  def getConnect(self):
    client = KafkaClient(self.server)
    topic = client.topics[self.topic]
    p = topic.partitions
    ps = {p.get(self.partition_id)}
    self.consumer = topic.get_simple_consumer(
      consumer_group=self.group,
      auto_commit_enable=True,
      consumer_timeout_ms=self.consumer_timeout_ms,
      # num_consumer_fetchers=1,
      # consumer_id='test1',
      partitions=ps
    )
    self.starttime = datetime.datetime.now()

  def beginConsumer(self):
    print("beginConsumer kafka-python")
    imprtEsData = ImportEsData(self.hosts, self.index_name, self.type_name)
    # buffer of bulk actions for the helper
    count = 0
    ACTIONS = []
    while True:
      endtime = datetime.datetime.now()
      print (endtime - self.starttime).seconds
      for message in self.consumer:
        if message is not None:
          try:
            count = count + 1
            # print(str(message.partition.id)+","+str(message.offset)+","+str(count))
            # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
            action = {
              "_index": self.index_name,
              "_type": self.type_name,
              "_source": message.value
            }
            ACTIONS.append(action)
            if len(ACTIONS) >= 10000:
              imprtEsData.set_date(ACTIONS)
              ACTIONS = []
              self.consumer.commit_offsets()
              endtime = datetime.datetime.now()
              print (endtime - self.starttime).seconds
              # break
          except Exception as e:
            # self.consumer.commit_offsets()
            print(e)
            self.logger.error(e)
            self.logger.error(str(message.partition.id) + "," + str(message.offset) + "," + message.value + "\n")
            # self.logger_data.error(message.value+"\n")
          # self.consumer.commit_offsets()
      if len(ACTIONS) > 0:
        self.logger.info("consumer_timeout_ms exceeded, flushing buffered data to ES")
        imprtEsData.set_date(ACTIONS)
        ACTIONS = []
        self.consumer.commit_offsets()

  def disConnect(self):
    self.consumer.close()


# ImportEsData using the bulk helper (the version actually used for batch inserts)
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


class ImportEsData:
  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")

  def __init__(self, hosts, index, type):
    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
    self.index = index
    self.type = type

  def set_date(self, data):
    # bulk insert the buffered actions in one request
    success = bulk(self.es, data, index=self.index, raise_on_error=True)
    self.logger.info(success)
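The article does not show how the consumer is started; a small runner sketch, assuming KafkaPython and ImportEsData live in the same module:

if __name__ == "__main__":
  kp = KafkaPython()
  kp.getConnect()
  try:
    # beginConsumer loops forever, flushing to ES every 10000 messages
    # or whenever consumer_timeout_ms expires with data still buffered
    kp.beginConsumer()
  finally:
    # only reached on an exception or Ctrl+C
    kp.disConnect()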