国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

跟天齊老師學Spark(8)--Spark RDD綜合練習

2019-11-11 04:25:08
字體:
來源:轉載
供稿:網友
綜合練習:通過基站信息計算家庭地址和工作地址需求:根據手機信號來計算其所在的位置手機一開機,就會和附近的基站建立連接,建立連接和斷開連接都會被記錄到服務器上的日志,所以即使沒手機有開啟網絡或者GPS,也可以定位手機所在的位置。基站都有一定的輻射范圍,并且根據信號強度有不同的信號級別,比如2G、3G和4G信號。我們雖然不知道手機用戶所在的具體位置,但是我們知道基站的位置,手機用戶一旦進入基站的輻射范圍,手機就會和基站之間建立連接。我們就可以計算用戶大致的位置。我們就可以根據這些位置信息做一些推薦廣告。比如附近的商家,你可能喜歡的商品或者服務。假如現在我們得到了一些位置數據,比如有手機號、建立連接的標記(比如1)、斷開連接的標記(比如0)、建立連接的時間戳、斷開連接的時間戳等字段。用斷開連接的時間減去建立連接的時間就是用戶在該基站下停留的時間。但是這種計算方式不是很好,因為在實際中用戶可能會停留好幾天的情況,或者說有建立連接但是沒有斷開連接的情況。所以這里面其實還會有一個會話的概念。其實基站不是一直保持連接的,它可能每隔一段時間他會自動斷開一次。比如每隔一天就斷開一次。每個基站都有一個基站ID,這是一個UUID。所以可能還會一個和基站相關的基站表,比如基站的id和經緯度等信息。我們應該將兩個表進行join才能得到用戶在基站下停留的時間等信息。這里我們先不考慮會話id的概念。我們這里只是求某個用戶白天和晚上等某個時間段停留時間的從高到低進行排序。比如早晨8點到晚上6點之間停留時間最長的我們可以認為是用戶的工作地點。相反,在晚上6點到第二天早上8點這段時間中停留時間最長的我們就認為是用戶的住所。知道了用戶的工作地點和住處,我們就可以做一些推薦了。但是還有一個問題是,一個用戶可能在一天中會經過幾十甚至上百個基站。我們怎么才能知道它在哪個基站下面停留的時間最長呢?而且還有一個問題,一個用戶在同一個基站下路過還不止一次。比如某用戶,在他公司和家之間有一個基站,他早上上班時路過某基站一次,中午回家又路過一次,晚上下班又路過一次。這樣,他就會在同一個基站中路過很多次。這樣在基站的服務器日志中就會記錄很多條數據。我們現在要計算用戶在哪個基站下停留時間最長,其實就是簡單的數據切分,然后進行求和,然后進行join。為了便于理解,我們模擬了一些簡單的日志數據,共4個字段:手機號碼,時間戳,基站id,連接類型(1表示建立連接,0表示斷開連接):基站A:
    18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1    18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1    18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0    18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0基站B:
    18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1    18688888888,20160327075100,9F36407EAD0629FC166F14DDE7970F68,1    18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0    18688888888,20160327081300,9F36407EAD0629FC166F14DDE7970F68,0    18688888888,20160327175000,9F36407EAD0629FC166F14DDE7970F68,1    18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1    18688888888,20160327220000,9F36407EAD0629FC166F14DDE7970F68,0    18611132889,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0基站C:
    18611132889,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1    18688888888,20160327081200,CC0710CC94ECC657A8561DE549D940E0,1    18688888888,20160327081900,CC0710CC94ECC657A8561DE549D940E0,0    18611132889,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0    18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1    18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,0    18611132889,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1    18611132889,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0    下面是基站表的數據,共4個字段,分別代表基站id和經緯度以及信號的輻射類型(比如2G信號、3G信號和4G信號):
    9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6    CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6    16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6    基于以上3個基站的日志數據,要求計算某個手機號碼在一天之內出現最多的兩個地點。因為一個手機號碼可能一天當中可能會經過很多的基站,可能他在家停留了10個小時,在公司停留了8個小時,還有可能坐車的時候路過了一些基站。思路:    求每個手機號碼在哪些基站下面停留的時間最長,在計算的時候,用"手機號碼+基站"才能定位在哪個基站下面停留的時間,    因為每個基站下面會有很多的用戶的日志數據。全國有很多的基站,每個電信分公司只負責計算自己的數據。數據存放在基站下面的機房的服務器上。一般是用過一些工具通過網絡把這些數據搜集過來。搜集過來的數據量可能會很大,這些數據一般會存放到分布式的文件系統中,比如存放到HDFS中。我們可能會基于一周或者一個月的數據量來計算,時間跨度越大,計算出來的結構就越精確。相關資料在"Spark資料"中。重要:寫好的spark程序,如果我不想每次都提交到spark集群上面運行,可以在程序中指定"在本地運行模式",也就是如下方式:new SparkConf().setAppName("xxxx").setMaster("local")它表示要在本地模擬一個程序來運行,它并沒有把它提交到集群。但是,這種方式在linux和Mac系統中沒有問題,而在Windows下會有異常。因為我們的spark程序要從hdfs中讀數據,所以它要用到hadoop的InputFormat來讀數據,如果要在windows下面進行本地調試,需要做一些事情。我們知道hadoop要壓縮和解壓縮,那么壓縮和解壓縮所需要的都是一些c或c++編寫的庫,而這些c或c++編寫的庫文件是不跨平臺的。所以要在windows下面調試就必須先把這些庫安裝好。我們建議在linux下面進行調試,如果你沒有Mac系統的話,可以在linux虛擬機上安裝一個idea開發工具?使用Linux的圖形界面來調試。下面是完整的代碼:
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object MobileLocation {  def main(args: Array[String]) {    val conf = new SparkConf().setAppName("MobileLocation").setMaster("local[2]")    val sc = new SparkContext(conf)    val lines: RDD[String] = sc.textFile(args(0))    //切分    //lines.map(_.split(",")).map(arr => (arr(0), arr(1).toLong, arr(2), args(3)))    val splited = lines.map(line => {      val fields = line.split(",")      val mobile = fields(0)      val lac = fields(2)      val tp = fields(3)      val time = if(tp == "1") -fields(1).toLong else fields(1).toLong      //拼接數據      ((mobile, lac), time)    })    //分組聚合    val reduced : RDD[((String, String), Long)] = splited.reduceByKey(_+_)    val lmt = reduced.map(x => {      //(基站,(手機號, 時間))      (x._1._2, (x._1._1, x._2))    })    //連接    val lacInfo: RDD[String] = sc.textFile(args(1))    //整理基站數據    val splitedLacInfo = lacInfo.map(line => {      val fields = line.split(",")      val id = fields(0)      val x = fields(1)      val y = fields(2)      (id, (x, y))    })    //連接jion    val joined: RDD[(String, ((String, Long), (String, String)))] = lmt.join(splitedLacInfo)    PRintln(joined.collect().toBuffer)    sc.stop()  }}
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 阳春市| 潜山县| 元谋县| 江川县| 盐城市| 西华县| 德兴市| 西畴县| 德格县| 镇赉县| 安丘市| 临沭县| 化州市| 尤溪县| 平陆县| 湘潭市| 蓝山县| 蒙城县| 新巴尔虎右旗| 清远市| 嘉义市| 民乐县| 西平县| 本溪市| 泉州市| 天峻县| 芦溪县| 汝州市| 土默特左旗| 巢湖市| 县级市| 郓城县| 衢州市| 榆中县| 芜湖市| 高密市| 西和县| 印江| 鄱阳县| 深泽县| 保山市|