国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發(fā)設計 > 正文

Storm入門學習隨記

2019-11-14 10:33:39
字體:
供稿:網(wǎng)友

推薦慕課網(wǎng)視頻:http://www.imooc.com/video/10055

 

====Storm的起源。

Storm是開源的、分布式、流式計算系統(tǒng)

 

什么是分布式呢?就是將一個任務拆解給多個計算機去執(zhí)行,讓許多機器共通完成同一個任務,

把這個多機的細節(jié)給屏蔽,對外提供同一個接口、同一個服務,這樣的系統(tǒng)就是分布式系統(tǒng)。

 

在多年以前并沒有非常范用的分布式系統(tǒng),即使存在,也都是限定在指定的領域,

當然,也有人嘗試從中提取出共通的部分,發(fā)明一個通用的分布式系統(tǒng),但是都沒有很好的結(jié)果。

后來,Google發(fā)表了3篇論文,提出了分布式計算的模型,在分布式系統(tǒng)上有了一個質(zhì)的突破。

 

有一位大牛看了這3篇論文之后,深受啟發(fā),然后就發(fā)明了Hadoop系統(tǒng)。

 

然后,基于Hadoop的改造系統(tǒng)就如雨后春筍一般,接二連三的出現(xiàn)了。

以至于,Hadoop已經(jīng)不是一套軟件,而是一整套生態(tài)系統(tǒng)了。

于是,人們談到分布式,就必談Hadoop了。

 

但是,Hadoop并不是萬能的,它只能處理適合進行批量計算的需求。對于,非批量的計算就不能夠滿足要求了。

很多時候,我們只能先收集一段時間數(shù)據(jù),等數(shù)據(jù)收集到一定規(guī)模之后,我們才開始MaPReduce處理。

 

有這么一個故事:

-------------------

路人甲是在一家媒體公司A工作,他的主要工作內(nèi)容很簡單,就是在一些搜索引擎上做廣告,

眾所周知,搜索引擎上的廣告是競價排名的,誰土豪誰就排前面,出錢少的就只能排在后面。

公司A的競爭對手都比較土豪,所以呢,公司A的廣告就一直排在后面,也沒什么好的辦法。

后來,路人甲想出了一個餿主意,就是用程序不斷的去點擊競爭對手的廣告,讓對手的廣告費

很快的花費調(diào),這樣公司A就可以廉價的將廣告排在前面了。

搜索引擎公司試圖識別出這些惡意點擊屏來保護商家,將這些惡意點擊扣除的費用返還給商家。

一般來說呢,如果利用MapReduce,一般情況下,都需要收集一段時間數(shù)據(jù),然后根據(jù)這些

數(shù)據(jù)來算出哪些點擊是惡意的,本身收集數(shù)據(jù)就已經(jīng)很耗費時間了,再等計算完畢之后,

土豪商家的廣告費也基本上不剩什么了。

所以呢,我們希望在點擊發(fā)生的時候就算出來該點擊是否是作弊行為,及時不能馬上判斷出,

也應該盡早的計算出來。

-------------------

 為了解決上面這個故事的需求,分布式流式計算系統(tǒng)就產(chǎn)生了,比較知名的有:

?【Yahoo】S4

?【IBM】StreamBase

?【Amazon】Kinesis

?【Spark】Streaming

?【Google】Millwheel

?【Apache】Storm(目前業(yè)界中最知名、流程)

 

批量計算(以Hadoop為代表)與流式計算的區(qū)別有哪些呢?

 

###################

目前已經(jīng)有人在做一些前瞻性的項目,這些人試圖將批量計算和流式計算進行整合

試圖使用同一套API,即搞定流式計算,又搞定批量計算。

使一段代碼不要任何改動,就可以同時執(zhí)行在批量計算和流式計算兩種系統(tǒng)之上。

這種系統(tǒng)目前比較有名的有:

【Twitter】Summing Bird

【Google】CloudDataflow

兩個接口都已經(jīng)開源了。等以后有機會一定要提前接觸一下。

###################

 

====Storm組件

Storm采用的是主從結(jié)構,就是使用一臺主節(jié)點來管理整個集群的運行狀態(tài)。

