Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark Operator Closures as @Sendable #2639

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion Platform/RecursiveLock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import Foundation

#if TRACE_RESOURCES
class RecursiveLock: NSRecursiveLock {
class RecursiveLock: NSRecursiveLock, @unchecked Sendable {
override init() {
_ = Resources.incrementTotal()
super.init()
Expand Down
2 changes: 1 addition & 1 deletion Rx.playground/Sources/SupportCode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public enum TestError: Swift.Error {
- parameter delay: time in seconds to wait before executing `closure`
- parameter closure: `Void` closure
*/
public func delay(_ delay: Double, closure: @escaping () -> Void) {
public func delay(_ delay: Double, closure: @escaping @Sendable () -> Void) {

DispatchQueue.main.asyncAfter(deadline: .now() + delay) {
closure()
Expand Down
10 changes: 5 additions & 5 deletions RxBlocking/BlockingObservable+Operators.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ extension BlockingObservable {
///
/// - parameter predicate: A function to test each source element for a condition.
/// - returns: Returns the only element of an sequence that satisfies the condition in the predicate, and reports an error if there is not exactly one element in the sequence.
public func single(_ predicate: @escaping (Element) throws -> Bool) throws -> Element {
public func single(_ predicate: @escaping @Sendable (Element) throws -> Bool) throws -> Element {
let results = self.materializeResult(max: 2, predicate: predicate)
let elements = try self.elementsOrThrow(results)

Expand Down Expand Up @@ -101,9 +101,9 @@ extension BlockingObservable {
}

extension BlockingObservable {
private func materializeResult(max: Int? = nil, predicate: @escaping (Element) throws -> Bool = { _ in true }) -> MaterializedSequenceResult<Element> {
var elements = [Element]()
var error: Swift.Error?
private func materializeResult(max: Int? = nil, predicate: @escaping @Sendable (Element) throws -> Bool = { _ in true }) -> MaterializedSequenceResult<Element> {
nonisolated(unsafe) var elements = [Element]()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do these need to be nonisolated?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because elements is accessed both in the function body and in the subscribe closure. The subscribe closure is not necessarily executed in sync or on the same thread as materializeResult

nonisolated(unsafe) var error: Swift.Error?

let lock = RunLoopLock(timeout: self.timeout)

Expand All @@ -114,7 +114,7 @@ extension BlockingObservable {
}

lock.dispatch {
let subscription = self.source.subscribe { event in
let subscription = self.source.subscribe { @Sendable event in
if d.isDisposed {
return
}
Expand Down
2 changes: 1 addition & 1 deletion RxBlocking/RunLoopLock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ final class RunLoopLock {
self.currentRunLoop = CFRunLoopGetCurrent()
}

func dispatch(_ action: @escaping () -> Void) {
func dispatch(_ action: @escaping @Sendable () -> Void) {
CFRunLoopPerformBlock(self.currentRunLoop, runLoopModeRaw) {
if CurrentThreadScheduler.isScheduleRequired {
_ = CurrentThreadScheduler.instance.schedule(()) { _ in
Expand Down
22 changes: 15 additions & 7 deletions RxCocoa/Common/ControlTarget.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import RxSwift
#endif

// This should be only used from `MainScheduler`
final class ControlTarget: RxTarget {
typealias Callback = (Control) -> Void
@MainActor
final class ControlTarget: RxTarget, @unchecked Sendable {
typealias Callback = @Sendable @MainActor (Control) -> Void

let selector: Selector = #selector(ControlTarget.eventHandler(_:))

Expand Down Expand Up @@ -72,16 +73,23 @@ final class ControlTarget: RxTarget {
callback(control)
}
}


@Sendable
override func dispose() {
super.dispose()
#if os(iOS) || os(tvOS) || os(visionOS)
self.control?.removeTarget(self, action: self.selector, for: self.controlEvents)
MainScheduler.assumeMainActor(execute: {
self.control?.removeTarget(self, action: self.selector, for: self.controlEvents)
})
#elseif os(macOS)
self.control?.target = nil
self.control?.action = nil
MainScheduler.assumeMainActor(execute: {
self.control?.target = nil
self.control?.action = nil
})
#endif
self.callback = nil
MainScheduler.assumeMainActor(execute: {
self.callback = nil
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion RxCocoa/Common/DelegateProxy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@
fileprivate let selector: Selector

init<P, D>(selector: Selector, delegateProxy _delegateProxy: DelegateProxy<P, D>) {
weak var weakDelegateProxy = _delegateProxy
nonisolated(unsafe) weak var weakDelegateProxy = _delegateProxy

let dispatcher = PublishSubject<[Any]>()
self.dispatcher = dispatcher
Expand Down
19 changes: 10 additions & 9 deletions RxCocoa/Common/DelegateProxyType.swift
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ extension DelegateProxyType {
/// When make 'Rx*DelegateProxy' subclass, call 'Rx*DelegateProxySubclass.register(for:_)' 1 time, or use it in DelegateProxyFactory
/// 'Rx*DelegateProxy' can have one subclass implementation per concrete ParentObject type.
/// Should call it from concrete DelegateProxy type, not generic.
public static func register<Parent>(make: @escaping (Parent) -> Self) {
public static func register<Parent>(make: @escaping @Sendable (Parent) -> Self) {
self.factory.extend(make: make)
}

Expand Down Expand Up @@ -214,7 +214,7 @@ extension DelegateProxyType {
/// - parameter onProxyForObject: Object that has `delegate` property.
/// - returns: Disposable object that can be used to clear forward delegate.
public static func installForwardDelegate(_ forwardDelegate: Delegate, retainDelegate: Bool, onProxyForObject object: ParentObject) -> Disposable {
weak var weakForwardDelegate: AnyObject? = forwardDelegate as AnyObject
nonisolated(unsafe) weak var weakForwardDelegate: AnyObject? = forwardDelegate as AnyObject
let proxy = self.proxy(for: object)

assert(proxy._forwardToDelegate() === nil, "This is a feature to warn you that there is already a delegate (or data source) set somewhere previously. The action you are trying to perform will clear that delegate (data source) and that means that some of your features that depend on that delegate (data source) being set will likely stop working.\n" +
Expand Down Expand Up @@ -317,7 +317,7 @@ extension DelegateProxyType where ParentObject: HasPrefetchDataSource, Self.Dele
import UIKit

extension ObservableType {
func subscribeProxyDataSource<DelegateProxy: DelegateProxyType>(ofObject object: DelegateProxy.ParentObject, dataSource: DelegateProxy.Delegate, retainDataSource: Bool, binding: @escaping (DelegateProxy, Event<Element>) -> Void)
func subscribeProxyDataSource<DelegateProxy: DelegateProxyType>(ofObject object: DelegateProxy.ParentObject, dataSource: DelegateProxy.Delegate, retainDataSource: Bool, binding: @escaping @Sendable (DelegateProxy, Event<Element>) -> Void)
-> Disposable
where DelegateProxy.ParentObject: UIView
, DelegateProxy.Delegate: AnyObject {
Expand Down Expand Up @@ -360,11 +360,12 @@ extension DelegateProxyType where ParentObject: HasPrefetchDataSource, Self.Dele

return Disposables.create { [weak object] in
subscription.dispose()

if object?.window != nil {
object?.layoutIfNeeded()
}

MainScheduler.tryExecuteInSync(execute: { [weak object] () in
guard let object else { return }
guard object.window != nil else { return }
object.layoutIfNeeded()
})

unregisterDelegate.dispose()
}
}
Expand Down Expand Up @@ -410,7 +411,7 @@ extension DelegateProxyType where ParentObject: HasPrefetchDataSource, Self.Dele
self._identifier = proxyType.identifier
}

fileprivate func extend<DelegateProxy: DelegateProxyType, ParentObject>(make: @escaping (ParentObject) -> DelegateProxy) {
fileprivate func extend<DelegateProxy: DelegateProxyType, ParentObject>(make: @escaping @Sendable (ParentObject) -> DelegateProxy) {
MainScheduler.ensureRunningOnMainThread()
precondition(self._identifier == DelegateProxy.identifier, "Delegate proxy has inconsistent identifier")
guard self._factories[ObjectIdentifier(ParentObject.self)] == nil else {
Expand Down
2 changes: 1 addition & 1 deletion RxCocoa/Common/Infallible+Bind.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ extension InfallibleType {
- parameter onNext: Action to invoke for each element in the observable sequence.
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func bind(onNext: @escaping (Element) -> Void) -> Disposable {
public func bind(onNext: @escaping @Sendable (Element) -> Void) -> Disposable {
self.subscribe(onNext: onNext)
}

Expand Down
4 changes: 2 additions & 2 deletions RxCocoa/Common/Observable+Bind.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ extension ObservableType {
*/
public func bind<Object: AnyObject>(
with object: Object,
onNext: @escaping (Object, Element) -> Void
onNext: @escaping @Sendable (Object, Element) -> Void
) -> Disposable {
self.subscribe(onNext: { [weak object] in
guard let object = object else { return }
Expand All @@ -94,7 +94,7 @@ extension ObservableType {
- parameter onNext: Action to invoke for each element in the observable sequence.
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func bind(onNext: @escaping (Element) -> Void) -> Disposable {
public func bind(onNext: @escaping @Sendable (Element) -> Void) -> Disposable {
self.subscribe(onNext: onNext,
onError: { error in
rxFatalErrorInDebug("Binding error: \(error)")
Expand Down
3 changes: 2 additions & 1 deletion RxCocoa/Common/RxTarget.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import Foundation
import RxSwift

class RxTarget : NSObject
, Disposable {
, Disposable
, @unchecked Sendable {

private var retainSelf: RxTarget?

Expand Down
4 changes: 2 additions & 2 deletions RxCocoa/Foundation/NSObject+Rx+KVORepresentable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ extension Reactive where Base: NSObject {
*/
public func observe<Element: KVORepresentable>(_ type: Element.Type, _ keyPath: String, options: KeyValueObservingOptions = [.new, .initial], retainSelf: Bool = true) -> Observable<Element?> {
return self.observe(Element.KVOType.self, keyPath, options: options, retainSelf: retainSelf)
.map(Element.init)
.map({ Element(KVOValue: $0) })
}
}

Expand All @@ -52,7 +52,7 @@ extension Reactive where Base: NSObject {
*/
public func observeWeakly<Element: KVORepresentable>(_ type: Element.Type, _ keyPath: String, options: KeyValueObservingOptions = [.new, .initial]) -> Observable<Element?> {
return self.observeWeakly(Element.KVOType.self, keyPath, options: options)
.map(Element.init)
.map({ Element(KVOValue: $0) })
}
}
#endif
Expand Down
4 changes: 2 additions & 2 deletions RxCocoa/Foundation/NSObject+Rx+RawRepresentable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ extension Reactive where Base: NSObject {
*/
public func observe<Element: RawRepresentable>(_ type: Element.Type, _ keyPath: String, options: KeyValueObservingOptions = [.new, .initial], retainSelf: Bool = true) -> Observable<Element?> where Element.RawValue: KVORepresentable {
return self.observe(Element.RawValue.KVOType.self, keyPath, options: options, retainSelf: retainSelf)
.map(Element.init)
.map({ Element(KVOValue: $0) })
}
}

Expand All @@ -44,7 +44,7 @@ extension Reactive where Base: NSObject {
*/
public func observeWeakly<Element: RawRepresentable>(_ type: Element.Type, _ keyPath: String, options: KeyValueObservingOptions = [.new, .initial]) -> Observable<Element?> where Element.RawValue: KVORepresentable {
return self.observeWeakly(Element.RawValue.KVOType.self, keyPath, options: options)
.map(Element.init)
.map({ Element(KVOValue: $0) })
}
}
#endif
Expand Down
11 changes: 7 additions & 4 deletions RxCocoa/Foundation/NSObject+Rx.swift
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ private protocol KVOObservableProtocol {

private final class KVOObserver
: _RXKVOObserver
, Disposable {
, Disposable
, @unchecked Sendable {
typealias Callback = (Any?) -> Void

var retainSelf: KVOObserver?
Expand All @@ -373,7 +374,8 @@ private final class KVOObserver
super.init(target: parent.target, retainTarget: parent.retainTarget, keyPath: parent.keyPath, options: parent.options.nsOptions, callback: callback)
self.retainSelf = self
}


@Sendable
override func dispose() {
super.dispose()
self.retainSelf = nil
Expand All @@ -388,7 +390,8 @@ private final class KVOObserver

private final class KVOObservable<Element>
: ObservableType
, KVOObservableProtocol {
, KVOObservableProtocol
, @unchecked Sendable {
typealias Element = Element?

unowned var target: AnyObject
Expand Down Expand Up @@ -483,7 +486,7 @@ private extension KeyValueObservingOptions {
options: KeyValueObservingOptions
) -> Observable<AnyObject?> {

weak var weakTarget: AnyObject? = target
nonisolated(unsafe) weak var weakTarget: AnyObject? = target

let propertyName = keyPathSections[0]
let remainingPaths = Array(keyPathSections[1..<keyPathSections.count])
Expand Down
2 changes: 1 addition & 1 deletion RxCocoa/Foundation/NotificationCenter+Rx.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ extension Reactive where Base: NotificationCenter {
*/
public func notification(_ name: Notification.Name?, object: AnyObject? = nil) -> Observable<Notification> {
return Observable.create { [weak object] observer in
let nsObserver = self.base.addObserver(forName: name, object: object, queue: nil) { notification in
nonisolated(unsafe) let nsObserver = self.base.addObserver(forName: name, object: object, queue: nil) { notification in
observer.on(.next(notification))
}

Expand Down
2 changes: 1 addition & 1 deletion RxCocoa/Foundation/URLSession+Rx.swift
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ extension Reactive where Base: URLSession {

task.resume()

return Disposables.create(with: task.cancel)
return Disposables.create(with: { task.cancel() })
}
}

Expand Down
16 changes: 8 additions & 8 deletions RxCocoa/Traits/Driver/Driver+Subscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingSt
*/
public func drive<Object: AnyObject>(
with object: Object,
onNext: ((Object, Element) -> Void)? = nil,
onCompleted: ((Object) -> Void)? = nil,
onDisposed: ((Object) -> Void)? = nil
onNext: (@Sendable @MainActor (Object, Element) -> Void)? = nil,
onCompleted: (@Sendable @MainActor (Object) -> Void)? = nil,
onDisposed: (@Sendable (Object) -> Void)? = nil
) -> Disposable {
MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
return self.asObservable().subscribe(with: object, onNext: onNext, onCompleted: onCompleted, onDisposed: onDisposed)
return self.asObservable().subscribe(with: object, onNext: onNext.flatMap({ MainScheduler.assumeMainActor($0) }), onCompleted: onCompleted.flatMap({ MainScheduler.assumeMainActor($0) }), onDisposed: onDisposed)
}

/**
Expand All @@ -179,12 +179,12 @@ extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingSt
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func drive(
onNext: ((Element) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil
onNext: (@Sendable @MainActor (Element) -> Void)? = nil,
onCompleted: (@Sendable @MainActor () -> Void)? = nil,
onDisposed: (@Sendable () -> Void)? = nil
) -> Disposable {
MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
return self.asObservable().subscribe(onNext: onNext, onCompleted: onCompleted, onDisposed: onDisposed)
return self.asObservable().subscribe(onNext: onNext.flatMap({ MainScheduler.assumeMainActor($0) }), onCompleted: onCompleted.flatMap({ MainScheduler.assumeMainActor($0) }), onDisposed: onDisposed)
}

/**
Expand Down
4 changes: 2 additions & 2 deletions RxCocoa/Traits/Driver/ObservableConvertibleType+Driver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ extension ObservableConvertibleType {
- parameter onErrorRecover: Calculates driver that continues to drive the sequence in case of error.
- returns: Driver trait.
*/
public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver<Element>) -> Driver<Element> {
public func asDriver(onErrorRecover: @escaping @Sendable @MainActor (_ error: Swift.Error) -> Driver<Element>) -> Driver<Element> {
let source = self
.asObservable()
.observe(on:DriverSharingStrategy.scheduler)
.catch { error in
onErrorRecover(error).asObservable()
MainScheduler.assumeMainActor(onErrorRecover)(error).asObservable()
}
return Driver(source)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ extension ObservableConvertibleType {
- parameter onErrorRecover: Calculates driver that continues to drive the sequence in case of error.
- returns: Driving observable sequence.
*/
public func asSharedSequence<S>(sharingStrategy: S.Type = S.self, onErrorRecover: @escaping (_ error: Swift.Error) -> SharedSequence<S, Element>) -> SharedSequence<S, Element> {
public func asSharedSequence<S>(sharingStrategy: S.Type = S.self, onErrorRecover: @escaping @Sendable (_ error: Swift.Error) -> SharedSequence<S, Element>) -> SharedSequence<S, Element> {
let source = self
.asObservable()
.observe(on:S.scheduler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public enum SharingScheduler {

**This shouldn't be used in normal release builds.**
*/
static public func mock(makeScheduler: @escaping () -> SchedulerType, action: () throws -> Void) rethrows {
static public func mock(makeScheduler: @escaping @Sendable () -> SchedulerType, action: () throws -> Void) rethrows {
let originalMake = make
make = makeScheduler
defer {
Expand Down
Loading