diff --git a/Steepfile b/Steepfile new file mode 100644 index 00000000..5bdbde82 --- /dev/null +++ b/Steepfile @@ -0,0 +1,6 @@ +D = Steep::Diagnostic + +target :lib do + signature "sig" + check "lib" +end diff --git a/gems.rb b/gems.rb index cdd8b072..94696ba3 100644 --- a/gems.rb +++ b/gems.rb @@ -34,6 +34,8 @@ gem "rubocop" gem "rubocop-socketry" + gem "steep", git: "https://github.com/soutaro/steep" + gem "sus-fixtures-async" gem "sus-fixtures-console" gem "sus-fixtures-time" diff --git a/lib/async/barrier.rb b/lib/async/barrier.rb index b3b6f677..b4a276ea 100644 --- a/lib/async/barrier.rb +++ b/lib/async/barrier.rb @@ -13,7 +13,7 @@ module Async # @public Since *Async v1*. class Barrier # Initialize the barrier. - # @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks. + # @parameter parent [_Asyncable?] The parent for holding any children tasks. # @public Since *Async v1*. def initialize(parent: nil) @tasks = List.new @@ -32,16 +32,17 @@ def initialize(task) private_constant :TaskNode - # Number of tasks being held by the barrier. + # @returns [Integer] Number of tasks being held by the barrier. def size @tasks.size end - # All tasks which have been invoked into the barrier. + # @attribute [Array(Task)] All tasks which have been invoked into the barrier. attr :tasks # Execute a child task and add it to the barrier. # @asynchronous Executes the given block concurrently. + # @rbs [T] (*untyped, parent: _Asyncable, **untyped) {(Task, *untyped) -> T} -> Task[T] def async(*arguments, parent: (@parent or Task.current), **options, &block) waiting = nil @@ -54,8 +55,7 @@ def async(*arguments, parent: (@parent or Task.current), **options, &block) end end - # Whether there are any tasks being held by the barrier. - # @returns [Boolean] + # @returns [Boolean] Whether there are any tasks being held by the barrier. def empty? @tasks.empty? end @@ -63,6 +63,7 @@ def empty? # Wait for all tasks to complete by invoking {Task#wait} on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier. # # @yields {|task| ...} If a block is given, the unwaited task is yielded. You must invoke {Task#wait} yourself. In addition, you may `break` if you have captured enough results. + # @parameter task [Task] The task which has completed. # # @asynchronous Will wait for tasks to finish executing. def wait @@ -88,6 +89,7 @@ def wait # Stop all tasks held by the barrier. # @asynchronous May wait for tasks to finish executing. + # @rbs () -> void def stop @tasks.each do |waiting| waiting.task.stop diff --git a/lib/async/condition.rb b/lib/async/condition.rb index 7f2b5d98..2b30fb2b 100644 --- a/lib/async/condition.rb +++ b/lib/async/condition.rb @@ -53,7 +53,7 @@ def waiting? end # Signal to a given task that it should resume operations. - # @parameter value [Object | Nil] The value to return to the waiting fibers. + # @parameter value [::Object | Nil] The value to return to the waiting fibers. def signal(value = nil) return if @waiting.empty? diff --git a/lib/async/idler.rb b/lib/async/idler.rb index c221969c..785cf898 100644 --- a/lib/async/idler.rb +++ b/lib/async/idler.rb @@ -12,7 +12,7 @@ class Idler # # @parameter maximum_load [Numeric] The maximum load before we start shedding work. # @parameter backoff [Numeric] The initial backoff time, used for delaying work. - # @parameter parent [Interface(:async) | Nil] The parent task to use for async operations. + # @parameter parent [_Asyncable | Nil] The parent task to use for async operations. def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil) @maximum_load = maximum_load @backoff = backoff @@ -24,7 +24,7 @@ def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil) # @asynchronous Executes the given block concurrently. # # @parameter arguments [Array] The arguments to pass to the block. - # @parameter parent [Interface(:async) | Nil] The parent task to use for async operations. + # @parameter parent [_Asyncable] The parent task to use for async operations. # @parameter options [Hash] The options to pass to the task. # @yields {|task| ...} When the system is idle, the block will be executed in a new task. def async(*arguments, parent: (@parent or Task.current), **options, &block) diff --git a/lib/async/list.rb b/lib/async/list.rb index 5bc55a1f..9cd4f906 100644 --- a/lib/async/list.rb +++ b/lib/async/list.rb @@ -83,6 +83,7 @@ def prepend(node) # Add the node, yield, and the remove the node. # @yields {|node| ...} Yields the node. + # @parameter node [Node] The node to add to the list. # @returns [Object] Returns the result of the block. def stack(node, &block) append(node) diff --git a/lib/async/node.rb b/lib/async/node.rb index dec881e1..1dbfeb1c 100644 --- a/lib/async/node.rb +++ b/lib/async/node.rb @@ -171,7 +171,7 @@ def description # Provides a backtrace for nodes that have an active execution context. # - # @returns [Array(Thread::Backtrace::Locations) | Nil] The backtrace of the node, if available. + # @returns [Array(::Thread::Backtrace::Locations) | Nil] The backtrace of the node, if available. def backtrace(*arguments) nil end diff --git a/lib/async/notification.rb b/lib/async/notification.rb index 79765e58..6bb1e396 100644 --- a/lib/async/notification.rb +++ b/lib/async/notification.rb @@ -11,7 +11,7 @@ module Async class Notification < Condition # Signal to a given task that it should resume operations. # - # @returns [Boolean] if a task was signalled. + # @returns [bool] If a task was signalled. def signal(value = nil, task: Task.current) return false if @waiting.empty? @@ -21,14 +21,19 @@ def signal(value = nil, task: Task.current) end Signal = Struct.new(:waiting, :value) do + # @returns [bool] Returns true if the signal is still alive. def alive? true end + # Transfer the value to all waiting fibers. + # @returns [self] def transfer waiting.each do |fiber| fiber.transfer(value) if fiber.alive? end + + return self end end diff --git a/lib/async/queue.rb b/lib/async/queue.rb index 503ae8d0..1526bf2d 100644 --- a/lib/async/queue.rb +++ b/lib/async/queue.rb @@ -23,7 +23,7 @@ class ClosedError < RuntimeError # Create a new queue. # - # @parameter parent [Interface(:async) | Nil] The parent task to use for async operations. + # @parameter parent [_Asyncable | Nil] The parent task to use for async operations. # @parameter available [Notification] The notification to use for signaling when items are available. def initialize(parent: nil, available: Notification.new) @items = [] @@ -104,7 +104,7 @@ def pop # @asynchronous Executes the given block concurrently for each item. # # @parameter arguments [Array] The arguments to pass to the block. - # @parameter parent [Interface(:async) | Nil] The parent task to use for async operations. + # @parameter parent [_Asyncable | Nil] The parent task to use for async operations. # @parameter options [Hash] The options to pass to the task. # @yields {|task| ...} When the system is idle, the block will be executed in a new task. def async(parent: (@parent or Task.current), **options, &block) diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 568a5ff2..fb8f0b7e 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -72,7 +72,7 @@ def blocking_operation_wait(work) # # @public Since *Async v1*. # @parameter parent [Node | Nil] The parent node to use for task hierarchy. - # @parameter selector [IO::Event::Selector] The selector to use for event handling. + # @parameter selector [::IO::Event::Selector] The selector to use for event handling. def initialize(parent = nil, selector: nil, profiler: Profiler&.default, worker_pool: WORKER_POOL) super(parent) @@ -197,7 +197,7 @@ def yield end # Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor. - # @parameter fiber [Fiber | Object] The object to be resumed on the next iteration of the run-loop. + # @parameter fiber [Any(Fiber, Object)] The object to be resumed on the next iteration of the run-loop. def push(fiber) @selector.push(fiber) end diff --git a/lib/async/task.rb b/lib/async/task.rb index e366f972..a11cd6b6 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -52,6 +52,7 @@ def initialize(message = "execution expired") end # @public Since *Async v1*. + # @rbs generic ResultType class Task < Node # Raised when a child task is created within a task that has finished execution. class FinishedError < RuntimeError @@ -224,6 +225,7 @@ def run(*arguments) # @yields {|task| ...} in the context of the new task. # @raises [FinishedError] If the task has already finished. # @returns [Task] The child task. + # @rbs [T] (*untyped, **untyped) { (Task[T], *untyped, **untyped) -> T } -> Task[T] def async(*arguments, **options, &block) raise FinishedError if self.finished? @@ -247,6 +249,7 @@ def async(*arguments, **options, &block) # # @raises [RuntimeError] If the task's fiber is the current fiber. # @returns [Object] The final expression/result of the task's block. + # @rbs () -> ResultType def wait raise "Cannot wait on own fiber!" if Fiber.current.equal?(@fiber) @@ -367,7 +370,7 @@ def self.current end # Check if there is a task defined for the current fiber. - # @returns [Interface(:async) | Nil] + # @returns [_Asyncable | Nil] def self.current? Fiber.current.async_task end diff --git a/lib/async/variable.rb b/lib/async/variable.rb index 56712954..f2734e44 100644 --- a/lib/async/variable.rb +++ b/lib/async/variable.rb @@ -7,6 +7,7 @@ module Async # A synchronization primitive that allows one task to wait for another task to resolve a value. + # @rbs generic T class Variable # Create a new variable. # @@ -22,13 +23,14 @@ def initialize(condition = Condition.new) # # @parameter value [Object] The value to resolve. def resolve(value = true) - @value = value - condition = @condition - @condition = nil - - self.freeze - - condition.signal(value) + if condition = @condition + @value = value + @condition = nil + + self.freeze + + condition.signal(value) + end end # Alias for {#resolve}. @@ -45,7 +47,7 @@ def resolved? # Wait for the value to be resolved. # - # @returns [Object] The resolved value. + # @returns [T?] The resolved value. def wait @condition&.wait return @value diff --git a/lib/async/waiter.rb b/lib/async/waiter.rb index 94105d36..62480431 100644 --- a/lib/async/waiter.rb +++ b/lib/async/waiter.rb @@ -10,8 +10,8 @@ module Async class Waiter # Create a waiter instance. # - # @parameter parent [Interface(:async) | Nil] The parent task to use for asynchronous operations. - # @parameter finished [Async::Condition] The condition to signal when a task completes. + # @parameter parent [_Asyncable | Nil] The parent task to use for asynchronous operations. + # @parameter finished [::Async::Condition] The condition to signal when a task completes. def initialize(parent: nil, finished: Async::Condition.new) warn("`Async::Waiter` is deprecated, use `Async::Barrier` instead.", uplevel: 1, category: :deprecated) if $VERBOSE @@ -34,7 +34,7 @@ def async(parent: (@parent or Task.current), **options, &block) # Wait for the first `count` tasks to complete. # @parameter count [Integer | Nil] The number of tasks to wait for. - # @returns [Array(Async::Task)] If an integer is given, the tasks which have completed. + # @returns [Array(::Async::Task)] If an integer is given, the tasks which have completed. # @returns [Async::Task] Otherwise, the first task to complete. def first(count = nil) minimum = count || 1 diff --git a/lib/kernel/async.rb b/lib/kernel/async.rb index 46c41c96..12a8372a 100644 --- a/lib/kernel/async.rb +++ b/lib/kernel/async.rb @@ -21,6 +21,7 @@ module Kernel # # @public Since *Async v1*. # @asynchronous May block until given block completes executing. + # @rbs [ResultType] (*untyped, **untyped) { (Task) -> ResultType } -> Task[ResultType] def Async(...) if current = ::Async::Task.current? return current.async(...) diff --git a/lib/kernel/sync.rb b/lib/kernel/sync.rb index ee38fe44..dcebc824 100644 --- a/lib/kernel/sync.rb +++ b/lib/kernel/sync.rb @@ -16,6 +16,7 @@ module Kernel # # @public Since *Async v1*. # @asynchronous Will block until given block completes executing. + # @rbs [ResultType] (**untyped) { (Task) -> ResultType } -> ResultType def Sync(annotation: nil, &block) if task = ::Async::Task.current? if annotation diff --git a/sig/async.rbs b/sig/async.rbs new file mode 100644 index 00000000..4e942145 --- /dev/null +++ b/sig/async.rbs @@ -0,0 +1,758 @@ +# Asynchronous programming framework. +module Async + # A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore}. + class Barrier + attr_reader tasks(@tasks): Array[Task] + + @tasks: Array[Task] + + # Initialize the barrier. + public def initialize: (?parent: _Asyncable?) -> void + + public def size: () -> Integer + + # Execute a child task and add it to the barrier. + public def async: [T] (*untyped, parent: _Asyncable, **untyped) { (Task, *untyped) -> T } -> Task[T] + + public def empty?: () -> bool + + # Wait for all tasks to complete by invoking {Task#wait} on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier. + public def wait: () { (Task task) -> void } -> untyped + + # Stop all tasks held by the barrier. + public def stop: () -> void + end + + # A convenient wrapper around the internal monotonic clock. + class Clock + # Get the current elapsed monotonic time. + public def self.now: () -> untyped + + # Measure the execution of a block of code. + public def self.measure: () { () -> void } -> Numeric + + # Start measuring elapsed time from now. + public def self.start: () -> Clock + + # Create a new clock with the initial total time. + public def initialize: (?Numeric total) -> void + + # Start measuring a duration. + public def start!: () -> untyped + + # Stop measuring a duration and append the duration to the current total. + public def stop!: () -> untyped + + # The total elapsed time including any current duration. + public def total: () -> untyped + + # Reset the total elapsed time. If the clock is currently running, reset the start time to now. + public def reset!: () -> untyped + end + + # A synchronization primitive, which allows fibers to wait until a particular condition is (edge) triggered. + class Condition + # Create a new condition. + public def initialize: () -> void + + # Queue up the current fiber and wait on yielding the task. + public def wait: () -> Object + + public def empty?: () -> bool + + public def waiting?: () -> bool + + # Signal to a given task that it should resume operations. + public def signal: (?::Object? value) -> untyped + + def exchange: () -> untyped + end + + # Shims for the console gem, redirecting warnings and above to `Kernel#warn`. + # + # If you require this file, the `async` library will not depend on the `console` gem. + # + # That includes any gems that sit within the `Async` namespace. + # + # This is an experimental feature. + module Console + # Log a message at the debug level. The shim is silent. + public def self.debug: () -> untyped + + # Log a message at the info level. The shim is silent. + public def self.info: () -> untyped + + # Log a message at the warn level. The shim redirects to `Kernel#warn`. + public def self.warn: (*untyped arguments, ?exception: untyped, **untyped options) -> untyped + + # Log a message at the error level. The shim redirects to `Kernel#warn`. + public def self.error: () -> untyped + + # Log a message at the fatal level. The shim redirects to `Kernel#warn`. + public def self.fatal: () -> untyped + end + + # A load balancing mechanism that can be used process work when the system is idle. + class Idler + # Create a new idler. + public def initialize: (?Numeric maximum_load, ?backoff: Numeric, ?parent: _Asyncable?) -> void + + # Wait until the system is idle, then execute the given block in a new task. + public def async: (*Array arguments, ?parent: _Asyncable, **Hash options) { () -> void } -> untyped + + # Wait until the system is idle, according to the maximum load specified. + # + # If the scheduler is overloaded, this method will sleep for an exponentially increasing amount of time. + public def wait: () -> untyped + end + + class LimitedQueue < Queue + attr_reader limit(@limit): Integer + + @limit: Integer + + public def self.new: () -> untyped + + # Create a new limited queue. + public def initialize: (?Integer limit, ?full: Notification, **untyped options) -> void + + # Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception. + # Also signals all tasks waiting for the queue to be full. + public def close: () -> untyped + + public def limited?: () -> bool + + # Add an item to the queue. + # + # If the queue is full, this method will block until there is space available. + public def push: (Object item) -> self + + # Add multiple items to the queue. + # + # If the queue is full, this method will block until there is space available. + public def enqueue: (*Array items) -> untyped + + # Remove and return the next item from the queue. + # + # If the queue is empty, this method will block until an item is available. + public def dequeue: () -> Object + end + + # A general doublely linked list. This is used internally by {Async::Barrier} and {Async::Condition} to manage child tasks. + class List + attr_reader head(@head): Node? + + attr_reader tail(@tail): Node? + + attr_reader size(@size): Integer + + @head: Node? + + @tail: Node? + + @size: Integer + + # Initialize a new, empty, list. + public def initialize: () -> void + + public def to_s: () -> String + + # Fast, safe, unbounded accumulation of children. + public def to_a: () -> untyped + + # A callback that is invoked when an item is added to the list. + public def added: (untyped node) -> self + + # Append a node to the end of the list. + public def append: (untyped node) -> self + + # Prepend a node to the start of the list. + public def prepend: (untyped node) -> self + + # Add the node, yield, and the remove the node. + public def stack: (untyped node) { (Node node) -> void } -> Object + + # A callback that is invoked when an item is removed from the list. + public def removed: (untyped node) -> self + + # Remove the node if it is in a list. + # + # You should be careful to only remove nodes that are part of this list. + public def remove?: (untyped node) -> Node + + # Remove the node. If it was already removed, this will raise an error. + # + # You should be careful to only remove nodes that are part of this list. + public def remove: (untyped node) -> Node + + private def remove!: (untyped node) -> self + + public def empty?: () -> bool + + # Iterate over each node in the linked list. It is generally safe to remove the current node, any previous node or any future node during iteration. + public def each: () { () -> void } -> List + + # Determine whether the given node is included in the list. + public def include?: (Node needle) -> bool + + public def first: () -> Node + + public def last: () -> Node + + # Shift the first node off the list, if it is not empty. + public def shift: () -> untyped + + # A linked list Node. + class Node + public def to_s: () -> String + end + end + + # A list of children tasks. + class Children < List + # Create an empty list of children tasks. + public def initialize: () -> void + + # Some children may be marked as transient. Transient children do not prevent the parent from finishing. + public def transients?: () -> bool + + # Whether all children are considered finished. Ignores transient children. + public def finished?: () -> bool + + # Whether the children is empty, preserved for compatibility. + public def nil?: () -> bool + + # Adjust the number of transient children, assuming it has changed. + # + # Despite being public, this is not intended to be called directly. It is used internally by {Node#transient=}. + public def adjust_transient_count: (bool transient) -> untyped + + private def added: (untyped node) -> self + + private def removed: (untyped node) -> self + end + + # A node in a tree, used for implementing the task hierarchy. + class Node + attr_reader parent(@parent): Node + + attr_reader children(@children): Children? + + attr_reader annotation(@annotation): String? + + @parent: Node + + @children: Children? + + @annotation: String? + + # Create a new node in the tree. + public def initialize: (?Node? parent, ?annotation: untyped, ?transient: untyped) -> void + + public def root: () -> Node + + # Whether this node has any children. + public def children?: () -> bool + + # Represents whether a node is transient. Transient nodes are not considered + # when determining if a node is finished. This is useful for tasks which are + # internal to an object rather than explicit user concurrency. For example, + # a child task which is pruning a connection pool is transient, because it + # is not directly related to the parent task, and should not prevent the + # parent task from finishing. + public def transient?: () -> bool + + # Change the transient state of the node. + # + # A transient node is not considered when determining if a node is finished, and propagates up if the parent is consumed. + public def transient=: (bool value) -> untyped + + # Annotate the node with a description. + public def annotate: (String annotation) -> untyped + + # A description of the node, including the annotation and object name. + public def description: () -> String + + # Provides a backtrace for nodes that have an active execution context. + public def backtrace: (*untyped arguments) -> Array[::Thread::Backtrace::Locations]? + + public def to_s: () -> String + + # Change the parent of this node. + public def parent=: (Node? parent) -> Node + + def set_parent: (untyped parent) -> untyped + + def add_child: (untyped child) -> self + + def remove_child: (untyped child) -> self + + # Whether the node can be consumed (deleted) safely. By default, checks if the children set is empty. + public def finished?: () -> bool + + # If the node has a parent, and is {finished?}, then remove this node from + # the parent. + public def consume: () -> untyped + + # Traverse the task tree. + public def traverse: () { () -> void } -> Enumerator + + def traverse_recurse: (?untyped level) -> untyped + + # Immediately terminate all children tasks, including transient tasks. Internally invokes `stop(false)` on all children. This should be considered a last ditch effort and is used when closing the scheduler. + public def terminate: () -> untyped + + # Attempt to stop the current node immediately, including all non-transient children. Invokes {#stop_children} to stop all children. + public def stop: (?bool later) -> untyped + + private def stop_children: (?untyped later) -> untyped + + # Whether the node has been stopped. + public def stopped?: () -> bool + + # Print the hierarchy of the task tree from the given node. + public def print_hierarchy: (?IO out, ?backtrace: bool) -> untyped + + private def print_backtrace: (untyped out, untyped indent, untyped node) -> untyped + end + + # A synchronization primitive, which allows fibers to wait until a notification is received. Does not block the task which signals the notification. Waiting tasks are resumed on next iteration of the reactor. + class Notification < Condition + # Signal to a given task that it should resume operations. + public def signal: (?untyped value, ?task: untyped) -> bool + end + + # A queue which allows items to be processed in order. + # + # It has a compatible interface with {Notification} and {Condition}, except that it's multi-value. + class Queue + attr_reader items(@items): Array + + @items: Array + + # Create a new queue. + public def initialize: (?parent: _Asyncable?, ?available: Notification) -> void + + # Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception. + public def close: () -> untyped + + public def size: () -> Integer + + public def empty?: () -> bool + + # Add an item to the queue. + public def push: (untyped item) -> self + + # Compatibility with {::Queue#push}. + public def <<: (untyped item) -> self + + # Add multiple items to the queue. + public def enqueue: (*untyped items) -> untyped + + # Remove and return the next item from the queue. + public def dequeue: () -> untyped + + # Compatibility with {::Queue#pop}. + public def pop: () -> untyped + + # Process each item in the queue. + public def async: (?parent: _Asyncable?, **Hash options) { () -> void } -> untyped + + # Enumerate each item in the queue. + public def each: () -> untyped + + # Signal the queue with a value, the same as {#enqueue}. + public def signal: (?untyped value) -> untyped + + # Wait for an item to be available, the same as {#dequeue}. + public def wait: () -> untyped + + # An error raised when trying to enqueue items to a closed queue. + class ClosedError < RuntimeError + end + end + + # A wrapper around the the scheduler which binds it to the current thread automatically. + class Reactor < Scheduler + public def self.run: () -> untyped + + # Initialize the reactor and assign it to the current Fiber scheduler. + public def initialize: () -> void + + # Close the reactor and remove it from the current Fiber scheduler. + public def scheduler_close: () -> untyped + end + + # Handles scheduling of fibers. Implements the fiber scheduler interface. + class Scheduler < Node + # Whether the fiber scheduler is supported. + public def self.supported?: () -> bool + + # Create a new scheduler. + public def initialize: (?Node? parent, ?selector: ::IO::Event::Selector, ?profiler: untyped, ?worker_pool: untyped) -> void + + # Compute the scheduler load according to the busy and idle times that are updated by the run loop. + public def load: () -> Float + + # Invoked when the fiber scheduler is being closed. + # + # Executes the run loop until all tasks are finished, then closes the scheduler. + public def scheduler_close: (?untyped error) -> untyped + + # Terminate all child tasks. + public def terminate: () -> untyped + + # Terminate all child tasks and close the scheduler. + public def close: () -> untyped + + public def closed?: () -> bool + + public def to_s: () -> String + + # Interrupt the event loop and cause it to exit. + public def interrupt: () -> untyped + + # Transfer from the calling fiber to the event loop. + public def transfer: () -> untyped + + # Yield the current fiber and resume it on the next iteration of the event loop. + public def yield: () -> untyped + + # Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor. + public def push: (Any[Fiber, Object] fiber) -> self + + # Raise an exception on a specified fiber with the given arguments. + # + # This internally schedules the current fiber to be ready, before raising the exception, so that it will later resume execution. + public def raise: () -> untyped + + # Resume execution of the specified fiber. + public def resume: (Fiber fiber, *Array arguments) -> untyped + + # Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue. + public def block: (Object blocker, Float? timeout) -> untyped + + # Unblock a fiber that was previously blocked. + public def unblock: (Object blocker, Fiber fiber) -> untyped + + # Sleep for the specified duration. + public def kernel_sleep: (?Numeric? duration) -> untyped + + # Resolve the address of the given hostname. + public def address_resolve: (String hostname) -> self + + # Wait for the specified IO to become ready for the specified events. + public def io_wait: (IO io, Integer events, ?Float? timeout) -> untyped + + # Read from the specified IO into the buffer. + public def io_read: (IO io, IO::Buffer buffer, Integer length, ?Integer offset) -> untyped + + # Write the specified buffer to the IO. + public def io_write: (IO io, IO::Buffer buffer, Integer length, ?Integer offset) -> untyped + + # Raise an exception on the specified fiber, waking up the event loop if necessary. + public def fiber_interrupt: (untyped fiber, untyped exception) -> untyped + + # Wait for the specified process ID to exit. + public def process_wait: (Integer pid, Integer flags) -> Process::Status + + # Wait for the specified IOs to become ready for the specified events. + public def io_select: () -> untyped + + private def run_once!: (?untyped timeout) -> untyped + + # Run one iteration of the event loop. + public def run_once: (?Float? timeout) -> bool + + private def interrupted?: () -> bool + + # Stop all children, including transient children. + public def stop: () -> untyped + + private def run_loop: () -> untyped + + # Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided. + # + # Forwards all parameters to {#async} if a block is given. + public def run: () { () -> void } -> Task + + # Start an asynchronous task within the specified reactor. The task will be executed until the first blocking call, at which point it will yield and and this method will return. + public def async: (*untyped arguments, **untyped options) { () -> void } -> Task + + # Create a new fiber and return it without starting execution. + public def fiber: () -> Fiber + + # Invoke the block, but after the specified timeout, raise {TimeoutError} in any currenly blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception. + public def with_timeout: (Numeric duration, ?Class exception, ?String message) { () -> void } -> untyped + + # Invoke the block, but after the specified timeout, raise the specified exception with the given message. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception. + public def timeout_after: (Numeric duration, Class exception, String message) { () -> void } -> untyped + + # Raised when an operation is attempted on a closed scheduler. + class ClosedError < RuntimeError + # Create a new error. + public def initialize: (?String message) -> void + end + + # Used to defer stopping the current task until later. + class FiberInterrupt + # Create a new stop later operation. + public def initialize: (untyped fiber, untyped exception) -> void + + public def alive?: () -> bool + + # Transfer control to the operation - this will stop the task. + public def transfer: () -> untyped + end + end + + # A synchronization primitive, which limits access to a given resource. + class Semaphore + public def initialize: (?Integer limit, ?parent: Task | Semaphore?) -> void + + # Allow setting the limit. This is useful for cases where the semaphore is used to limit the number of concurrent tasks, but the number of tasks is not known in advance or needs to be modified. + # + # On increasing the limit, some tasks may be immediately resumed. On decreasing the limit, some tasks may execute until the count is < than the limit. + public def limit=: (Integer limit) -> untyped + + # Is the semaphore currently acquired? + public def empty?: () -> bool + + # Whether trying to acquire this semaphore would block. + public def blocking?: () -> bool + + # Run an async task. Will wait until the semaphore is ready until spawning and running the task. + public def async: (*untyped arguments, ?parent: untyped, **untyped options) -> untyped + + # Acquire the semaphore, block if we are at the limit. + # If no block is provided, you must call release manually. + public def acquire: () { () -> void } -> untyped + + # Release the semaphore. Must match up with a corresponding call to `acquire`. Will release waiting fibers in FIFO order. + public def release: () -> untyped + + # Wait until the semaphore becomes available. + private def wait: () -> untyped + end + + # Raised when a task is explicitly stopped. + class Stop < Exception + # Used to defer stopping the current task until later. + class Later + # Create a new stop later operation. + public def initialize: (Task task) -> void + + public def alive?: () -> bool + + # Transfer control to the operation - this will stop the task. + public def transfer: () -> untyped + end + end + + # Raised if a timeout occurs on a specific Fiber. Handled gracefully by `Task`. + class TimeoutError < StandardError + # Create a new timeout error. + public def initialize: (?String message) -> void + end + + class Task[ResultType] < Node + attr_reader fiber(@fiber): Fiber + + attr_reader status(@status): Symbol + + @fiber: Fiber + + @status: Symbol + + public def self.yield: () -> untyped + + # Yield back to the reactor and allow other fibers to execute. + public def yield: () -> untyped + + # Run the given block of code in a task, asynchronously, in the given scheduler. + public def self.run: (untyped scheduler, *untyped arguments, **untyped options) -> untyped + + # Begin the execution of the task. + public def run: (*untyped arguments) -> untyped + + # Create a new task. + public def initialize: (?Task parent, ?finished: untyped, **untyped options) -> void + + public def reactor: () -> Scheduler + + public def backtrace: (*untyped arguments) -> Array[Thread::Backtrace::Location]? + + # Annotate the task with a description. + # + # This will internally try to annotate the fiber if it is running, otherwise it will annotate the task itself. + public def annotate: (String annotation) -> untyped + + public def annotation: () -> Object + + public def to_s: () -> String + + public def sleep: (?untyped duration) -> untyped + + # Execute the given block of code, raising the specified exception if it exceeds the given duration during a non-blocking operation. + public def with_timeout: (untyped duration, ?untyped exception, ?untyped message) -> untyped + + public def alive?: () -> bool + + # Whether we can remove this node from the reactor graph. + public def finished?: () -> bool + + public def running?: () -> bool + + public def failed?: () -> bool + + public def stopped?: () -> bool + + public def completed?: () -> bool + + # Alias for {#completed?}. + public def complete?: () -> bool + + # Run an asynchronous task as a child of the current task. + public def async: [T] (*untyped, **untyped) { (Task[T], *untyped, **untyped) -> T } -> Task[T] + + # Retrieve the current result of the task. Will cause the caller to wait until result is available. If the task resulted in an unhandled error (derived from `StandardError`), this will be raised. If the task was stopped, this will return `nil`. + # + # Conceptually speaking, waiting on a task should return a result, and if it throws an exception, this is certainly an exceptional case that should represent a failure in your program, not an expected outcome. In other words, you should not design your programs to expect exceptions from `#wait` as a normal flow control, and prefer to catch known exceptions within the task itself and return a result that captures the intention of the failure, e.g. a `TimeoutError` might simply return `nil` or `false` to indicate that the operation did not generate a valid result (as a timeout was an expected outcome of the internal operation in this case). + public def wait: () -> ResultType + + # Stop the task and all of its children. + # + # If `later` is false, it means that `stop` has been invoked directly. When `later` is true, it means that `stop` is invoked by `stop_children` or some other indirect mechanism. In that case, if we encounter the "current" fiber, we can't stop it right away, as it's currently performing `#stop`. Stopping it immediately would interrupt the current stop traversal, so we need to schedule the stop to occur later. + public def stop: (?bool later) -> untyped + + # Defer the handling of stop. During the execution of the given block, if a stop is requested, it will be deferred until the block exits. This is useful for ensuring graceful shutdown of servers and other long-running tasks. You should wrap the response handling code in a defer_stop block to ensure that the task is stopped when the response is complete but not before. + # + # You can nest calls to defer_stop, but the stop will only be deferred until the outermost block exits. + # + # If stop is invoked a second time, it will be immediately executed. + public def defer_stop: () { () -> void } -> untyped + + public def stop_deferred?: () -> bool + + # Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available. + public def self.current: () -> Task + + # Check if there is a task defined for the current fiber. + public def self.current?: () -> _Asyncable? + + public def current?: () -> bool + + private def warn: () -> untyped + + # Finish the current task, moving any children to the parent. + private def finish!: () -> untyped + + # State transition into the completed state. + private def completed!: (untyped result) -> untyped + + # State transition into the failed state. + private def failed!: (?untyped exception) -> untyped + + private def stopped!: () -> untyped + + private def stop!: () -> untyped + + private def schedule: () -> untyped + + # Raised when a child task is created within a task that has finished execution. + class FinishedError < RuntimeError + # Create a new finished error. + public def initialize: (?String message) -> void + end + end + + # Represents a flexible timeout that can be rescheduled or extended. + class Timeout + # Initialize a new timeout. + public def initialize: (untyped timers, untyped handle) -> void + + public def duration: () -> Numeric + + # Update the duration of the timeout. + # + # The duration is relative to the current time, e.g. setting the duration to 5 means the timeout will occur in 5 seconds from now. + public def duration=: (Numeric value) -> untyped + + # Adjust the timeout by the specified duration. + # + # The duration is relative to the timeout time, e.g. adjusting the timeout by 5 increases the current duration by 5 seconds. + public def adjust: (Numeric duration) -> Numeric + + public def time: () -> Numeric + + # Assign a new time to the timeout, rescheduling it if necessary. + public def time=: (Numeric value) -> Numeric + + public def now: () -> Numeric + + # Cancel the timeout, preventing it from executing. + public def cancel!: () -> untyped + + public def cancelled?: () -> bool + + private def reschedule: (untyped time) -> untyped + + # Raised when attempting to reschedule a cancelled timeout. + class CancelledError < RuntimeError + end + end + + # A synchronization primitive that allows one task to wait for another task to resolve a value. + class Variable[T] + # Create a new variable. + public def initialize: (?Condition condition) -> void + + # Resolve the value. + # + # Signals all waiting tasks. + public def resolve: (?Object value) -> untyped + + # Alias for {#resolve}. + public def value=: (untyped value) -> untyped + + # Whether the value has been resolved. + public def resolved?: () -> bool + + # Wait for the value to be resolved. + public def wait: () -> T? + + # Alias for {#wait}. + public def value: () -> untyped + end + + # A composable synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore} and/or {Barrier}. + class Waiter + # Create a waiter instance. + public def initialize: (?parent: _Asyncable?, ?finished: ::Async::Condition) -> void + + # Execute a child task and add it to the waiter. + public def async: (?parent: untyped, **untyped options) -> untyped + + # Wait for the first `count` tasks to complete. + public def first: (?Integer? count) -> (Array[::Async::Task] | Async::Task) + + # Wait for the first `count` tasks to complete. + public def wait: (?Integer? count) -> untyped + end +end + +module Kernel + # Run the given block of code in a task, asynchronously, creating a reactor if necessary. + # + # The preferred method to invoke asynchronous behavior at the top level. + # + # - When invoked within an existing reactor task, it will run the given block + # asynchronously. Will return the task once it has been scheduled. + # - When invoked at the top level, will create and run a reactor, and invoke + # the block as an asynchronous task. Will block until the reactor finishes + # running. + public def Async: [ResultType] (*untyped, **untyped) { (Task) -> ResultType } -> Task[ResultType] + + # Run the given block of code synchronously, but within a reactor if not already in one. + public def Sync: [ResultType] (**untyped) { (Task) -> ResultType } -> ResultType +end diff --git a/sig/async/asyncable.rbs b/sig/async/asyncable.rbs new file mode 100644 index 00000000..0268d98c --- /dev/null +++ b/sig/async/asyncable.rbs @@ -0,0 +1,5 @@ +module Async + interface _Asyncable + def async: [T] (*untyped, **untyped) { (Task, *untyped, **untyped) -> T } -> Task[T] + end +end diff --git a/sig/async/variable.rbs b/sig/async/variable.rbs new file mode 100644 index 00000000..2e46f572 --- /dev/null +++ b/sig/async/variable.rbs @@ -0,0 +1,6 @@ +module Async + class Variable[T] + @value: T? + @condition: Condition? + end +end