FunDA最重要的設計目標之一就是能夠實現數據庫操作的并行運算。我們先重溫一下fs2是如何實現并行運算的。我們用interleave、merge、either這幾種方式來同時處理兩個Stream里的元素。interleave保留了固定的交叉排列順序,而merge和either則會產生不特定順序,這個現象可以從下面的例子里看到:
implicit val strategy = Strategy.fromFixedDaemonPool(4)implicit val scheduler = Scheduler.fromFixedDaemonPool(2) //當前元素跟蹤顯示def log[A](PRe: String): Pipe[Task,A,A] = _.evalMap { row => Task.delay {println(s"${pre}>${row}");row}} def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.evalMap { a => { val delay: Task[Int] = Task.delay {scala.util.Random.nextInt(max.toMillis.toInt)} delay.flatMap {d => Task.now(a).schedule(d.millis)} }} val s1: Stream[Task,Int] = Stream(1,2,3,4,5).through(randomDelay(100.millis)) val s2 = Stream(11,22,33,44,55,66).through(randomDelay(30.millis))val s3: Stream[Task,String] = Stream("a","b","c").through(randomDelay(200.millis))(s1 interleave s2).through(log("")).run.unsafeRun //> >1 //| >11 //| >2 //| >22 //| >3 //| >33 //| >4 //| >44 //| >5 //| >55(s1 merge s2).through(log("")).run.unsafeRun //> >11 //| >1 //| >22 //| >2 //| >33 //| >44 //| >3 //| >55 //| >4 //| >5 //| >66(s1 either s3).through(log("")).run.unsafeRun //> >Left(1) //| >Left(2) //| >Right(a) //| >Right(b) //| >Left(3) //| >Left(4) //| >Left(5) //| >Right(c)從上面的例子我們可以看到merge產生的不規則順序。fs2的nondeterministic算法可以保證兩個隊列元素處理順序的合理分配最大化。如果我們需要對兩個以上數據流進行并行處理的話,fs2提供了join(mergeN)函數:
def join[F[_],O](maxOpen: Int)(outer: Stream[F,Stream[F,O]])(implicit F: Async[F]): Stream[F,O] = {...}從這個函數的款式我們看到它的入參數outer是個Stream[F,Stream[F,O]]類型,是個內外兩層的流。現實場景如外層是多個數據庫連接(connections),內層是多個客戶端(clients)。在FunDA的功能描述里外層是多個數據源(sources),內層是多個讀取函數(reader),又或者外層是多個數據行(元素),內層是數據處理函數。我們先看看如何實現多個數據源的并行產生:val ss: Stream[Task,Stream[Task,Int]] = Stream(s1,s2,s1,s2) //> ss : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,Int]] = Segment(Emit(Chunk(Seg從ss的類型款式來看,我們可以直接用Stream構建器來生成這個Stream[Task,Stream[Task,A]]類型。在前面我們已經掌握了用Slick來產生Stream[Task,FDAROW]的方法,例如:val albumStream1 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()albumStream1是個Reactive-Stream數據源。這樣我們可以在FunDA里增加一個并行Source構建函數:def fda_par_load(sources: FDAPipeLine[FDAROW]*)(maxOpen: Int) = { concurrent.join(maxOpen)(Stream(sources: _*)) }maxOpen代表最多可以同時運行的運算數,最好取小于機器內核數的一個數。用這個函數來并行構建數據源:package com.bayakala.funda.fdapars.examplesimport slick.driver.H2Driver.api._import com.bayakala.funda.samples._import com.bayakala.funda.fdarows.FDAROWimport com.bayakala.funda.fdasources.FDADataStream._import scala.concurrent.duration._import com.bayakala.funda.fdapipes._import FDAValves._import com.bayakala.funda.fdapars.FDAPars._object Example1 extends App { val albums = SlickModels.albums val companies = SlickModels.companies //數據源query val albumsInfo = for { (a,c) <- albums join companies on (_.company === _.id) } yield (a.title,a.artist,a.year,c.name) //query結果強類型(用戶提供) case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW //強類型轉換函數(用戶提供) def toTypedRow(row: (String, String, Option[Int], String)): Album = Album(row._1, row._2, row._3.getOrElse(2000), row._4) val db = Database.forConfig("h2db") val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _) val albumStream1 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()() val albumStream2 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()() val albumStream3 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()() def printAlbums: FDATask[FDAROW] = row => { row match { case album: Album => println("____________________") println(s"品名:${album.title}") println(s"演唱:${album.artist}") println(s"年份:${album.year}") println(s"發行:${album.publisher}") fda_skip // fda_next(album) case r@_ => fda_next(r) } } fda_par_load(albumStream1,albumStream1,albumStream1)(3).appendTask(printAlbums).startRunstartRun后顯示結果:
*** (c.z.hikari.HikariDataSource) HikariCP pool h2db is starting.*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"____________________品名:Keyboard Cat's Greatest Hits演唱:Keyboard Cat年份:2016發行:Sony Music Inc____________________品名:Keyboard Cat's Greatest Hits演唱:Keyboard Cat年份:2016發行:Sony Music Inc____________________品名:Keyboard Cat's Greatest Hits演唱:Keyboard Cat年份:2016發行:Sony Music Inc____________________品名:Spice演唱:Spice Girls年份:2016發行:Columbia Records____________________品名:Spice演唱:Spice Girls年份:2016發行:Columbia Records____________________品名:Spice演唱:Spice Girls年份:2016發行:Columbia Records____________________品名:Whenever You Need Somebody演唱:Rick Astley年份:2016發行:Sony Music Inc____________________品名:Whenever You Need Somebody演唱:Rick Astley年份:2016發行:Sony Music Inc____________________品名:Whenever You Need Somebody演唱:Rick Astley年份:2016發行:Sony Music Inc____________________品名:The Triumph of Steel演唱:Manowar年份:2016發行:The K-Pops Singers____________________品名:The Triumph of Steel演唱:Manowar年份:2016發行:The K-Pops Singers____________________品名:The Triumph of Steel演唱:Manowar年份:2016發行:The K-Pops Singers____________________品名:Believe演唱:Justin Bieber年份:2016發行:Columbia Records____________________品名:Believe演唱:Justin Bieber年份:2016發行:Columbia Records____________________品名:Believe演唱:Justin Bieber年份:2016發行:Columbia RecordsProcess finished with exit code 0FunDA的另一個并行運算需求是并行對一長串數據元素進行一個函數的施用。先看看這個函數的款式:
//作業類型 type FDATask[ROW] = ROW => Option[List[ROW]]也就是我們前面使用過的,由用戶提供的那個作業函數類型。但是再看看fda_runPar函數,只能對下面這種類型進行并行運算:def fda_runPar(parTask: FDAParTask)(maxOpen: Int) = concurrent.join(maxOpen)(parTask).through(fda_afterPar) //并行作業類型 type FDAParTask = Stream[Task,Stream[Task,Option[List[FDAROW]]]]我們首先必須把Stream[Task,A]轉成Stream[Task,Stream[Task,A]]:implicit class toFDAOps(fs2Stream: FDAPipeLine[FDAROW]) { def appendTask(t: FDATask[FDAROW]) = fs2Stream.through(fda_execUserTask(t)) def startRun = fs2Stream.run.unsafeRun def startFuture = fs2Stream.run.unsafeRunAsyncFuture def toPar(st: FDATask[FDAROW]): Stream[Task, Stream[Task, Option[List[FDAROW]]]] = fs2Stream.map { row => Stream.eval(Task { st(row) }) } }我們可以用toPar來實現并行運算類型轉換。下面是一個調用例子://并行作業函數 def updateYear: FDATask[FDAROW] = row => { row match { case album: Album => val action = albums.filter{r => r.title === album.title}.map(_.year).update(Some(2016)) //把原數據和新構建的Action一起傳下去 fda_next(List(album,FDAActionRow(action))) case others@ _ => fda_next(others) } }//并行讀取 val s1 = fda_par_load(albumStream1,albumStream1,albumStream1)(3)//并行構建Action val s2 = fda_runPar(s1.toPar(updateYear))(3)s1是并行構建的數據源,s2是對數據源產生的元素進行并行的函數updateYear施用。我們同樣可以把產生的ActionRow用并行的方法來運算:val runner = FDAActionRunner(slick.driver.H2Driver) //并行運算函數 def runActions: FDATask[FDAROW] = row => { row match { case FDAActionRow(action) => runner.fda_execAction(action)(db) fda_skip case others@ _ => fda_next(others) } }//并行運算Action val s3 = fda_runPar(s2.toPar(runActions))(3)//開始運算 s3.appendTask(printAlbums).startRun從上面的例子里應該能夠體會到函數式編程的靈活性:在startRun之前,我們可以任意進行函數組合,而且靜態類型系統(static type system)會幫我們檢查各組件的類型是否匹配。下面是具體運算結果顯示:*** (c.z.hikari.HikariDataSource) HikariCP pool h2db is starting.*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"*** (s.jdbc.JdbcBackend.statement) Preparing statement: select x2."TITLE", x2."ARTIST", x2."YEAR", x3."NAME" from "ALBUMS" x2, "COMPANY" x3 where x2."COMPANY" = x3."ID"*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Keyboard Cat''s Greatest Hits'____________________品名:Keyboard Cat's Greatest Hits演唱:Keyboard Cat年份:1999發行:Sony Music Inc*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Keyboard Cat''s Greatest Hits'____________________品名:Keyboard Cat's Greatest Hits演唱:Keyboard Cat年份:1999發行:Sony Music Inc*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Keyboard Cat''s Greatest Hits'____________________品名:Keyboard Cat's Greatest Hits演唱:Keyboard Cat年份:1999發行:Sony Music Inc*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Spice'____________________品名:Spice演唱:Spice Girls年份:1999發行:Columbia Records*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Spice'____________________品名:Spice演唱:Spice Girls年份:1999發行:Columbia Records*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Spice'____________________品名:Spice演唱:Spice Girls年份:1999發行:Columbia Records*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Whenever You Need Somebody'____________________品名:Whenever You Need Somebody演唱:Rick Astley年份:1999發行:Sony Music Inc*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Whenever You Need Somebody'____________________品名:Whenever You Need Somebody演唱:Rick Astley年份:1999發行:Sony Music Inc*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Whenever You Need Somebody'____________________品名:Whenever You Need Somebody演唱:Rick Astley年份:1999發行:Sony Music Inc*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'The Triumph of Steel'____________________品名:The Triumph of Steel演唱:Manowar年份:1999發行:The K-Pops Singers*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'The Triumph of Steel'____________________品名:The Triumph of Steel演唱:Manowar年份:1999發行:The K-Pops Singers*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'The Triumph of Steel'____________________品名:The Triumph of Steel演唱:Manowar年份:1999發行:The K-Pops Singers*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Believe'____________________品名:Believe演唱:Justin Bieber年份:1999發行:Columbia Records*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Believe'____________________品名:Believe演唱:Justin Bieber年份:1999發行:Columbia Records*** (s.jdbc.JdbcBackend.statement) Preparing statement: update "ALBUMS" set "YEAR" = ? where "ALBUMS"."TITLE" = 'Believe'____________________品名:Believe演唱:Justin Bieber年份:1999發行:Columbia RecordsProcess finished with exit code 0注意:上面這個例子是存粹做出來作為函數調用示范的,不做任何邏輯和應用上的考慮。下面是本篇討論的示范源代碼:package com.bayakala.funda.fdapars.examplesimport slick.driver.H2Driver.api._import com.bayakala.funda.samples._import com.bayakala.funda.fdarows.FDARowTypes._import com.bayakala.funda.fdarows.FDAROWimport com.bayakala.funda.fdasources.FDADataStream._import scala.concurrent.duration._import com.bayakala.funda.fdapipes._import FDAValves._import com.bayakala.funda.fdapars.FDAPars._import com.bayakala.funda.fdarows.FDARowTypes.FDAActionRowobject Example1 extends App { val albums = SlickModels.albums val companies = SlickModels.companies //數據源query val albumsInfo = for { (a,c) <- albums join companies on (_.company === _.id) } yield (a.title,a.artist,a.year,c.name) //query結果強類型(用戶提供) case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW //轉換函數(用戶提供) def toTypedRow(row: (String, String, Option[Int], String)): Album = Album(row._1, row._2, row._3.getOrElse(2000), row._4) val db = Database.forConfig("h2db") val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _) val albumStream1 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()() val albumStream2 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()() val albumStream3 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()() def printAlbums: FDATask[FDAROW] = row => { row match { case album: Album => println("____________________") println(s"品名:${album.title}") println(s"演唱:${album.artist}") println(s"年份:${album.year}") println(s"發行:${album.publisher}") fda_skip // fda_next(album) case r@_ => fda_next(r) } } // fda_par_load(albumStream1,albumStream1,albumStream1)(3).appendTask(printAlbums).startRun //并行作業函數 def updateYear: FDATask[FDAROW] = row => { row match { case album: Album => val action = albums.filter{r => r.title === album.title}.map(_.year).update(Some(2016)) //把原數據和新構建的Action一起傳下去 fda_next(List(album,FDAActionRow(action))) case others@ _ => fda_next(others) } } val runner = FDAActionRunner(slick.driver.H2Driver) //并行運算函數 def runActions: FDATask[FDAROW] = row => { row match { case FDAActionRow(action) => runner.fda_execAction(action)(db) fda_skip case others@ _ => fda_next(others) } }//并行讀取 val s1 = fda_par_load(albumStream1,albumStream1,albumStream1)(3)//并行構建Action val s2 = fda_runPar(s1.toPar(updateYear))(3)//并行運算Action val s3 = fda_runPar(s2.toPar(runActions))(3)//開始運算 s3.appendTask(printAlbums).startRun}
新聞熱點
疑難解答