這個主節(jié)點被稱為:Nimbus,從節(jié)點用來維護每臺機器的狀態(tài),被稱為:Supervisor

 

為什么采取主從結(jié)構呢?主從結(jié)構比較簡單,不需要進行主節(jié)點仲裁等工作。

 

從前面的結(jié)構圖中我們還可以看出,采取主從結(jié)構之后,Nimbus是一個單點,

但是,我們知道分布式領域里,大家都比較討厭自己的系統(tǒng)設計中存在單點,

因為單點如果發(fā)生故障,很有可能影響到整個集群的可用性。

 

所以,如果一個系統(tǒng)設計中如果存在單點,一般情況下這個單點的作業(yè)必然比較輕,

掛了之后,短時間之內(nèi)也不影響真?zhèn)€系統(tǒng)的運行,并且一般情況下都是沒有狀態(tài)的,

宕機之后至需要重啟就能夠恢復并正確處理。

 

Nimbus的角色是只負責一些管理性的工作,它并不關心Worker之間的數(shù)據(jù)是如何傳輸?shù)模?/p>

它的一些主要狀態(tài)都存在分布式協(xié)調(diào)服務(Zookeeper)中,內(nèi)存里面的東西都是可以丟失的,

如果它掛掉,只要沒有運算節(jié)點發(fā)生故障,那么整個作業(yè)還是能夠正常的進行數(shù)據(jù)處理的。

Nimbus重啟之后,就可以正確處理真?zhèn)€系統(tǒng)的事務了。

 

Supervisor的角色是聽Nimbus的話,來啟動并監(jiān)控真正進行計算的Worker的進程,

如果Worker有異常,那么久幫助Worker重啟一下,它也不負責數(shù)據(jù)計算和數(shù)據(jù)傳輸,

 

真正的數(shù)據(jù)計算和輸出,都是由Worker來進行。

Worker是運行在工作節(jié)點上面,被Supervisor守護進程創(chuàng)建的用來干活的JVM進程。

每個Worker對應于一個給定topology的全部執(zhí)行任務的一個子集。

反過來說,一個Worker里面不會運行屬于不同的topology的執(zhí)行任務。

 

====Storm UI

為了方便用戶管理集群,查看集群運行狀態(tài),提供了一個基于Web的UI來監(jiān)控整個Storm集群

它本身不是集群運行的必須部分,它的啟動停止都不影響Storm的正常運行。

 

====Storm作業(yè)提交運行流程

(1)用戶使用Storm的API來編寫Storm Topology。

(2)使用Storm的Client將Topology提交給Nimbus。

Nimbus收到之后,會將把這些Topology分配給足夠的Supervisor。

(3)Supervisor收到這些Topoligy之后,Nimbus會指派一些Task給這些Supervisor。

(4)Nimvus會指示Supervisor為這些Task生成一些Worker。

(5)Worker來執(zhí)行這些Task來完成計算任務。

  

====StormAPI基礎概念

Storm稱用戶的一個作業(yè)為Topology(拓撲)。

 

為什么叫拓撲呢?是因為Storm的一個拓撲主要包含了許多的數(shù)據(jù)節(jié)點,還有一些計算節(jié)點,

以及這些節(jié)點之間的邊,也就是說Storm的拓撲是由這些點和邊組成的一個有向無環(huán)圖。

這些點有兩種:數(shù)據(jù)源節(jié)點(Spout)、普通的計算節(jié)點(Bolt),

點之間的邊稱為數(shù)據(jù)流(Stream),數(shù)據(jù)流中的每一條記錄稱為Tuple。

 

如下圖中,每一個“水龍頭”表示一個Spout,它會發(fā)送一些Tuple給下游的Bolt,

這些Bolt經(jīng)過處理周,再發(fā)送一個Tuple給下一個Bolt,

最后,在這些Bolt里面是可以執(zhí)行一些寫數(shù)據(jù)到外部存儲(如數(shù)據(jù)庫)等操作的。

在圖中這個Topology里面我們看到了兩個Spout和5個Bolt,

在實際運行的時候,每個Spout節(jié)點都可能有很多個實例,每個Bolt也有可能有很多個實例。

