国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

Python計算注冊用戶當日發生行為示例

2019-11-08 01:55:11
字體:
來源:轉載
供稿:網友
思路:觀察新注冊用戶在注冊后間隔三天、持續七天的時間窗口內是否留存,并查看用戶注冊當日在彈幕發送、手機綁定等方面的用戶行為信息。
1、用到的目標Hive及MySQL表
-- Hive: per-registration-day retention detail, computed with the
-- "skip 3 days, then a 7-day window" rule.
drop table if exists bi_register_clustering_user_detail;
create table bi_register_clustering_user_detail(
  appsource string,
  appkey string,
  uid string,
  remain_flag string
) partitioned by (pt_day string);

-- Hive: DDL for the daily behavior log table (one row per user and behavior type).
drop table if exists bi_clustering_behavior_daily_state;
create table bi_clustering_behavior_daily_state(
  appsource string,
  appkey string,
  uid string,
  remain_flag string,
  behavior_type string,
  behavior_flag string
) partitioned by (pt_day string);

-- MySQL: target table for the row-to-column (pivoted) result.
-- NOTE(review): the CREATE TABLE below looks like the output of the
-- SHOW CREATE TABLE statement above it; confirm before running both as a script.
-- Column names are spelled the same way as in the INSERT column list used by the loader.
show create table `bi_clustering_behavior_daily_state`;
CREATE TABLE `bi_clustering_behavior_daily_state` (
  `appsource` varchar(8) DEFAULT NULL,
  `appkey` varchar(18) DEFAULT NULL,
  `uid` varchar(18) DEFAULT NULL,
  `remain_flag` int(11) DEFAULT NULL,
  `pt_day` varchar(10) DEFAULT NULL,
  `MessageSend` int(11) DEFAULT NULL,
  `PhoneBinding` int(11) DEFAULT NULL,
  `RoomSubscribe` int(11) DEFAULT NULL,
  `Gifts` int(11) DEFAULT NULL,
  `Pay` int(11) DEFAULT NULL,
  `VideoPlayAccess` int(11) DEFAULT NULL,
  `GameZoneAccess` int(11) DEFAULT NULL,
  `LiveProgramStatus` int(11) DEFAULT NULL,
  `WatchLongerThan5Minutes` int(11) DEFAULT NULL,
  `etl_time` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 2. Date-handling code: /Users/nisj/PycharmProjects/BiDataProc/UserBehavior/DateProc.py
# -*- coding=utf-8 -*-
"""Date helpers shared by the registration-retention batch jobs.

Days are exchanged as 'YYYY-MM-DD' strings and ISO weeks as
'<year>#<week>' flags such as '2017#1'.
"""
import warnings
import datetime

warnings.filterwarnings("ignore")

# Single source of truth for the day-string layout used by every helper below.
_DAY_FORMAT = '%Y-%m-%d'


def getNowDay():
    """Return today's local date as a 'YYYY-MM-DD' string."""
    return datetime.datetime.today().strftime(_DAY_FORMAT)


def getFristDay():
    """Return the fixed first day of the processing range, '2017-01-01'."""
    return datetime.datetime.strptime('2017-01-01', _DAY_FORMAT).strftime(_DAY_FORMAT)


def dateRange(beginDate, endDate):
    """Return every day from beginDate through endDate (inclusive) as strings.

    Args:
        beginDate: first day, 'YYYY-MM-DD'; parsed with strptime.
        endDate: last day, 'YYYY-MM-DD'; only compared as a string.

    Returns:
        List of 'YYYY-MM-DD' strings; empty when beginDate sorts after endDate.

    Raises:
        ValueError: if beginDate does not match 'YYYY-MM-DD'.
    """
    dates = []
    current = datetime.datetime.strptime(beginDate, _DAY_FORMAT)
    day = beginDate
    # Zero-padded ISO day strings sort chronologically, so a string
    # comparison is enough for the loop bound.
    while day <= endDate:
        dates.append(day)
        current += datetime.timedelta(days=1)
        day = current.strftime(_DAY_FORMAT)
    return dates


def weekRang(beginDate, endDate):
    """Return the sorted ISO weeks touched by the inclusive day range.

    Each week is rendered as '<iso year>#<iso week>' (week is not zero-padded).
    """
    weeks = set()
    for day in dateRange(beginDate, endDate):
        parsed = datetime.date(int(day[0:4]), int(day[5:7]), int(day[8:10]))
        weeks.add(tuple(parsed.isocalendar()[0:2]))
    return ['%s#%s' % week for week in sorted(weeks)]


def getWeekFristday(weekflag):
    """Return the Monday (datetime.date) that starts the ISO week in weekflag.

    Args:
        weekflag: '<4-digit year>#<week>' as produced by weekRang.
    """
    year = int(weekflag[0:4])
    week = int(weekflag[5:7])
    jan_first = datetime.datetime(year, 1, 1)
    iso_year, _, iso_weekday = jan_first.isocalendar()
    # Days from 1 January to the first Monday strictly after it.
    to_next_monday = 8 - iso_weekday
    if iso_year < year:
        # 1 January still belongs to the previous ISO year, so that Monday opens week 1.
        day_delta = to_next_monday + (week - 1) * 7
    else:
        # 1 January is already inside ISO week 1, so that Monday opens week 2.
        day_delta = to_next_monday + (week - 2) * 7
    return (jan_first + datetime.timedelta(days=day_delta)).date()


def getWeekLastday(weekflag):
    """Return the Sunday (datetime.date) that ends the ISO week in weekflag."""
    return getWeekFristday(weekflag) + datetime.timedelta(days=6)


def getInterval3Last7(run_day):
    """Return (run_day, window_start, window_end) for the retention window.

    The window starts 4 days after run_day and ends 10 days after it, i.e.
    three full days are skipped and the window then spans seven days.
    All three values are 'YYYY-MM-DD' strings.
    """
    base = datetime.datetime.strptime(run_day, _DAY_FORMAT)
    window_start = base + datetime.timedelta(days=4)
    window_end = base + datetime.timedelta(days=10)
    return run_day, window_start.strftime(_DAY_FORMAT), window_end.strftime(_DAY_FORMAT)


# Batch 
Test# print dateRange(getFristDay(), getNowDay())# print weekRang(getFristDay(), getNowDay())# for weekFrist in weekRang(getFristDay(), getNowDay()):#     for weekSecond in weekRang(getFristDay(), getNowDay()):#         if (weekFrist[0:4] == weekSecond[0:4] and int(weekFrist[5:]) + 1 == int(weekSecond[5:])): #暫未考慮跨年#             if getWeekLastday(weekflag=weekFrist).strftime('%Y-%m-%d') < getNowDay() and getWeekLastday(weekflag=weekSecond).strftime('%Y-%m-%d') < getNowDay():#                 print weekFrist, weekSecond#                 print getWeekFristday(weekflag=weekFrist), getWeekLastday(weekflag=weekFrist), getWeekFristday(weekflag=weekSecond), getWeekLastday(weekflag=weekSecond)# 間隔日期時間的獲取# run_day='2017-02-03'# print getInterval3Last7(run_day)# for run_day in dateRange(getFristDay(), (datetime.datetime.strptime(getNowDay(), '%Y-%m-%d') - datetime.timedelta(days=11)).strftime('%Y-%m-%d')):#     print getInterval3Last7(run_day)#     run_day = getInterval3Last7(run_day)[0]#     Interval3Last7_start_day = getInterval3Last7(run_day)[1]#     Interval3Last7_end_day = getInterval3Last7(run_day)[2]#     print run_day,Interval3Last7_start_day,Interval3Last7_end_day3、是否留存用戶數據分群處理業務處理:/Users/nisj/PycharmProjects/BiDataProc/UserBehavior/Hive_userClustering_detail_proc.py
# -*- coding=utf-8 -*-
from DateProc import *
import os

warnings.filterwarnings("ignore")


def register_clustering_user_detail_Interval3Last7(run_day, Interval3Last7_start_day, Interval3Last7_end_day):
    """Rebuild one day of bi_register_clustering_user_detail through the Hive CLI.

    The partition pt_day=run_day is dropped, re-added and overwritten with one
    row per user found in bi_all_register_info for run_day.  remain_flag is 1
    when the user's uid also appears in bi_all_access_log with pt_day between
    Interval3Last7_start_day and Interval3Last7_end_day (inclusive), else 0.
    Access-log uids are passed through RadixChange(lower(uid),16,10) before the
    join (presumably hex to decimal; confirm against the UDF).

    Args:
        run_day: registration day, 'YYYY-MM-DD'.
        Interval3Last7_start_day: first day of the retention window.
        Interval3Last7_end_day: last day of the retention window.

    Returns:
        The exit status reported by os.system for the Hive command.
    """
    # The trailing backslashes join the literal into a single shell line.
    return os.system("""source /etc/profile; \
            /usr/lib/hive-current/bin/hive -e " \
            add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
            create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
            alter table bi_register_clustering_user_detail drop if exists partition (pt_day='%s'); \
            alter table bi_register_clustering_user_detail add partition (pt_day='%s'); \
            with tab_user_register_info as( \
            select uid,appsource,appkey,pt_day from bi_all_register_info \
            where pt_day = '%s' \
            ), \
            tab_access_log_Interval3Last7 as ( \
            select RadixChange(lower(uid),16,10) uid from bi_all_access_log \
            where pt_day >= '%s' and pt_day <= '%s' \
            group by RadixChange(lower(uid),16,10)) \
            insert overwrite table bi_register_clustering_user_detail partition(pt_day='%s') \
            select a1.appsource,a1.appkey,a1.uid,case when a2.uid is not null then 1 else 0 end remain_flag \
            from tab_user_register_info a1 \
            left join tab_access_log_Interval3Last7 a2 on a1.uid=a2.uid \
            ;" \
            """ % (run_day, run_day, run_day, Interval3Last7_start_day, Interval3Last7_end_day, run_day))


# Batch Test
# Parallel scheduling: /Users/nisj/PycharmProjects/BiDataProc/UserBehavior/BatchThread_clusteringUserDetail.py
# -*- coding=utf-8 -*-
# Driver script: fans the per-day retention job out over a thread pool.
import threadpool
import time
from Hive_userClustering_detail_proc import *

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
# Single-argument print() emits the same text under Python 2 and Python 3.
print("當前時間是: %s" % now_time)

# Retention calculation: skip 3 days after registration, then a 7-day window.
# Registration days are processed up to 11 days before today.
last_run_day = (datetime.datetime.strptime(getNowDay(), '%Y-%m-%d') - datetime.timedelta(days=11)).strftime('%Y-%m-%d')
batch_runDay_list = []
for run_day in dateRange(getFristDay(), last_run_day):
    # One call yields the registration day and both window bounds.
    run_day, Interval3Last7_start_day, Interval3Last7_end_day = getInterval3Last7(run_day)
    # threadpool expects (positional_args, keyword_args) per work request.
    batch_runDay_list.append(([run_day, Interval3Last7_start_day, Interval3Last7_end_day], None))

requests = []
request_register_remain_user = threadpool.makeRequests(register_clustering_user_detail_Interval3Last7, batch_runDay_list)
requests.extend(request_register_remain_user)
main_pool = threadpool.ThreadPool(6)
for req in requests:
    main_pool.putRequest(req)

if __name__ == '__main__':
    # Poll every 3 seconds until the pool reports that no results are pending.
    while True:
        try:
            time.sleep(3)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            break

    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("當前時間是: %s" % now_time)

# 4. Processing user behavior on the registration day - business logic: /Users/nisj/PycharmProjects/BiDataProc/UserBehavior/Hive_ClusteringBehavior_proc.py
# -*- coding=utf-8 -*-
import os
import re
from DateProc import *

warnings.filterwarnings("ignore")

# Shell prefix shared by every Hive invocation in this module.
_HIVE_CMD = "source /etc/profile; /usr/lib/hive-current/bin/hive -e"


def _run_hive(sql):
    """Run sql through the Hive CLI with os.system and return its exit status."""
    # The statements only use single quotes, so wrapping them in double quotes is safe.
    return os.system('%s "%s"' % (_HIVE_CMD, sql))


def _insert_behavior_flag(run_day, behavior_type, active_uid_query):
    """Append one behavior_type row per registered user of run_day.

    active_uid_query is a SELECT yielding a uid column and containing exactly
    one '%s' placeholder for the day; users of bi_register_clustering_user_detail
    that match it get behavior_flag 1, all others 0.
    """
    sql = (
        "insert into table bi_clustering_behavior_daily_state partition (pt_day='%(day)s') "
        "select a1.appsource,a1.appkey,a1.uid,a1.remain_flag,'%(behavior)s' behavior_type,"
        "case when a2.uid is not null then 1 else 0 end behavior_flag "
        "from (select * from bi_register_clustering_user_detail where pt_day='%(day)s') a1 "
        "left join (%(active)s) a2 on a1.uid=a2.uid;"
    ) % {"day": run_day, "behavior": behavior_type, "active": active_uid_query % run_day}
    return _run_hive(sql)


def behaviorDailyTab_partitionAdd(run_day):
    """Drop (if present) and re-add partition pt_day=run_day of the behavior table."""
    return _run_hive(
        "alter table bi_clustering_behavior_daily_state drop if exists partition (pt_day='%s'); "
        "alter table bi_clustering_behavior_daily_state add partition (pt_day='%s');"
        % (run_day, run_day))


def behavior_MessageSend(run_day):
    """Flag users having a row in data_chushou_message_send on run_day."""
    return _insert_behavior_flag(
        run_day, 'MessageSend',
        "select uid from data_chushou_message_send where pt_day='%s' group by uid")


def behavior_PhoneBinding(run_day):
    """Flag users having a state=0 row in data_chushou_user_phone_num on run_day."""
    return _insert_behavior_flag(
        run_day, 'PhoneBinding',
        "select uid from data_chushou_user_phone_num where state=0 and pt_day='%s' group by uid")


def behavior_RoomSubscribe(run_day):
    """Flag users having a state=0 row in data_chushou_room_subscriber on run_day."""
    return _insert_behavior_flag(
        run_day, 'RoomSubscribe',
        "select uid from data_chushou_room_subscriber where state=0 and pt_day='%s' group by uid")


def behavior_Pay(run_day):
    """Flag users having a state=0 row in data_chushou_pay_info on run_day."""
    return _insert_behavior_flag(
        run_day, 'Pay',
        "select uid from data_chushou_pay_info where state=0 and pt_day='%s' group by uid")


def behavior_Gifts(run_day):
    """Flag users appearing as fromuid in bi_all_jellyfish_log (logtype=1) on run_day."""
    return _insert_behavior_flag(
        run_day, 'Gifts',
        "select fromuid uid from bi_all_jellyfish_log where logtype=1 and pt_day='%s' group by fromuid")


def behavior_GameZoneAccess(run_day):
    """Flag users having a logtype=5 row in bi_all_jellyfish_log on run_day."""
    return _insert_behavior_flag(
        run_day, 'GameZoneAccess',
        "select uid from bi_all_jellyfish_log where logtype=5 and pt_day='%s' group by uid")


def behavior_VideoPlayAccess(run_day):
    """Flag users having a logtype=6 row in bi_all_jellyfish_log on run_day."""
    return _insert_behavior_flag(
        run_day, 'VideoPlayAccess',
        "select uid from bi_all_jellyfish_log where logtype=6 and pt_day='%s' group by uid")


def behavior_WatchLongerThan5Minutes(run_day):
    """Flag users with at least 5 rows in data_chushou_daily_heartbeat_stat_v2 on run_day."""
    return _insert_behavior_flag(
        run_day, 'WatchLongerThan5Minutes',
        "select uid from data_chushou_daily_heartbeat_stat_v2 where pt_day='%s' "
        "group by uid having count(*)>=5")


def behavior_LiveProgramStatus(run_day):
    """Flag users who are creator_uid of a room listed in the live history on run_day.

    Rooms come from data_chushou_live_history_status for run_day and are mapped
    to their creator through data_chushou_live_uid_status rows with state=0.
    """
    return _insert_behavior_flag(
        run_day, 'LiveProgramStatus',
        "select x2.uid from (select room_id from data_chushou_live_history_status "
        "where pt_day='%s' group by room_id) x1 "
        "inner join (select id room_id,creator_uid uid from data_chushou_live_uid_status "
        "where state=0) x2 on x1.room_id=x2.room_id group by x2.uid")


def behavior_batchCtl(run_day):
    """Rebuild the run_day partition, then insert every behavior type in order."""
    behaviorDailyTab_partitionAdd(run_day)  # reset the partition first
    behavior_MessageSend(run_day)  # then append the per-behavior rows
    behavior_PhoneBinding(run_day)
    behavior_RoomSubscribe(run_day)
    behavior_Pay(run_day)
    behavior_Gifts(run_day)
    behavior_GameZoneAccess(run_day)
    behavior_VideoPlayAccess(run_day)
    behavior_WatchLongerThan5Minutes(run_day)
    behavior_LiveProgramStatus(run_day)


def getBehaviorDayList():
    """Return every day between the min and max pt_day of the retention table.

    The bounds are read from Hive as one tab-separated line; when several
    lines come back the last one wins.

    Raises:
        ValueError: if Hive produced no output at all.
    """
    pipe = os.popen('%s "%s"' % (
        _HIVE_CMD,
        "select min(pt_day),max(pt_day) from bi_register_clustering_user_detail;"))
    try:
        output_lines = pipe.readlines()
    finally:
        pipe.close()
    rows = [line.rstrip('\n').split('\t') for line in output_lines]
    if not rows:
        raise ValueError("Hive returned no min/max pt_day for bi_register_clustering_user_detail")
    beginDate, endDate = rows[-1][0], rows[-1][1]
    return dateRange(beginDate, endDate)


# Batch Test
# run_day = '2017-01-02'
# behavior_batchCtl(run_day)
# for run_day in getBehaviorDayList():
#     print run_day
# Parallel scheduling: /Users/nisj/PycharmProjects/BiDataProc/UserBehavior/BatchThread_Behavior.py
# -*- coding=utf-8 -*-
# Driver script: fans the per-day behavior job out over a thread pool.
import threadpool
import time
from Hive_ClusteringBehavior_proc import *

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
# Single-argument print() emits the same text under Python 2 and Python 3.
print("當前時間是: %s" % now_time)

# Process the 2017 data: one work request per day returned by getBehaviorDayList().
batch_runDay_list = getBehaviorDayList()

requests = []
request_behavior_batchCtl = threadpool.makeRequests(behavior_batchCtl, batch_runDay_list)
requests.extend(request_behavior_batchCtl)
main_pool = threadpool.ThreadPool(10)
for req in requests:
    main_pool.putRequest(req)

if __name__ == '__main__':
    # Poll every 30 seconds until the pool reports that no results are pending.
    while True:
        try:
            time.sleep(30)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            break

    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("當前時間是: %s" % now_time)

# 5. Pivot the user-behavior result table (rows to columns) and insert it into MySQL: /Users/nisj/PycharmProjects/BiDataProc/UserBehavior/Hive_ClusteringBehavior_line2col.py
# -*- coding=utf-8 -*-import osimport reimport timefrom DateProc import *warnings.filterwarnings("ignore")def behaviorLine2Col_intoMysql():    os.system("""/usr/bin/mysql -hMysqlHost -PMysqlPort -uMysqlUser -pMysqlPass -e "use funnyai_data; /                truncate table bi_clustering_behavior_daily_state; /                " """ )    behaviorData = os.popen("""source /etc/profile; /            /usr/lib/hive-current/bin/hive -e " /            select appsource,appkey,uid,remain_flag,pt_day, /            max(case when behavior_type='MessageSend' then behavior_flag else null end) MessageSend, /            max(case when behavior_type='PhoneBinding' then behavior_flag else null end) PhoneBinding, /            max(case when behavior_type='RoomSubscribe' then behavior_flag else null end) RoomSubscribe, /            max(case when behavior_type='Gifts' then behavior_flag else null end) Gifts, /            max(case when behavior_type='Pay' then behavior_flag else null end) Pay, /            max(case when behavior_type='VideoPlayAccess' then behavior_flag else null end) VideoPlayAccess, /            max(case when behavior_type='GameZoneAccess' then behavior_flag else null end) GameZoneAccess, /            max(case when behavior_type='LiveProgramStatus' then behavior_flag else null end) LiveProgramStatus, /            max(case when behavior_type='WatchLongerThan5Minutes' then behavior_flag else null end) WatchLongerThan5Minutes /            from bi_clustering_behavior_daily_state /            where pt_day is not null /            group by appsource,appkey,uid,remain_flag,pt_day /            ;" /            """ ).readlines();    behavior_list = []    for bhL in behaviorData:        bh = re.split('/t', bhL.replace('/n', ''))        behavior_list.append(bh)    i = 0    insert_mysql_sql = """/usr/bin/mysql -hMysqlHost -PMysqlPort -uMysqlUser -pMysqlPass -e "use funnyai_data; /                insert into bi_clustering_behavior_daily_state(appsource, appkey, uid, remain_flag, 
pt_day, MessageSend, PhoneBinding, RoomSubscribe, Gifts, Pay, VideoPlayAccess, GameZoneAccess, LiveProgramStatus, WatchLongerThan5Minutes, etl_time) /                values """    for behavior_line in behavior_list:        appsource = behavior_line[0]        appkey = behavior_line[1]        uid = behavior_line[2]        remain_flag = behavior_line[3]        pt_day = behavior_line[4]        MessageSend = behavior_line[5]        PhoneBinding = behavior_line[6]        RoomSubscribe = behavior_line[7]        Gifts = behavior_line[8]        Pay = behavior_line[9]        VideoPlayAccess = behavior_line[10]        GameZoneAccess = behavior_line[11]        LiveProgramStatus = behavior_line[12]        WatchLongerThan5Minutes = behavior_line[13]        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())        i += 1        insert_mysql_sql = insert_mysql_sql + """('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s'),""" % (appsource, appkey, uid, remain_flag, pt_day, MessageSend, PhoneBinding, RoomSubscribe, Gifts, Pay, VideoPlayAccess, GameZoneAccess, LiveProgramStatus, WatchLongerThan5Minutes, etl_time)        if (i % 1000 == 0):            insert_mysql_sql = insert_mysql_sql.rstr注意Mysql數據批量插入的方法使用!
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 乃东县| 乳山市| 迁安市| 贵南县| 澄江县| 托克逊县| 金沙县| 长沙县| 油尖旺区| 津市市| 大埔区| 华阴市| 五台县| 丰台区| 青浦区| 新邵县| 宁强县| 宽城| 抚顺市| 达日县| 马鞍山市| 黄梅县| 体育| 余江县| 石河子市| 金阳县| 专栏| 涿鹿县| 奉贤区| 于都县| 井冈山市| 共和县| 遵义县| 许昌市| 开江县| 边坝县| 刚察县| 栾城县| 濮阳县| 绵竹市| 安泽县|