国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

Python實現(xiàn)數(shù)據(jù)庫并行讀取和寫入實例

2020-02-16 01:40:41
字體:
供稿:網(wǎng)友

這篇主要記錄一下如何實現(xiàn)對數(shù)據(jù)庫的并行運算來節(jié)省代碼運行時間。語言是Python,其他語言思路一樣。

前言

一共23w條數(shù)據(jù),是之前通過自然語言分析處理過的數(shù)據(jù),附一張截圖:


要實現(xiàn)對news主體的讀取,并且找到其中含有的股票名稱,只要發(fā)現(xiàn),就將這支股票和對應(yīng)的日期、score寫入數(shù)據(jù)庫。

顯然,幾十萬條數(shù)據(jù)要是一條條讀寫,然后在本機上操作,耗時太久,可行性極低。所以,如何有效并行的讀取內(nèi)容,并且進行操作,最后再寫入數(shù)據(jù)庫呢?

并行讀取和寫入

并行讀取:創(chuàng)建N*max_process個進程,對數(shù)據(jù)庫進行讀取。讀取的時候應(yīng)該注意:

    每個進程需要分配不同的connection和對應(yīng)的cursor,否則數(shù)據(jù)庫會報錯。 數(shù)據(jù)庫必須能承受相應(yīng)的高并發(fā)訪問(可以手動更改)

實現(xiàn)的時候,如果不在進程里面創(chuàng)建新的connection,就會發(fā)生沖突,每個進程拿到權(quán)限后,會被下個進程釋放,所以匯報出來NoneType Error的錯誤。

    并行寫入:在對數(shù)據(jù)庫進行更改的時候,不可以多進程更改。所以,我們需要根據(jù)已有的表,創(chuàng)建max_process-1個同樣結(jié)構(gòu)的表用來寫入。表的命名規(guī)則可以直接在原來基礎(chǔ)上加上1,2,3...數(shù)字可以通過對max_process取余得到。

此時,對應(yīng)進程里面先后出現(xiàn)讀入的conn(保存消息后關(guān)閉)和寫入的conn。每個進程對應(yīng)的表的index就是 主循環(huán)中的num對max_process取余(100->4,101->5),這樣每個進程只對一個表進行操作了。

部分代碼實現(xiàn)

max_process = 16 #最大進程數(shù)def read_SQL_write(r_host,r_port,r_user,r_passwd,r_db,r_charset,w_host,w_port,w_user,w_passwd,w_db,w_charset,cmd,index=None):  #得到tem字典保存著信息  try:    conn = pymysql.Connect(host=r_host, port=r_port, user=r_user, passwd =r_passwd, db =r_db, charset =r_charset)    cursor = conn.cursor()    cursor.execute(cmd)  except Exception as e:    error = "[-][-]%d fail to connect SQL for reading" % index    log_error('error.log',error)    return   else:    tem = cursor.fetchone()    print('[+][+]%d succeed to connect SQL for reading' % index)  finally:    cursor.close()    conn.close()    try:    conn = pymysql.Connect(host=w_host, port=w_port, user=w_user, passwd =w_passwd, db =w_db, charset =w_charset)    cursor = conn.cursor()    cursor.execute(cmd)  except Exception as e:    error = "[-][-]%d fail to connect SQL for writing" % index    log_error('error.log',error)    return   else:    print('[+][+]%d succeed to connect SQL for writing' % index)      r_dict = dict()  r_dict['id'] = tem[0]  r_dict['content_id'] = tem[1]  r_dict['pub_date'] = tem[2]  r_dict['title'] = cht_to_chs(tem[3])  r_dict['title_score'] =tem[4]![](http://images2015.cnblogs.com/blog/1172464/201706/1172464-20170609000900309-1810357590.png)  r_dict['news_content'] = cht_to_chs(tem[5])  r_dict['content_score'] = tem[6]    for key in stock_dict.keys():    #能找到對應(yīng)的股票    if stock_dict[key][1] and ( r_dict['title'].find(stock_dict[key][1])!=-1 or r_dict['news_content'].find(stock_dict[key][1])!=-1 ):      w_dict=dict()      w_dict['code'] = key      w_dict['english_name'] = stock_dict[key][0]      w_dict['cn_name'] = stock_dict[key][1]      #得到分數(shù)      if r_dict['title_score']:        w_dict['score']=r_dict['title_score']      else:        w_dict['score']=r_dict['content_score']            #開始寫入      try:        global max_process        cmd = "INSERT INTO dyx_stock_score%d VALUES ('%s', '%s' , %d , '%s' , '%s' , %.2f );" % /          (index%max_process ,r_dict['content_id'] ,r_dict['pub_date'] ,w_dict['code'] ,w_dict['english_name'] ,w_dict['cn_name'] ,w_dict['score'])        cursor.execute(cmd)        conn.commit()      except Exception as e:        error = "  [-]%d fail to write to SQL" % index        cursor.rollback()        log_error('error.log',error)      else:        print("  [+]%d succeed to write to SQL" % index)  cursor.close()  conn.close()def main():  num = 238143#數(shù)據(jù)庫查詢拿到的總數(shù)  p = None  for index in range(1,num+1):    if index%max_process==1:      if p:        p.close()        p.join()      p = multiprocessing.Pool(max_process)    r_cmd = ('select id,content_id,pub_date,title,title_score,news_content,content_score from dyx_emotion_analysis where id = %d;' % (index))    p.apply_async(func = read_SQL_write,args=(r_host,r_port,r_user,r_passwd,r_db,r_charset,w_host,w_port,w_user,w_passwd,w_db,w_charset,r_cmd,index,))  if p:    p.close()    p.join()            
發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 丘北县| 绍兴市| 大足县| 雷山县| 乐平市| 惠来县| 莆田市| 太康县| 吉隆县| 盐池县| 中西区| 榆树市| 仪陇县| 巨鹿县| 小金县| 沙坪坝区| 江源县| 井研县| 岱山县| 诏安县| 社会| 龙山县| 拜泉县| 保定市| 洮南市| 临颍县| 昌都县| 博野县| 九寨沟县| 波密县| 五指山市| 金沙县| 江山市| 南皮县| 三穗县| 南投县| 灵山县| 区。| 忻州市| 忻州市| 尚志市|