就像MapReduce一樣,一個Map節(jié)點并不代表只有一個并發(fā),而有可能很多個Map實例在跑。

 

這些Spout和Bolt的這些邊里面,用戶可以設置多種的Grouping的方式。

有些類似SQL中的Group By。用來制定這些計算是怎么分組的。

 

*Fields Grouping:保證同樣的字段移動落到同一個Bolt里。

 

--以WordCount為例,MapReduce和Storm的工作流程對比:

(1)MapReduce

(2)Storm

 

====各個組件的一些說明

--Topologies

為了在storm上面做實時計算, 你要去建立一些topologies。一個topology就是一個計算節(jié)點所組成的圖。Topology里面的每個處理節(jié)點都包含處理邏輯, 而節(jié)點之間的連接則表示數(shù)據(jù)流動的方向。運行一個Topology是很簡單的。首先,把你所有的代碼以及所依賴的jar打進一個jar包。然后運行類似下面的這個命令。

strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2

這個命令會運行主類: backtype.strom.MyTopology,參數(shù)是arg1, arg2。這個類的main函數(shù)定義這個topology并且把它提交給Nimbus。storm jar負責連接到nimbus并且上傳jar文件。

 

--Stream

Stream是storm里面的關鍵抽象。一個stream是一個沒有邊界的tuple序列。storm提供一些原語來分布式地、可靠地把一個stream傳輸進一個新的stream。比如: 你可以把一個tweets流傳輸?shù)綗衢T話題的流。storm提供的最基本的處理stream的原語是spout和bolt。你可以實現(xiàn)Spout和Bolt對應的接口以處理你的應用的邏輯。spout是流的源頭。比如一個spout可能從Kestrel隊列里面讀取消息并且把這些消息發(fā)射成一個流。又比如一個spout可以調(diào)用twitter的一個api并且把返回的tweets發(fā)射成一個流。通常Spout會從外部數(shù)據(jù)源(隊列、數(shù)據(jù)庫等)讀取數(shù)據(jù),然后封裝成Tuple形式,之后發(fā)送到Stream中。Spout是一個主動的角色,在接口內(nèi)部有個nextTuple函數(shù),Storm框架會不停的調(diào)用該函數(shù)。

 

bolt可以接收任意多個輸入stream, 作一些處理, 有些bolt可能還會發(fā)射一些新的stream。一些復雜的流轉(zhuǎn)換, 比如從一些tweet里面計算出熱門話題, 需要多個步驟, 從而也就需要多個bolt。Bolt可以做任何事情: 運行函數(shù),過濾tuple,做一些聚合,做一些合并以及訪問數(shù)據(jù)庫等等。Bolt處理輸入的Stream,并產(chǎn)生新的輸出Stream。Bolt可以執(zhí)行過濾、函數(shù)操作、Join、操作數(shù)據(jù)庫等任何操作。Bolt是一個被動的角色,其接口中有一個execute(Tuple input)方法,在接收到消息之后會調(diào)用此函數(shù),用戶可以在此方法中執(zhí)行自己的處理邏輯。

 

spout和bolt所組成一個網(wǎng)絡會被打包成topology, topology是storm里面最高一級的抽象(類似 Job), 你可以把topology提交給storm的集群來運行。topology的結(jié)構在Topology那一段已經(jīng)說過了,這里就不再贅述了。

 

topology里面的每一個節(jié)點都是并行運行的。 在你的topology里面, 你可以指定每個節(jié)點的并行度, storm則會在集群里面分配那么多線程來同時計算。一個topology會一直運行直到你顯式停止它。storm自動重新分配一些運行失敗的任務, 并且storm保證你不會有數(shù)據(jù)丟失, 即使在一些機器意外停機并且消息被丟掉的情況下。

 

--數(shù)據(jù)模型(Data Model)

