Skip to content

Commit

Permalink
Allow VirtualTimeScheduler to run on any thread (ReactiveX#2610)
Browse files Browse the repository at this point in the history
* Allow VirtualTimeScheduler to run on any thread as long as critical methods are all called on the same thread.

* Move thread assignment to init method.
  • Loading branch information
danielt1263 authored Nov 14, 2024
1 parent 7570d44 commit 8932598
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions RxSwift/Schedulers/VirtualTimeScheduler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ open class VirtualTimeScheduler<Converter: VirtualTimeConverterType>

private var nextId = 0

private let thread: Thread

/// - returns: Current time.
public var now: RxTime {
self.converter.convertFromVirtualTime(self.clock)
Expand All @@ -41,6 +43,7 @@ open class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
self.currentClock = initialClock
self.running = false
self.converter = converter
self.thread = Thread.current
self.schedulerQueue = PriorityQueue(hasHigherPriority: {
switch converter.compareVirtualTime($0.time, $1.time) {
case .lessThan:
Expand Down Expand Up @@ -106,8 +109,7 @@ open class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
- returns: The disposable object used to cancel the scheduled action (best effort).
*/
public func scheduleAbsoluteVirtual<StateType>(_ state: StateType, time: VirtualTime, action: @escaping (StateType) -> Disposable) -> Disposable {
MainScheduler.ensureExecutingOnScheduler()

ensusreRunningOnCorrectThread()
let compositeDisposable = CompositeDisposable()

let item = VirtualSchedulerItem(action: {
Expand All @@ -130,12 +132,11 @@ open class VirtualTimeScheduler<Converter: VirtualTimeConverterType>

/// Starts the virtual time scheduler.
public func start() {
MainScheduler.ensureExecutingOnScheduler()

if self.running {
return
}

ensusreRunningOnCorrectThread()
self.running = true
repeat {
guard let next = self.findNext() else {
Expand Down Expand Up @@ -170,12 +171,11 @@ open class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
///
/// - parameter virtualTime: Absolute time to advance the scheduler's clock to.
public func advanceTo(_ virtualTime: VirtualTime) {
MainScheduler.ensureExecutingOnScheduler()

if self.running {
fatalError("Scheduler is already running")
}

ensusreRunningOnCorrectThread()
self.running = true
repeat {
guard let next = self.findNext() else {
Expand All @@ -199,8 +199,7 @@ open class VirtualTimeScheduler<Converter: VirtualTimeConverterType>

/// Advances the scheduler's clock by the specified relative time.
public func sleep(_ virtualInterval: VirtualTimeInterval) {
MainScheduler.ensureExecutingOnScheduler()

ensusreRunningOnCorrectThread()
let sleepTo = self.converter.offsetVirtualTime(self.clock, offset: virtualInterval)
if self.converter.compareVirtualTime(sleepTo, self.clock).lessThen {
fatalError("Can't sleep to past.")
Expand All @@ -211,8 +210,7 @@ open class VirtualTimeScheduler<Converter: VirtualTimeConverterType>

/// Stops the virtual time scheduler.
public func stop() {
MainScheduler.ensureExecutingOnScheduler()

ensusreRunningOnCorrectThread()
self.running = false
}

Expand All @@ -221,6 +219,12 @@ open class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
_ = Resources.decrementTotal()
}
#endif

private func ensusreRunningOnCorrectThread() {
guard Thread.current == thread else {
rxFatalError("Executing on the wrong thread. Please ensure all work on the same thread.")
}
}
}

// MARK: description
Expand Down

0 comments on commit 8932598

Please sign in to comment.