注:僅為筆記
1、Python ftp連接,下載文件
def ftpDownload() : #創(chuàng)建ftp對(duì)象實(shí)例 ftp = FTP() ftp.connect(FTPip, FTPPORT) #通過賬號(hào)和密碼登錄FTP服務(wù)器 ftp.login(USERNAME,USERPWD) #如果參數(shù) pasv 為真,打開被動(dòng)模式傳輸 (PASV MODE) , #否則,如果參數(shù) pasv 為假則關(guān)閉被動(dòng)傳輸模式。 #在被動(dòng)模式打開的情況下,數(shù)據(jù)的傳送由客戶機(jī)啟動(dòng),而不是由服務(wù)器開始。 #這里要根據(jù)不同的服務(wù)器配置# ftp.set_pasv(0) #在FTP連接中切換當(dāng)前目錄 # CURRTPATH= "/home1/ftPRoot/ybmftp/testupg/payment"# ftp.cwd(CURRTPATH) #為準(zhǔn)備下載到本地的文件,創(chuàng)建文件對(duì)象 f = open(DownLocalFilename, 'wb') #從FTP服務(wù)器下載文件到前一步創(chuàng)建的文件對(duì)象,其中寫對(duì)象為f.write,1024是緩沖區(qū)大小 ftp.retrbinary('RETR ' + DownRoteFilename , f.write , 1024) #關(guān)閉下載到本地的文件 #提醒:雖然Python可以自動(dòng)關(guān)閉文件,但實(shí)踐證明,如果想下載完后立即讀該文件,最好關(guān)閉后重新打開一次 f.close() #關(guān)閉FTP客戶端連接 ftp.close()2、日期字符串生成,拼裝文件名稱
def getYesterday() : now = datetime.datetime.now() date = now + datetime.timedelta(days = -1) return date.strftime('%Y%m%d')3、csv文件讀取
file_zip = zipfile.ZipFile(DownRoteFilename,'r') for file in file_zip.namelist(): file_zip.extract(file,r'.') with codecs.open(file,'rb','utf-8') as csvfile: # with open(file,'rb') as csvfile: spamreader = csv.reader(csvfile) line_num = 0 L1 = [] for row in spamreader: line_num=line_num+1; if line_num ==7 : #print ', '.join(row) #存放字段前先清空 #SQL_FILEDS =[] for s in row: print s.decode('utf-8') SQL_FILEDS.append(s) if line_num >7 and len(row) > 1 : #print ', '.join(row) L1.append(row)# print L1 #達(dá)到批量處理行數(shù)之后批量入庫 if len(L1) >=BATCH_LINE : MySQL_database(L1) L1 =[] #測(cè)試時(shí)候只讀取幾行 #if line_num >10 : # break print line_num #循環(huán)讀取數(shù)據(jù)結(jié)束,處理剩余未達(dá)到批量處理的數(shù)組對(duì)象 if len(L1) >0 : mysql_database(L1) L1 =[] #刪除解壓后文件 os.remove(file)注意中文編碼問題,4、mysql數(shù)據(jù)插入
def mysql_database(L1): conn = MySQLdb.connect(host='localhost', user='root',passwd='***',charset="utf8") cursor = conn.cursor() conn.select_db('mask_data') #for row in L1 : # print row #print SQL_FILEDS sql_parm='' for s in SQL_FILEDS : sql_parm +='%s,' sql = 'insert into test1 VALUES ('+sql_parm[:-1]+') ' print sql print len(L1) cursor.executemany(sql,L1) #test_all_count = int(cursor.rowcount) #test_all = cursor.fetchall() conn.commit() cursor.close() conn.close() 需要留意中文編碼,conn = MySQLdb.connect(host='localhost', user='root',passwd='***',charset="utf8")如果不指定編碼會(huì)默認(rèn)編碼,導(dǎo)致中文亂碼。完成腳本:
# -*- coding: utf-8 -*-#!/usr/bin/env python##Author: Liu6import zipfileimport csvimport codecs import MySQLdbimport datetimeimport osfrom ftplib import FTPimport codecs #import pandas as pdimport sysreload(sys)sys.setdefaultencoding('utf-8')def getYesterday() : now = datetime.datetime.now() date = now + datetime.timedelta(days = -1) return date.strftime('%Y%m%d')dateStr = getYesterday()print dateStr#測(cè)試使用的字符串dateStr = '20170102'DownRoteFilename='provincePayDay_Zz-'+dateStr+'080300.zip'DownLocalFilename = DownRoteFilenameSQL_FILEDS = []#設(shè)置批量處理數(shù)據(jù)數(shù)量BATCH_LINE = 10000#指定IP地址和端口,賬號(hào)密碼信息FTPIP= "127.0.0.1"FTPPORT= 21USERNAME= "test"USERPWD= "test"encoding='utf-8'def ftpDownload() : #創(chuàng)建ftp對(duì)象實(shí)例 ftp = FTP() ftp.connect(FTPIP, FTPPORT) #通過賬號(hào)和密碼登錄FTP服務(wù)器 ftp.login(USERNAME,USERPWD) #如果參數(shù) pasv 為真,打開被動(dòng)模式傳輸 (PASV MODE) , #否則,如果參數(shù) pasv 為假則關(guān)閉被動(dòng)傳輸模式。 #在被動(dòng)模式打開的情況下,數(shù)據(jù)的傳送由客戶機(jī)啟動(dòng),而不是由服務(wù)器開始。 #這里要根據(jù)不同的服務(wù)器配置# ftp.set_pasv(0) #在FTP連接中切換當(dāng)前目錄 # CURRTPATH= "/home1/ftproot/ybmftp/testupg/payment"# ftp.cwd(CURRTPATH) #為準(zhǔn)備下載到本地的文件,創(chuàng)建文件對(duì)象 f = open(DownLocalFilename, 'wb') #從FTP服務(wù)器下載文件到前一步創(chuàng)建的文件對(duì)象,其中寫對(duì)象為f.write,1024是緩沖區(qū)大小 ftp.retrbinary('RETR ' + DownRoteFilename , f.write , 1024) #關(guān)閉下載到本地的文件 #提醒:雖然Python可以自動(dòng)關(guān)閉文件,但實(shí)踐證明,如果想下載完后立即讀該文件,最好關(guān)閉后重新打開一次 f.close() #關(guān)閉FTP客戶端連接 ftp.close()def mysql_database(L1): conn = MySQLdb.connect(host='localhost', user='root',passwd='***',charset="utf8") cursor = conn.cursor() conn.select_db('mask_data') #for row in L1 : # print row #print SQL_FILEDS sql_parm='' for s in SQL_FILEDS : sql_parm +='%s,' sql = 'insert into test1 VALUES ('+sql_parm[:-1]+') ' print sql print len(L1) cursor.executemany(sql,L1) #test_all_count = int(cursor.rowcount) #test_all = cursor.fetchall() conn.commit() cursor.close() conn.close()def readCSVFile() : file_zip = zipfile.ZipFile(DownRoteFilename,'r') for file in file_zip.namelist(): file_zip.extract(file,r'.') with codecs.open(file,'rb','utf-8') as csvfile: # with open(file,'rb') as csvfile: spamreader = csv.reader(csvfile) line_num = 0 L1 = [] for row in spamreader: line_num=line_num+1; if line_num ==7 : #print ', '.join(row) #存放字段前先清空 #SQL_FILEDS =[] for s in row: print s.decode('utf-8') SQL_FILEDS.append(s) if line_num >7 and len(row) > 1 : #print ', '.join(row) L1.append(row)# print L1 #達(dá)到批量處理行數(shù)之后批量入庫 if len(L1) >=BATCH_LINE : mysql_database(L1) L1 =[] #測(cè)試時(shí)候只讀取幾行 #if line_num >10 : # break print line_num #循環(huán)讀取數(shù)據(jù)結(jié)束,處理剩余未達(dá)到批量處理的數(shù)組對(duì)象 if len(L1) >0 : mysql_database(L1) L1 =[] #刪除解壓后文件 os.remove(file)ftpDownload()readCSVFile()#刪除ftp上下載的文件#os.remove(DownLocalFilename)
新聞熱點(diǎn)
疑難解答
圖片精選