storm使用tuple來作為它的數(shù)據(jù)模型。每個tuple是一堆值,每個值有一個名字,并且每個值可以是任何類型,在我的理解里面一個tuple可以看作一個沒有方法的java對象(或者是一個表的字段)。總體來看,storm支持所有的基本類型、字符串以及字節(jié)數(shù)組作為tuple的值類型。你也可以使用你自己定義的類型來作為值類型, 只要你實現(xiàn)對應的序列化器(serializer)。一個Tuple代表數(shù)據(jù)流中的一個基本的處理單元,例如一條cookie日志,它可以包含多個Field,每個Field表示一個屬性。

 

Tuple本來應該是一個Key-Value的Map,由于各個組件間傳遞的tuple的字段名稱已經(jīng)事先定義好了,所以Tuple只需要按序填入各個Value,所以就是一個Value List。一個沒有邊界的、源源不斷的、連續(xù)的Tuple序列就組成了Stream。

 

topology里面的每個節(jié)點必須定義它要發(fā)射的tuple的每個字段。比如下面這個bolt定義它所發(fā)射的tuple包含兩個字段,類型分別是: double和triple。public class DoubleAndTripleBoltimplementsIRichBolt {    private OutputCollectorBase _collector;    @Override    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {        _collector = collector;    }    @Override    public void execute(Tuple input) {        intval = input.getInteger(0);        _collector.emit(input,newValues(val*2, val*3));        _collector.ack(input);    }    @Override    public void cleanup() {    }    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(newFields("double","triple"));    }}

 

參考博客:http://blog.itpub.net/29754888/viewspace-1260026/

 

====StormAPI使用

我們來看看WorldCount的example代碼。

 

====Storm的并發(fā)機制

Task數(shù)量:表示每個Spout或Bolt邏輯上有多少個并發(fā)。它影響輸出結(jié)果。

Worker數(shù)量:代表總共有幾個JVM進程去執(zhí)行我們的作業(yè)。

Executor數(shù)量:表示每個Spout或Bolt啟動幾個線程來運行。

 

下面代碼中的數(shù)字表示Executor數(shù)量,它不影響結(jié)果,影響性能。

 

Worker的數(shù)量在Config中設置,下圖代碼中的部分表示W(wǎng)orker數(shù)量。

*本地模式中,Worker數(shù)不生效,只會啟動一個JVM進行來執(zhí)行作業(yè)。

*只有在集群模式設置Worker才有效。而且集群模式的時候一定要設置才能體現(xiàn)集群的價值。

 

====Storm數(shù)據(jù)可靠性

分布式系統(tǒng)都管理很多臺機器,需要保證任意的Worker掛掉之后,我們的系統(tǒng)仍然能正確的處理,那么

Storm如何保證這些數(shù)據(jù)正確的恢復?

Storm如何保證這些數(shù)據(jù)不被重復計算?

(1)Spout容錯API:NextTuple中,emit時,指定MsgID。

(2)Bolt容錯API:①emit時,錨定輸入Tuple。②Act輸入Tuple。

 

====Storm集群搭建

(1)安裝zookeeper集群

配置方法省略。

 

(2)下載安裝Storm

官網(wǎng)上下載Storm:http://storm.apache.org

上傳至linux并解壓縮。這里將Storm解壓縮到/opt/apache-storm-0.10.0路徑下了。

 

(3)修改Storm配置文件

配置文件路徑:/opt/apache-storm-0.9.5/conf/storm.yaml

配置內(nèi)容如下:

----------------

storm.zookeeper.servers:- "192.168.93.128"- "192.168.93.129"- "192.169.93.130"nimbus.host: "192.168.93.128"storm.local.dir: "/opt/apache-storm-0.9.5/status"supervisor.slots.ports:- 6700- 6701- 6702- 6703

----------------

 

置之后的文件如下如所示:

 

--Storm配置項詳細介紹

?storm.zookeeper.servers:

ZooKeeper服務器列表

?storm.zookeeper.port:

ZooKeeper連接端口

?storm.local.dir:

storm使用的本地文件系統(tǒng)目錄(必須存在并且storm進程可讀寫)

?storm.cluster.mode:

Storm集群運行模式([distributed|local])

?storm.local.mode.zmq:

Local模式下是否使用ZeroMQ作消息系統(tǒng),如果設置為false則使用java消息系統(tǒng)。默認為false

?storm.zookeeper.root:

ZooKeeper中Storm的根目錄位置

