タコさんブログ

プログラミングメモと小言

RxSwift 入門 その7

RxSwift 入門 その6 - タコさんブログ の続き。 今回は、RxSwiftプレイグラウンドの Mathematical and Aggregate OperatorsConnectable Observable Operators の項。プレイグラウンドの内容としてはこれで最後。

Mathematical and Aggregate Operators

この項では、Observableによって送信される全てのアイテムのシーケンス(ストリーム)に作用するオペレータに関して説明してある。

以下、プレイグラウンドに説明してあるMathematical and Aggregate Operatorsに関するオペレータ。

  • concat
  • reduce

concat

2つ、または、それ以上のObservablesから送信されるアイテム(イベント)を割り込みすることなく送信する。

let s1 = [0, 1, 2].toObservable()
let s2 = [5, 6, 7, 8].toObservable()

_ = s1.concat(s2)
  .subscribe {
    print($0)
  }

この出力は、

Next(0)
Next(1)
Next(2)
Next(5)
Next(6)
Next(7)
Next(8)
Completed

このマーブルダイアグラムは以下のように表せられる。

s1 -0-1-2|
s2       -5-6-7-8|
 ↓ concat s2
r  -0-1-2-5-6-7-8|

reduce

Observableが送信する各アイテム(イベント)に関数を順次適用し、結果を送信する。reduceは、完了するまでシーケンス(ストリーム)の各要素に関数を実行し、蓄積された値をメッセージと共に送信する。これはシーケンス上のSwiftのreduce関数と同じような働きをする。

let stream = (0..<10).toObservable()
_ = stream
  .reduce(0, accumulator: +)  // 初期値 0
  .subscribe {
    print($0)
  }

この出力は、

Next(45)
Completed

Connectable Observable Operators

この項では、Connectable Observable に関して説明してある。Connectable Observableは、サブスクライブした時ではなく、connect()メソッドが呼ばれた時にアイテムの送信が開始されること以外は、通常のObservableに似ている。このようにして、Observableがアイテムの送信を開始する前に、Observableをサブスクライブするために、すべての期待するサブスクライバーを待つことができる。

以下、プレイグラウンドに説明してあるConnectable Observable Operatorsに関するオペレータ。

  • multicast
  • replay
  • publish

この項を実行するにあたっての注意

  1. この項のサンプルは、//sampleWithoutConnectableOperators() のようにコメントアウトされているので、実行を確認する場合はコメントを外す必要がある

  2. intervalオペレータを使用して、整数のシーケンス(ストリーム)を生成している。intervalオペレータは与えられた時間間隔ごとに無限に増加する整数のアイテムを送信するシーケンスを生成する

  3. intervalオペレータを使用しているため、 XCPlaygroundPage.currentPage.needsIndefiniteExecution = true とする必要がある。また、遅延させる必要があるため、サポート関数として以下のdelay関数を利用している

// delay秒後に、メインスレッドでclosureを実行する
public func delay(delay:Double, closure:()->()) {
  dispatch_after(
      dispatch_time(DISPATCH_TIME_NOW,Int64(delay * Double(NSEC_PER_SEC))),
      dispatch_get_main_queue(), closure)
}

Without Connect

connectを使用しない場合の、挙動確認。

// 1秒間隔で整数を送信するシーケンスを生成する
let s = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
// 1つ目のサブスクライブ
_ = s
  .subscribe {
    print("first subscription \($0)")
  }
// 2秒後に、さらにサブスクライブする
delay(2) {
  print("-- after delay(2) --")
  _ = s
    .subscribe {
      print("second subscription \($0)")
    }
}

この出力は、

first subscription Next(0)
first subscription Next(1)
-- after delay(2) --
first subscription Next(2)
second subscription Next(0)
first subscription Next(3)
second subscription Next(1)
first subscription Next(4)
// ...

after delay(2) 後の first subscription の値が 2 で、second subscription の値は 0 になっていることが分かる。

multicast

let subject = PublishSubject<Int>()

_ = subject
  .subscribe {
    print("Subject \($0)")
}

// multicast を使用して Connectable Observable を取得する
let s = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
  .multicast(subject)

_ = s
  .subscribe {
    print("first subscription \($0)")
}

delay(2) {
  print("-- connect 呼びだし --")
  s.connect()
}

delay(4) {
  print("-- after delay(4) --")
  _ = s
    .subscribe {
      print("second subscription \($0)")
  }
}

delay(6) {
  print("-- after delay(6) --")
  _ = s
    .subscribe {
      print("third subscription \($0)")
  }
}

この出力は、

-- connect 呼びだし --
Subject Next(0)
first subscription Next(0)
-- after delay(4) --
Subject Next(1)
first subscription Next(1)
second subscription Next(1)
Subject Next(2)
first subscription Next(2)
second subscription Next(2)
-- after delay(6) --
Subject Next(3)
first subscription Next(3)
second subscription Next(3)
third subscription Next(3)
Subject Next(4)
first subscription Next(4)
second subscription Next(4)
third subscription Next(4)
// ...

connect呼びだし後にアイテム(イベント)が送信され、first・second・third subscriptionの値が同じになっていることが分かる。

replay

replayオペレータは、Observableがアイテムの送信を開始した後に、オブザーバがサブスクライブしても、全てのオブザーバが同じ送信されたアイテム(イベント)のシーケンス(ストリーム)を受け取ることを保証する。

publish = multicast + replay subject

let s = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
  .replay(1)  // Buffer Size:1

_ = s
  .subscribe {
    print("first subscription \($0)")
  }

delay(2) {
  print("-- connect 呼びだし --")
  s.connect()
}

delay(4) {
  print("-- after delay(4) --")
  _ = s
    .subscribe {
      print("second subscription \($0)")
  }
}

delay(6) {
  print("-- after delay(6) --")
  _ = s
    .subscribe {
      print("third subscription \($0)")
  }
}

この出力は、

-- connect 呼びだし --
first subscription Next(0)
-- after delay(4) --
second subscription Next(0)
first subscription Next(1)
second subscription Next(1)
first subscription Next(2)
second subscription Next(2)
-- after delay(6) --
third subscription Next(2)
first subscription Next(3)
second subscription Next(3)
third subscription Next(3)
first subscription Next(4)
second subscription Next(4)
third subscription Next(4)
// ...

publish

通常のObservableをConnectable Observableへ変換する。
publish = multicast + publish subject
つまり、publishは基本的にはreplay(0)と同じ。

let s = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
  .publish()

_ = s
  .subscribe {
    print("first subscription \($0)")
}

delay(2) {
  print("-- connect 呼びだし --")
  s.connect()
}

delay(4) {
  print("-- after delay(4) --")
  _ = s
    .subscribe {
      print("second subscription \($0)")
  }
}

delay(6) {
  print("-- after delay(5) --")
  _ = s
    .subscribe {
      print("third subscription \($0)")
  }
}

この出力は、

-- connect 呼びだし --
first subscription Next(0)
-- after delay(4) --
first subscription Next(1)
second subscription Next(1)
first subscription Next(2)
second subscription Next(2)
-- after delay(5) --
first subscription Next(3)
second subscription Next(3)
third subscription Next(3)
first subscription Next(4)
second subscription Next(4)
third subscription Next(4)
// ...

参考URL

Mathematical and Aggregate Operators

Connectable Observable Operators