RxSwift 入門 その7
RxSwift 入門 その6 - タコさんブログ の続き。 今回は、RxSwiftプレイグラウンドの Mathematical and Aggregate Operators 、Connectable Observable Operators の項。プレイグラウンドの内容としてはこれで最後。
Mathematical and Aggregate Operators
この項では、Observableによって送信される全てのアイテムのシーケンス(ストリーム)に作用するオペレータに関して説明してある。
以下、プレイグラウンドに説明してあるMathematical and Aggregate Operatorsに関するオペレータ。
- concat
- reduce
concat
2つ、または、それ以上のObservablesから送信されるアイテム(イベント)を割り込みすることなく送信する。
let s1 = [0, 1, 2].toObservable() let s2 = [5, 6, 7, 8].toObservable() _ = s1.concat(s2) .subscribe { print($0) }
この出力は、
Next(0) Next(1) Next(2) Next(5) Next(6) Next(7) Next(8) Completed
このマーブルダイアグラムは以下のように表せられる。
s1 -0-1-2| s2 -5-6-7-8| ↓ concat s2 r -0-1-2-5-6-7-8|
reduce
Observableが送信する各アイテム(イベント)に関数を順次適用し、結果を送信する。reduceは、完了するまでシーケンス(ストリーム)の各要素に関数を実行し、蓄積された値をメッセージと共に送信する。これはシーケンス上のSwiftのreduce関数と同じような働きをする。
let stream = (0..<10).toObservable() _ = stream .reduce(0, accumulator: +) // 初期値 0 .subscribe { print($0) }
この出力は、
Next(45) Completed
Connectable Observable Operators
この項では、Connectable Observable に関して説明してある。Connectable Observableは、サブスクライブした時ではなく、connect()メソッドが呼ばれた時にアイテムの送信が開始されること以外は、通常のObservableに似ている。このようにして、Observableがアイテムの送信を開始する前に、Observableをサブスクライブするために、すべての期待するサブスクライバーを待つことができる。
以下、プレイグラウンドに説明してあるConnectable Observable Operatorsに関するオペレータ。
- multicast
- replay
- publish
この項を実行するにあたっての注意
この項のサンプルは、//sampleWithoutConnectableOperators() のようにコメントアウトされているので、実行を確認する場合はコメントを外す必要がある
intervalオペレータを使用して、整数のシーケンス(ストリーム)を生成している。intervalオペレータは与えられた時間間隔ごとに無限に増加する整数のアイテムを送信するシーケンスを生成する
intervalオペレータを使用しているため、 XCPlaygroundPage.currentPage.needsIndefiniteExecution = true とする必要がある。また、遅延させる必要があるため、サポート関数として以下のdelay関数を利用している
// delay秒後に、メインスレッドでclosureを実行する public func delay(delay:Double, closure:()->()) { dispatch_after( dispatch_time(DISPATCH_TIME_NOW,Int64(delay * Double(NSEC_PER_SEC))), dispatch_get_main_queue(), closure) }
Without Connect
connectを使用しない場合の、挙動確認。
// 1秒間隔で整数を送信するシーケンスを生成する let s = Observable<Int>.interval(1, scheduler: MainScheduler.instance) // 1つ目のサブスクライブ _ = s .subscribe { print("first subscription \($0)") } // 2秒後に、さらにサブスクライブする delay(2) { print("-- after delay(2) --") _ = s .subscribe { print("second subscription \($0)") } }
この出力は、
first subscription Next(0) first subscription Next(1) -- after delay(2) -- first subscription Next(2) second subscription Next(0) first subscription Next(3) second subscription Next(1) first subscription Next(4) // ...
after delay(2) 後の first subscription の値が 2 で、second subscription の値は 0 になっていることが分かる。
multicast
let subject = PublishSubject<Int>() _ = subject .subscribe { print("Subject \($0)") } // multicast を使用して Connectable Observable を取得する let s = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .multicast(subject) _ = s .subscribe { print("first subscription \($0)") } delay(2) { print("-- connect 呼びだし --") s.connect() } delay(4) { print("-- after delay(4) --") _ = s .subscribe { print("second subscription \($0)") } } delay(6) { print("-- after delay(6) --") _ = s .subscribe { print("third subscription \($0)") } }
この出力は、
-- connect 呼びだし -- Subject Next(0) first subscription Next(0) -- after delay(4) -- Subject Next(1) first subscription Next(1) second subscription Next(1) Subject Next(2) first subscription Next(2) second subscription Next(2) -- after delay(6) -- Subject Next(3) first subscription Next(3) second subscription Next(3) third subscription Next(3) Subject Next(4) first subscription Next(4) second subscription Next(4) third subscription Next(4) // ...
connect呼びだし後にアイテム(イベント)が送信され、first・second・third subscriptionの値が同じになっていることが分かる。
replay
replayオペレータは、Observableがアイテムの送信を開始した後に、オブザーバがサブスクライブしても、全てのオブザーバが同じ送信されたアイテム(イベント)のシーケンス(ストリーム)を受け取ることを保証する。
publish = multicast + replay subject
let s = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .replay(1) // Buffer Size:1 _ = s .subscribe { print("first subscription \($0)") } delay(2) { print("-- connect 呼びだし --") s.connect() } delay(4) { print("-- after delay(4) --") _ = s .subscribe { print("second subscription \($0)") } } delay(6) { print("-- after delay(6) --") _ = s .subscribe { print("third subscription \($0)") } }
この出力は、
-- connect 呼びだし -- first subscription Next(0) -- after delay(4) -- second subscription Next(0) first subscription Next(1) second subscription Next(1) first subscription Next(2) second subscription Next(2) -- after delay(6) -- third subscription Next(2) first subscription Next(3) second subscription Next(3) third subscription Next(3) first subscription Next(4) second subscription Next(4) third subscription Next(4) // ...
publish
通常のObservableをConnectable Observableへ変換する。
publish = multicast + publish subject
つまり、publishは基本的にはreplay(0)と同じ。
let s = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .publish() _ = s .subscribe { print("first subscription \($0)") } delay(2) { print("-- connect 呼びだし --") s.connect() } delay(4) { print("-- after delay(4) --") _ = s .subscribe { print("second subscription \($0)") } } delay(6) { print("-- after delay(5) --") _ = s .subscribe { print("third subscription \($0)") } }
この出力は、
-- connect 呼びだし -- first subscription Next(0) -- after delay(4) -- first subscription Next(1) second subscription Next(1) first subscription Next(2) second subscription Next(2) -- after delay(5) -- first subscription Next(3) second subscription Next(3) third subscription Next(3) first subscription Next(4) second subscription Next(4) third subscription Next(4) // ...
参考URL
Mathematical and Aggregate Operators
Connectable Observable Operators