?storm.zookeeper.session.timeout:

客戶端連接ZooKeeper超時時間

?storm.id:

運行中拓撲的id,由storm name和一個唯一隨機數(shù)組成。

?nimbus.host:

nimbus服務器地址

?nimbus.thrift.port:nimbus的thrift監(jiān)聽端口

?nimbus.childopts:

通過storm-deploy項目部署時指定給nimbus進程的jvm選項

?nimbus.task.timeout.secs:

心跳超時時間,超時后nimbus會認為task死掉并重分配給另一個地址

?nimbus.monitor.freq.secs:

nimbus檢查心跳和重分配任務的時間間隔。注意如果是機器宕掉nimbus會立即接管并處理

?nimbus.supervisor.timeout.secs:

supervisor的心跳超時時間,一旦超過nimbus會認為該supervisor已死并停止為它分發(fā)新任務

?nimbus.task.launch.secs:

task啟動時的一個特殊超時設置。在啟動后第一次心跳前會使用該值來臨時替代nimbus.task.timeout.secs

?nimbus.reassign:

當發(fā)現(xiàn)task失敗時nimbus是否重新分配執(zhí)行。默認為真,不建議修改

?nimbus.file.copy.expiration.secs:

nimbus判斷上傳/下載鏈接的超時時間,當空閑時間超過該設定時nimbus會認為鏈接死掉并主動斷開

?ui.port:

Storm UI的服務端口

?drpc.servers:

DRPC服務器列表,以便DRPCSpout知道和誰通訊

?drpc.port:

Storm DRPC的服務端口

?supervisor.slots.ports:

supervisor上能夠運行workers的端口列表。每個worker占用一個端口,且每個端口只運行一個worker。

通過這項配置可以調(diào)整每臺機器上運行的worker數(shù)。(調(diào)整slot數(shù)/每機)

?supervisor.childopts:

在storm-deploy項目中使用,用來配置supervisor守護進程的jvm選項

?supervisor.worker.timeout.secs:

supervisor中的worker心跳超時時間,一旦超時supervisor會嘗試重啟worker進程.

?supervisor.worker.start.timeout.secs:

supervisor初始啟動時,worker的心跳超時時間,當超過該時間supervisor會嘗試重啟worker。

因為JVM初始啟動和配置會帶來的額外消耗,從而使得第一次心跳會超過supervisor.worker.timeout.secs的設定

?supervisor.enable:

supervisor是否應當運行分配給他的workers。默認為true,該選項用來進行Storm的單元測試,一般不應修改.

?supervisor.heartbeat.frequency.secs:

supervisor心跳發(fā)送頻率(多久發(fā)送一次)

?supervisor.monitor.frequency.secs:

supervisor檢查worker心跳的頻率

?worker.childopts:

supervisor啟動worker時使用的jvm選項。所有的”%ID%”字串會被替換為對應worker的標識符

?worker.heartbeat.frequency.secs:

worker的心跳發(fā)送時間間隔

?task.heartbeat.frequency.secs:

task匯報狀態(tài)心跳時間間隔

?task.refresh.poll.secs:

task與其他tasks之間鏈接同步的頻率。(如果task被重分配,其他tasks向它發(fā)送消息需要刷新連接)

。一般來講,重分配發(fā)生時其他tasks會理解得到通知。該配置僅僅為了防止未通知的情況。

?topology.debug:

如果設置成true,Storm將記錄發(fā)射的每條信息。

?topology.optimize:

master是否在合適時機通過在單個線程內(nèi)運行多個task以達到優(yōu)化topologies的目的

?topology.workers:

執(zhí)行該topology集群中應當啟動的進程數(shù)量。

每個進程內(nèi)部將以線程方式執(zhí)行一定數(shù)目的tasks。topology的組件結(jié)合該參數(shù)和并行度提示來優(yōu)化性能

?topology.ackers:

topology中啟動的acker任務數(shù)。

Acker保存由spout發(fā)送的tuples的記錄,并探測tuple何時被完全處理。

當Acker探測到tuple被處理完畢時會向spout發(fā)送確認信息。通常應當根據(jù)topology的吞吐量來確定acker的數(shù)目,但一般不需要太多。

