国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

python 解決動態的定義變量名,并給其賦值的方法(大數據處理)

2020-02-15 23:37:15
字體:
來源:轉載
供稿:網友

最近消費kafka數據到磁盤的時候遇到了這樣的問題:

需求:每天大概有1千萬條數據,每條數據包含19個字段信息,需要將數據寫到服務器磁盤,以第二個字段作為大類建立目錄,第7個字段作為小類配合時間戳作為文件名,臨時文件后綴tmp,當每個文件的寫入條數(可配置,比如100條)達到要求條數時,將后綴tmp改為out。

問題:大類共有30個,小類不計其數而且未知,比如大類為A,小類為a,時間戳為20180606095835234,則A目錄下的文件名為20180606095835234_a.tmp,這樣一來需要在此文件寫滿100條時,更新時間戳生成第二個文件名,如果此時有1000個文件都在寫則需要有1000個時間戳,和1000個計數器記錄每個文件當前的條數,如果分別定義1000個變量顯然是不劃算的,

嘗試:中間過程想到了動態定義變量名,即

定義第七個字段:seven = data.split('|')[7]

定義文件名:filename = time_stamp + '_' + seven+'.tmp',

定義文件計數器:seven + ‘_num' = 0

定義文件時間戳:seven + '_stamp' = time.time( )

想法其實是沒問題的,但是這里用到了一個不常用的語法:用一個變量名和一個字符串拼接出來一個新的變量名,并繼續賦值(不知道我的表述是否清楚),試過了用local()函數、global()函數、exec()函數都沒有達到預期效果,也許是把問題想的太復雜了

解決:最后使用三個字典將這個問題完美解決,

定義一個字典用來存計數器,字典的每一個鍵對應一個文件名,值對應當前計數,并實時更新;

定義一個字典用來存時間戳,鍵對應一個文件名,值對應時間戳,達到100條就更新一次;

定義一個字典用來存大類,鍵對應代號,值對應分類;

局部功能代碼如下:

def kafka_to_disk(): print('啟動前檢測上次運行時是否存在意外中斷的數據文件......') print('搜索最近一次執行腳本產生的時間目錄......') # 待處理臨時文件列表 tmp_list = [] try:  for category_dir in os.listdir(local_file_path):   if len(os.listdir(local_file_path+os.sep+category_dir)) > 0:    for file in os.listdir(local_file_path+os.sep+category_dir):     if suffix in file:      tmp_list.append(local_file_path+os.sep+category_dir+os.sep+file)  # print('上次運行程序產生的臨時文件有---{}'.format(tmp_list)) except Exception as e:  pass if len(tmp_list) == 0:  print('未掃描任何殘留臨時文件') else:  print('開始修復殘留臨時文件......') tmp_num = 0 for tmp in tmp_list:  os.rename(tmp, tmp.split('.')[0]+'.out')  tmp_num += 1 print('本次啟動共修復殘留臨時文件★★★★★-----{}個-----★★★★★'.format(tmp_num))  category_poor = {  '1': 'news', '2': 'weibo', '3': 'weixin', '4': 'app', '5': 'newspaper', '6': 'luntan',  '7': 'blog', '8': 'video', '9': 'shangji', '10': 'shangjia', '11': 'gtzy', '12': 'zfztb',  '13': 'gyfp', '14': 'gjz', '15': 'zfxx', '16': 'ptztb', '17': 'company', '18': 'house',  '19': 'hospital', '20': 'bank', '21': 'zone', '22': 'express', '23': 'zpgw', '24': 'zscq',  '25': 'hotel', '26': 'cpws', '27': 'gxqy', '28': 'gpjj', '29': 'dtyy', '30': 'bdbk'}  time_stamp = utils.get_time_stamp() # 初始化毫秒級時間戳 : 20180509103015125 consumer = KafkaConsumer(topic, group_id=group_id, auto_offset_reset=auto_offset_reset, bootstrap_servers=eval(bootstrap_servers)) print('連接kafka成功,數據篩選中......') file_poor = {}       # 子類池用于文件計數器 time_stamp_poor = {}     # 子類時間戳池,用于觸發文件切換 time_stamp = utils.get_time_stamp()  # 初始化毫秒級時間戳 :20180509103015125 for message in consumer:  # 提取第8個字段自動匹配目錄進行創建  if message.value.decode().split('|')[1] in category_poor:   category = category_poor[message.value.decode().split('|')[1]]  else:   print(message.value.decode())   continue  category_dir = local_file_path + os.sep + category  if not os.path.exists(category_dir):   os.makedirs(category_dir)  # 提取第2個字段,用于生成文件名  if message.value.decode().split('|')[7] in time_stamp_poor:   shot_file_name = time_stamp_poor[message.value.decode().split('|')[7]] + '_' + message.value.decode().split('|')[7]  else:   shot_file_name = time_stamp + '_' + message.value.decode().split('|')[7]  file_name = category_dir + os.sep + shot_file_name + '.tmp'   # 給每一個文件設定一個計數器  if message.value.decode().split('|')[7] not in file_poor:   file_poor[message.value.decode().split('|')[7]] = 0   with open(file_name, 'a', encoding='utf-8')as f1:   f1.write(message.value.decode())   file_poor[message.value.decode().split('|')[7]] += 1   # 觸發切換文件的操作,用時間戳生成第二文件名  if file_poor[message.value.decode().split('|')[7]] == strip_number:   time_stamp_poor[message.value.decode().split('|')[7]] = utils.get_time_stamp()   file_poor[message.value.decode().split('|')[7]] = 0            
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 洪江市| 岢岚县| 上林县| 八宿县| 含山县| 伊宁市| 阜康市| 射阳县| 巴林右旗| 栾城县| 邹城市| 乳源| 西乌珠穆沁旗| 柳林县| 新干县| 龙江县| 屏东市| 嘉峪关市| 白银市| 安徽省| 会昌县| 乳源| 井陉县| 黄浦区| 柳州市| 兴义市| 建平县| 肇源县| 京山县| 亚东县| 包头市| 灵寿县| 上蔡县| 洞口县| 上犹县| 达孜县| 平远县| 西贡区| 西乌珠穆沁旗| 庄浪县| 松江区|