国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

用python簡單實現mysql數據同步到ElasticSearch的教程

2020-02-15 21:34:26
字體:
來源:轉載
供稿:網友

之前博客有用logstash-input-jdbc同步mysql數據到ElasticSearch,但是由于同步時間最少是一分鐘一次,無法滿足線上業務,所以只能自己實現一個,但是時間比較緊,所以簡單實現一個

思路:

網上有很多思路用什么mysql的binlog功能什么的,但是我對mysql了解實在有限,所以用一個很呆板的辦法查詢mysql得到數據,再插入es,因為數據量不大,而且10秒間隔同步一次,效率還可以,為了避免服務器之間的時間差和mysql更新和查詢產生的時間差,所以在查詢更新時間條件時是和上一次同步開始時間比較,這樣不管數據多少,更新耗時多少都不會少數據,因為原則是同步不漏掉任何數據,也可以程序多開將時間差和間隔時間差異化,因為用mysql中一個id當作es中的id,也避免了重復數據

使用:

只需要按照escongif.py寫配置文件,然后寫sql文件,最后直接執行mstes.py就可以了,我這個也是參考logstash-input-jdbc的配置形式

MsToEs

|----esconfig.py(配置文件)

|----mstes.py(同步程序)

|----sql_manage.py(數據庫管理)

|----aa.sql(需要用到sql文件)

|----bb.sql(需要用到sql文件)

sql_manage.py:

# -*-coding:utf-8 -*-__author__ = "ZJL"from sqlalchemy.pool import QueuePoolfrom sqlalchemy import create_enginefrom sqlalchemy.orm import sessionmaker, scoped_sessionimport tracebackimport esconfig# 用于不需要回滾和提交的操作def find(func): def wrapper(self, *args, **kwargs):  try:   return func(self, *args, **kwargs)  except Exception as e:   print(traceback.format_exc())   print(str(e))   return traceback.format_exc()  finally:   self.session.close() return wrapperclass MysqlManager(object): def __init__(self):  mysql_connection_string = esconfig.mysql.get("mysql_connection_string")  self.engine = create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8', poolclass=QueuePool,         pool_recycle=3600)  # self.DB_Session = sessionmaker(bind=self.engine)  # self.session = self.DB_Session()  self.DB_Session = sessionmaker(bind=self.engine, autocommit=False, autoflush=True, expire_on_commit=False)  self.db = scoped_session(self.DB_Session)  self.session = self.db() @find def select_all_dict(self, sql, keys):  a = self.session.execute(sql)  a = a.fetchall()  lists = []  for i in a:   if len(keys) == len(i):    data_dict = {}    for k, v in zip(keys, i):     data_dict[k] = v    lists.append(data_dict)   else:    return False  return lists # 關閉 def close(self):  self.session.close()

aa.sql:

select  CONVERT(c.`id`,CHAR)    as id,  c.`code`   as code,  c.`project_name` as project_name,  c.`name`   as name,  date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')  as update_time, from `cc` c where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';             
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 韶山市| 彭水| 永新县| 资兴市| 和田县| 武功县| 西平县| 扶沟县| 广东省| 榆林市| 斗六市| 蚌埠市| 黄梅县| 大竹县| 漳浦县| 确山县| 泰宁县| 安阳市| 顺昌县| 房产| 沙湾县| 武宁县| 新源县| 泾川县| 资兴市| 犍为县| 九台市| 湖南省| 汕尾市| 德昌县| 杭锦后旗| 鲜城| 延川县| 永城市| 偏关县| 清新县| 历史| 宁河县| 盐边县| 曲阳县| 兰州市|