當設置為0時,相當于禁用了消息可靠性。storm會在spout發(fā)送tuples后立即進行確認

?topology.message.timeout.secs:

topology中spout發(fā)送消息的最大處理超時時間。

如果一條消息在該時間窗口內(nèi)未被成功ack,Storm會告知spout這條消息失敗。而部分spout實現(xiàn)了失敗消息重播功能。

?topology.kryo.register:

注冊到Kryo(Storm底層的序列化框架)的序列化方案列表。序列化方案可以是一個類名,或者是com.esotericsoftware.kryo.Serializer的實現(xiàn)

?topology.skip.missing.kryo.registrations:

Storm是否應該跳過它不能識別的kryo序列化方案。如果設置為否task可能會裝載失敗或者在運行時拋出錯誤

?topology.max.task.parallelism:

在一個topology中能夠允許的最大組件并行度。該項配置主要用在本地模式中測試線程數(shù)限制.

?topology.max.spout.pending:

一個spout task中處于pending狀態(tài)的最大的tuples數(shù)量。該配置應用于單個task,而不是整個spouts或topology

?topology.state.synchronization.timeout.secs:

組件同步狀態(tài)源的最大超時時間(保留選項,暫未使用)

?topology.stats.sample.rate:

用來產(chǎn)生task統(tǒng)計信息的tuples抽樣百分比

?topology.fall.back.on.java.serialization:

topology中是否使用java的序列化方案

?zmq.threads:

每個worker進程內(nèi)zeromq通訊用到的線程數(shù)

?zmq.linger.millis:

當連接關閉時,鏈接嘗試重新發(fā)送消息到目標主機的持續(xù)時長。這是一個不常用的高級選項,基本上可以忽略.

?java.library.path:

JVM啟動(如Nimbus,Supervisor和workers)時的java.library.path設置。該選項告訴JVM在哪些路徑下定位本地庫

 

(4)配置Storm環(huán)境變量

環(huán)境變量位置:/etc/profile

配置內(nèi)容之后如下圖所示:

注意:環(huán)境變量修改只有,一定要使用Source命令來使之生效。

 

(5)啟動Storm

--啟動Storm UI

命令:storm ui >/dev/null 2>&1 &

我們可以它啟動的時候相關的輸出指向到/def/null,并且把錯誤也重新定向到正常輸出。

 

--啟動主節(jié)點(Nimbus節(jié)點)

命令:storm nimbus >/dev/null 2>&1 &

在第1臺Linux虛擬機上執(zhí)行。正常啟動時的jps結(jié)果如下圖所示:

 

--啟動工作節(jié)點(Supervisor節(jié)點)

命令:storm supervisor >/dev/null 2>&1 &

在第2、3臺Linux虛擬機上執(zhí)行。正常啟動時的jps結(jié)果如下圖所示:

 

(6)啟動StormUI監(jiān)控頁面:

Storm正常啟動之后,應該可以打開StormUI畫面。在瀏覽器中輸入地址和端口即可

正確啟動時應該如下圖所示:

--Mainpage:

main頁面主要包括3個部分

 

 【Cluster Summary】

?Nimbus uptime: nimbus的啟動時間

?Supervisors: storm集群中supervisor的數(shù)目

?used slots: 使用了的slots數(shù)

?free slots: 剩余的slots數(shù)

?total slots: 總的slots數(shù)

?Running tasks: 運行的任務數(shù)

 

【topology summary】

?Name: topology name

?id: topology id (由storm生成)

?status: topology的狀態(tài),包括(ACTIVE, INACTIVE, KILLED, REBALANCING)

?uptime: topology運行的時間

?num workers: 運行的workers數(shù)

?num tasks: 運行的task數(shù)

 

【supervisor summary】

?host: supervisor(主機)的主機名

?uptime: supervisor啟動的時間

?slots: supervisor的端口數(shù)

?used slots: 使用的端口數(shù)

 

--Topology page

topology頁面主要包括4個部分

【topology summary】

(同主頁)

 

【topology stats】

?window: 時間窗口,顯示10m、3h、1d和all time的運行狀況

