前言
最近有一項需求,要定時判斷任務執行條件是否滿足并觸發 Spark 任務,平時編寫 Spark 任務時都是封裝為一個 Jar 包,然后采用 Shell 腳本形式傳入所需參數執行,考慮到本次判斷條件邏輯復雜,只用 Shell 腳本完成不利于開發測試,所以調研使用了 Python 和 Java 分別調用 Spark 腳本的方法。
使用版本為 Python 3.6.4 及 JDK 8
Python
主要使用 subprocess 庫。Python 的 API 變動比較頻繁,在 3.5 之后新增了 run 方法,這大大降低了使用難度和遇見 Bug 的概率。
subprocess.run(["ls", "-l"])subprocess.run(["sh", "/path/to/your/script.sh", "arg1", "arg2"])
為什么說使用 run 方法可以降低遇見 Bug 的概率呢?
在沒有 run 方法之前,我們一般調用其他的高級方法,即 Older high-level API,比如 call,check_all,或者直接創建 Popen 對象。因為默認的輸出是 console,這時如果對 API 不熟悉或者沒有仔細看 doc,想要等待子進程運行完畢并獲取輸出,使用了 stdout = PIPE 再加上 wait 的話,當輸出內容很多時會導致 Buffer 寫滿,進程就一直等待讀取,形成死鎖。在一次將 Spark 的 log 輸出到 console 時,就遇到了這種奇怪的現象,下邊的腳本可以模擬:
# a.shfor i in {0..9999}; do echo '***************************************************'done p = subprocess.Popen(['sh', 'a.sh'], stdout=subprocess.PIPE)p.wait()
而 call 則在方法內部直接調用了 wait 產生相同的效果。
要避免死鎖,則必須在 wait 方法調用之前自行處理掉輸入輸出,或者使用推薦的 communicate 方法。 communicate 方法是在內部生成了讀取線程分別讀取 stdout stderr,從而避免了 Buffer 寫滿。而之前提到的新的 run 方法,就是在內部調用了 communicate。
stdout, stderr = process.communicate(input, timeout=timeout)
Java
說完了 Python,Java 就簡單多了。
Java 一般使用 Runtime.getRuntime().exec() 或者 ProcessBuilder 調用外部腳本:
Process p = Runtime.getRuntime().exec(new String[]{"ls", "-al"});Scanner sc = new Scanner(p.getInputStream());while (sc.hasNextLine()) { System.out.println(sc.nextLine());}// orProcess p = new ProcessBuilder("sh", "a.sh").start(); p.waitFor(); // dead lock 需要注意的是:這里 stream 的方向是相對于主程序的,所以 getInputStream() 就是子進程的輸出,而 getOutputStream() 是子進程的輸入。
基于同樣的 Buffer 原因,假如調用了 waitFor 方法等待子進程執行完畢而沒有及時處理輸出的話,就會造成死鎖。
由于 Java API 很少變動,所以沒有像 Python 那樣提供新的 run 方法,但是開源社區也給出了自己的方案,如commons exec,或 http://www.baeldung.com/run-shell-command-in-java,或 alvin alexander 給出的方案(雖然不完整)。            
新聞熱點
疑難解答