?emitted: emitted tuple數(shù)

?transferred: transferred tuple數(shù), 說下與emitted的區(qū)別:如果一個task,emitted一個tuple到2個task中,則transferred tuple數(shù)是emitted tuple數(shù)的兩倍

?complete latency: spout emitting 一個tuple到spout ack這個tuple的平均時間

?acked: ack tuple數(shù)

?failed: 失敗的tuple數(shù)

 

【spouts】

?id: spout id

?parallelism: 任務數(shù)

?last error: 最近的錯誤數(shù),只顯示最近的前200個錯誤

?emitted、transferred、complete latency、acked和failed上面已解釋

 

【bolts】

?process latency: bolt收到一個tuple到bolt ack這個tuple的平均時間

其他參數(shù)都解釋過了

還有componentpage和taskpage,參數(shù)的解釋同上。

taskpage中的Component指的是spoutid或者boltid,time指的是錯誤發(fā)生的時間,error是指錯誤的具體內(nèi)容。

 

====Storm常用命令

【提交Topologies】

命令格式:storm jar 【jar路徑】 【拓撲包名.拓撲類名】 【拓撲名稱】

樣例:storm jar /storm-starter.jar storm.starter.WordCountTopology wordcountTop

#提交storm-starter.jar到遠程集群,并啟動wordcountTop拓撲。

 

【停止Topologies】

命令格式:storm kill 【拓撲名稱】

樣例:storm kill wordcountTop

#殺掉wordcountTop拓撲。

 

【啟動nimbus后臺程序】

命令格式:storm nimbus

 

【啟動supervisor后臺程序】

命令格式:storm supervisor

 

【啟動drpc服務】

命令格式:storm drpc

 

【啟動ui服務】

命令格式:storm ui

 

【啟動REPL】

REPL — read-evaluate-print-loop。

雖然clojure可以作為一種腳本語言內(nèi)嵌在java里面,但是它的首選編程方式是使用REPL,這是一個簡單的命令行接口,

使用它你可以輸入你的命令,執(zhí)行,然后查看結(jié)果, 你可以以下面這個命令來啟動REPL:

命令格式:storm repl

 

【打印本地配置】

命令格式:storm localconfvalue [配置參數(shù)關鍵字]

舉例:storm localconfvalue storm.zookeeper.servers

#根據(jù)指定參數(shù)打印本地配置的值。

 

【打印遠程配置】

命令格式:storm remoteconfvalue [配置參數(shù)關鍵字]

舉例:storm remoteconfvalue storm.zookeeper.servers

#根據(jù)指定參數(shù)打印遠程配置的值。

 

【執(zhí)行Shell腳本】

命令格式:storm shell resourcesdir command args

 

【打印CLASSPATH】

命令格式:storm classpath

 

====Storm調(diào)優(yōu):

--調(diào)優(yōu)對象

當一個topology在storm cluster中運行時,它的并發(fā)主要跟3個邏輯對象相關:worker => executor =>task。(=>代表1對N)

(1)Worker

Worker是運行在工作節(jié)點上面,被Supervisor守護進程創(chuàng)建的用來干活的JVM進程。

每個Worker對應于一個給定topology的全部執(zhí)行任務的一個子集。

反過來說,一個Worker里面不會運行屬于不同的topology的執(zhí)行任務。

它可以通過[storm rebalance]命令任意調(diào)整。

(2)Executor

可以理解成一個Worker進程中的工作線程。

一個Executor中只能運行隸屬于同一個component(spout/bolt)的task。

一個Worker進程中可以有一個或多個Executor線程。在默認情況下,一個Executor運行一個task。

它可以通過[storm rebalance]命令任意調(diào)整。

(3)Task

Task則是spout和bolt中具體要干的活了。一個Executor可以負責1個或多個task。

每個component(spout/bolt)的并發(fā)度就是這個component對應的task數(shù)量。

同時,task也是各個節(jié)點之間進行grouping(partition)的單位。無法在運行時調(diào)整。

 

--設置方法:

conf.setNumWorkers(workers);                                        //設置worker數(shù)量

uilder.setBolt("2", new WordSpliter(),4)                             //設置Executor并發(fā)數(shù)量

builder.setBolt("2", new WordSpliter(),4).setNumTasks(1); //設置每個線程處理的Task數(shù)量

--任務分配:

任務分配是有下面兩種情況:

①、task數(shù)目比worker多:

例如task是[1 2 3 4],可用的slot(所謂slot就是可用的worker)只有[host1:port1,host2:port1],那么最終是這樣分配1:[host1:port1]

2:[host2:port1]

3:[host1:port1]

4:[host2:port1]

②、task數(shù)目比worker少:

例如task是[1 2],而worker有[host1:port1,host1:port2,host2:port1,host2:port2],

那么首先會將woker排序,將不同host間隔排列,保證task不會全部分配到同一個機器上,也就是將worker排列成

[host1:port1,host2:port1,host1:port2,host2:port2]

然后分配任務為:

1:[host1:port1]

2:[host2:port1]

 

--簡單舉例:

通過Config.setNumWorkers(int))來指定一個storm集群中執(zhí)行topolgy的進程數(shù)量,所有的線程將在這些指定的worker進程中運行。

比如說一個topology中要啟動300個線程來運行spout/bolt,而指定的worker進程數(shù)量是60個。

那么storm將會給每個worker分配5個線程來跑spout/bolt。

如果要對一個topology進行調(diào)優(yōu),可以調(diào)整worker數(shù)量和spout/bolt的parallelism(并發(fā)度,即executor)數(shù)量。

(調(diào)整參數(shù)之后要記得重新部署topology,后續(xù)會為該操作提供一個swapping的功能來減小重新部署的時間)。

 

例如:builder.setBolt("cpp", new CppBolt(), 3).setNumTasks(5).noneGrouping(pre_name); 會創(chuàng)建3個線程,但有內(nèi)存中會5個CppBolt對象,3個線程調(diào)度5個對象。

 

--網(wǎng)上搜羅的一些經(jīng)驗:①、對于worker和task之間的比例,網(wǎng)上也給出了參考,。即1個worker包含10~15個左右。當然這個參考,實際情況還是要根據(jù)配置和測試情況。

②、executor數(shù)最大不能超過該bolt的task數(shù)。

 

--Strom集群命令

12345678910111213141516171819202122232425[root@h2master bin]# storm Commands:         activate         classpath         deactivate         dev-zookeeper         drpc         help 命令幫助         jar   執(zhí)行上傳的jar包         kill   殺死正在執(zhí)行的topology 后面跟 topology的名稱         list   查看運行的所有topology運行情況         localconfvalue         logviewer   啟動topology日志         nimbus      啟動nimbus         rebalance   shell方式下修改topology運行參數(shù)比如worker個數(shù) task個數(shù)等         remoteconfvalue         repl         shell         supervisor  啟動supervisor         ui              啟動topology ui界面         version       Help:          help          help <command>

 

[root@h2master bin]# storm  Commands:          activate          classpath          deactivate          dev-zookeeper          drpc          help 命令幫助          jar   執(zhí)行上傳的jar包          kill   殺死正在執(zhí)行的topology 后面跟 topology的名稱          list   查看運行的所有topology運行情況          localconfvalue          logviewer   啟動topology日志          nimbus      啟動nimbus          rebalance   shell方式下修改topology運行參數(shù)比如worker個數(shù) task個數(shù)等          remoteconfvalue          repl          shell          supervisor  啟動supervisor          ui              啟動topology ui界面          version       Help:           help           help <command>

 

 

--END--


發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 昌江| 房产| 彰武县| 读书| 巴彦县| 望江县| 嘉义县| 柳江县| 安义县| 锡林郭勒盟| 沙雅县| 林芝县| 康保县| 石嘴山市| 太仓市| 永顺县| 民县| 门源| 墨脱县| 祥云县| 陵水| 海原县| 武山县| 武乡县| 邵武市| 满城县| 新疆| 绥滨县| 苏尼特左旗| 合阳县| 江门市| 太仓市| 常德市| 汾阳市| 营山县| 新疆| 鹰潭市| 奉新县| 余干县| 平乡县| 四川省|