|
| 1 | +# API Reference |
| 2 | + |
| 3 | +## Channels |
| 4 | + |
| 5 | +Channels are queues that carry values and support multiple writers and readers. Channels are created with [chan](https://clojure.github.io/core.async/clojure.core.async.html#var-chan). Values in a channel are stored in a buffer. Buffers are never unbounded and there are several provided buffer types: |
| 6 | + |
| 7 | +* Unbuffered - `(chan)` - no buffer is used, and a rendezvous is required to pass a value through the channel from writer to reader |
| 8 | +* Fixed size - `(chan 10)` |
| 9 | +* Dropping - `(chan (dropping-buffer 10))` - fixed size, and when full drop newest value |
| 10 | +* Sliding - `(chan (sliding-buffer 10))` - fixed size, and when full drop oldest value |
| 11 | + |
| 12 | +Channels are first-class values that can be passed around like any other value. |
| 13 | + |
| 14 | +Channels may optionally be supplied with a [transducer](https://clojure.org/reference/transducers) and an exception handler. The transducer will be applied to values that pass through the channel. If a transducer is supplied, the channel *must* be buffered (transducers can create intermediate values that must be stored somewhere). Channel transducers must not block, whether by issuing i/o operations or by externally synchronizing, else risk impeding or deadlocking go blocks. |
| 15 | + |
| 16 | +The `ex-handler` is a function of one argument (a Throwable). If an exception occurs while applying the transducer, the `ex-handler` will be invoked, and any non-nil return value will be placed in the channel. If no `ex-handler` is supplied, exceptions will flow and be handled where they occur (note that this may in either the writer or reader thread depending on the operation and the state of the buffer). |
| 17 | + |
| 18 | +* Creating channels: [chan](https://clojure.github.io/core.async/clojure.core.async.html#var-chan) |
| 19 | +* Buffers: [buffer](https://clojure.github.io/core.async/clojure.core.async.html#var-buffer) [dropping-buffer](https://clojure.github.io/core.async/clojure.core.async.html#var-dropping-buffer) [sliding-buffer](https://clojure.github.io/core.async/clojure.core.async.html#var-sliding-buffer) [unblocking-buffer?](https://clojure.github.io/core.async/clojure.core.async.html#var-unblocking-buffer.3F) |
| 20 | + |
| 21 | +### Put and take |
| 22 | + |
| 23 | +Any value can be placed on a channel, except `nil`. The primary operations on channels are _put_ and _take_, which are provided in several variants: |
| 24 | + |
| 25 | +* Blocking: [>!!](https://clojure.github.io/core.async/clojure.core.async.html#var-.3E.21.21), [<!!](https://clojure.github.io/core.async/clojure.core.async.html#var-.3C.21.21) |
| 26 | +* Parking: [>!](https://clojure.github.io/core.async/clojure.core.async.html#var-.3E.21), [<!](https://clojure.github.io/core.async/clojure.core.async.html#var-.3C.21) |
| 27 | +* Async: [put!](https://clojure.github.io/core.async/clojure.core.async.html#var-put.21), [take!](https://clojure.github.io/core.async/clojure.core.async.html#var-take.21) |
| 28 | +* Non-blocking: [offer!](https://clojure.github.io/core.async/clojure.core.async.html#var-offer.21), [poll!](https://clojure.github.io/core.async/clojure.core.async.html#var-poll.21) |
| 29 | + |
| 30 | +NOTE: As a mnemonic, the `<` or `>` points in the direction the value travels relative to the channel arg. For example, in `(>!! chan val)` the `>` points into the channel (_put_) and `(<!! chan)` points out of the channel (_take_). |
| 31 | + |
| 32 | +The use case dictates the variant to use. Parking operations are only valid in `go` blocks (see below for more) and never valid outside the lexical scope of a `go`. Conversely, blocking operations should only be used outside `go` blocks. |
| 33 | + |
| 34 | +The async and non-blocking forms are less common but may be used in either context. Use the async variants to specify a channel and a function that is called when the take or put succeeds. The `take!` and `put!` functions also take an optional flag `on-caller?` to indicate whether the fn can be called on the current thread. The non-blocking `offer!` and `poll!` will either complete or return immediately. |
| 35 | + |
| 36 | +Channels are closed with [close!](https://clojure.github.io/core.async/clojure.core.async.html#var-close.21). When a channel is closed, no values may be added, but values already in the channel may be taken. When all values are drained from a closed channel, take operations will return `nil` (these are not valid values and serve as a marker). |
| 37 | + |
| 38 | + |
| 39 | +* Closing channels: [close!](https://clojure.github.io/core.async/clojure.core.async.html#var-close.21) |
| 40 | +* Buffers: [buffer](https://clojure.github.io/core.async/clojure.core.async.html#var-buffer) [dropping-buffer](https://clojure.github.io/core.async/clojure.core.async.html#var-dropping-buffer) [sliding-buffer](https://clojure.github.io/core.async/clojure.core.async.html#var-sliding-buffer) [unblocking-buffer?](https://clojure.github.io/core.async/clojure.core.async.html#var-unblocking-buffer.3F) |
| 41 | +* Blocking ops: [>!!](https://clojure.github.io/core.async/clojure.core.async.html#var-.3E.21.21) [<!!](https://clojure.github.io/core.async/clojure.core.async.html#var-.3C.21.21) |
| 42 | +* Parking ops: [>!](https://clojure.github.io/core.async/clojure.core.async.html#var-.3E.21) [<!](https://clojure.github.io/core.async/clojure.core.async.html#var-.3C.21) |
| 43 | +* Async ops: [put!](https://clojure.github.io/core.async/clojure.core.async.html#var-put.21) [take!](https://clojure.github.io/core.async/clojure.core.async.html#var-take.21) |
| 44 | +* Non-blocking ops: [offer!](https://clojure.github.io/core.async/clojure.core.async.html#var-offer.21) [poll!](https://clojure.github.io/core.async/clojure.core.async.html#var-poll.21) |
| 45 | + |
| 46 | +### alts |
| 47 | + |
| 48 | +`alts!` (parking) and `alts!!` (blocking) can be used to wait on a set of channel operations until one succeeds. Channel operations can be either a put (with a value) or a take. By default, if more than one operation becomes available, they are chosen in random order, but set `:priority true` to order a preference. Only one of the operations will occur. If no operation is available and a `:default val` is specified, the default value will be returned instead. |
| 49 | + |
| 50 | +Since it is common to combine an `alts` with a conditional return based on the action chosen, `alt!` (parking) and `alt!!` (blocking) combine an `alts!` select with destructuring of the channel and value and a result expression. |
| 51 | + |
| 52 | + |
| 53 | +* Blocking: [alt!!](https://clojure.github.io/core.async/clojure.core.async.html#var-alt.21.21) [alts!!](https://clojure.github.io/core.async/clojure.core.async.html#var-alts.21.21) |
| 54 | +* Parking: [alt!](https://clojure.github.io/core.async/clojure.core.async.html#var-alt.21) [alts!](https://clojure.github.io/core.async/clojure.core.async.html#var-alts.21) |
| 55 | +* Timeouts: [timeout](https://clojure.github.io/core.async/clojure.core.async.html#var-timeout) |
| 56 | + |
| 57 | +### Promise channels |
| 58 | + |
| 59 | +Promise channels are special channels that will accept only a single value. Once a value is put to a promise channel, all pending and future consumers will receive only that value. Future puts complete but drop the value. When the channel is closed, consumers will receive either the value (if a put occurred) or nil (if no put occurred) forever. |
| 60 | + |
| 61 | +Promise channels: [promise-chan](https://clojure.github.io/core.async/clojure.core.async.html#var-promise-chan) |
| 62 | + |
| 63 | +## Managing processes |
| 64 | + |
| 65 | +### go blocks and threads |
| 66 | + |
| 67 | +"Processes", in the most general sense, are represented either as go blocks or threads. Go blocks model a lightweight computation that can be "parked" (paused) without consuming a thread. Go blocks communicate externally via channels. Any core.async parking operation (`>!`, `<!`, `alt!`, `alts!`) that cannot be immediately completed will cause the block to park and it will be automatically resumed when the operation can complete (when data has arrived on a channel to allow it). |
| 68 | + |
| 69 | +Note that go blocks are multiplexed over a finite number of threads and should never be blocked, either by the use of a core.async blocking operation (like `<!!`) or by calling a blockable I/O operation like a network call. Doing so may effectively block all of the threads in the go block pool and prevent further progress. |
| 70 | + |
| 71 | +core.async provides the helper functions `thread` and `thread-call` (analogous to `future` and `future-call`) to execute a process asynchronously in a separate thread. As these threads are not limited, they are suitable for blocking operations and can communicate with other processes via channels. However, note that these threads are not special - you can create and manage your own threads in any way you like and use core.async channels from those threads to communicate. |
| 72 | + |
| 73 | + |
| 74 | +* Go blocks: [go](https://clojure.github.io/core.async/clojure.core.async.html#var-go) [go-loop](https://clojure.github.io/core.async/clojure.core.async.html#var-go-loop) |
| 75 | +* Threads: [thread](https://clojure.github.io/core.async/clojure.core.async.html#var-thread) |
| 76 | +[thread-call](https://clojure.github.io/core.async/clojure.core.async.html#var-thread-call) |
| 77 | + |
| 78 | +### Multi-threaded pipelines |
| 79 | + |
| 80 | +The `pipeline` function (and variants) are designed for modeling your work as a pipeline of multi-threaded processing stages. The stages are connected by channels and each stage has N threads performing transducer xf as values flow from the from channel to the to channel. The variants are: |
| 81 | + |
| 82 | +* `pipeline` - the work performed in the xf must not block (designed for computational parallelism). The transducer will be applied independently to each value, in parallel, so stateful transducer functions will likely not be useful. |
| 83 | +* `pipeline-blocking` - the work performed in the xf may block, for example on network operations. |
| 84 | +* `pipeline-async` - this variant triggers asynchronous work in another system or thread and expects another thread to place the results on a return channel. |
| 85 | + |
| 86 | + |
| 87 | +* Pipeline ops: [pipeline](https://clojure.github.io/core.async/clojure.core.async.html#var-pipeline) [pipeline-blocking](https://clojure.github.io/core.async/clojure.core.async.html#var-pipeline-blocking) [pipeline-async](https://clojure.github.io/core.async/clojure.core.async.html#var-pipeline-async) |
| 88 | + |
| 89 | +## Working with channels |
| 90 | + |
| 91 | +### Operations on channels |
| 92 | + |
| 93 | + |
| 94 | +* Collections: [into](https://clojure.github.io/core.async/clojure.core.async.html#var-into) [onto-chan!](https://clojure.github.io/core.async/clojure.core.async.html#var-onto-chan.21) [onto-chan!!](https://clojure.github.io/core.async/clojure.core.async.html#var-onto-chan.21.21) [to-chan](https://clojure.github.io/core.async/clojure.core.async.html#var-to-chan) |
| 95 | +* Functions: [map](https://clojure.github.io/core.async/clojure.core.async.html#var-map) [take](https://clojure.github.io/core.async/clojure.core.async.html#var-take) |
| 96 | +* Reducing: [reduce](https://clojure.github.io/core.async/clojure.core.async.html#var-reduce) [transduce](https://clojure.github.io/core.async/clojure.core.async.html#var-transduce) |
| 97 | + |
| 98 | +### Channel connectors |
| 99 | + |
| 100 | +* Connecting channels: [pipe](https://clojure.github.io/core.async/clojure.core.async.html#var-pipe) |
| 101 | +* Merging channels: [merge](https://clojure.github.io/core.async/clojure.core.async.html#var-merge) |
| 102 | +* Splitting channels: [split](https://clojure.github.io/core.async/clojure.core.async.html#var-split) |
| 103 | + |
| 104 | +### Mults |
| 105 | + |
| 106 | +* Mults: [mult](https://clojure.github.io/core.async/clojure.core.async.html#var-mult) [tap](https://clojure.github.io/core.async/clojure.core.async.html#var-tap) [untap](https://clojure.github.io/core.async/clojure.core.async.html#var-untap) [untap-all](https://clojure.github.io/core.async/clojure.core.async.html#var-untap-all) |
| 107 | + |
| 108 | +### Pub/sub |
| 109 | + |
| 110 | +* Pub/sub: [pub](https://clojure.github.io/core.async/clojure.core.async.html#var-pub) [sub](https://clojure.github.io/core.async/clojure.core.async.html#var-sub) [unsub](https://clojure.github.io/core.async/clojure.core.async.html#var-unsub) [unsub-all](https://clojure.github.io/core.async/clojure.core.async.html#var-unsub-all) |
| 111 | + |
| 112 | +### Mixes |
| 113 | + |
| 114 | +* Mixes: [mix](https://clojure.github.io/core.async/clojure.core.async.html#var-mix) [admix](https://clojure.github.io/core.async/clojure.core.async.html#var-admix) [toggle](https://clojure.github.io/core.async/clojure.core.async.html#var-toggle) [unmix](https://clojure.github.io/core.async/clojure.core.async.html#var-unmix) [unmix-all](https://clojure.github.io/core.async/clojure.core.async.html#var-unmix-all) [solo-mode](https://clojure.github.io/core.async/clojure.core.async.html#var-solo-mode) |
| 115 | + |
| 116 | +## Configuration |
| 117 | + |
| 118 | +### `go` checking |
| 119 | + |
| 120 | +Because the core.async go block thread pool is fixed size, blocking IO operations should never be done in go blocks. If all go threads are blocked on blocking operations, you may experience either deadlock or lack of progress. |
| 121 | + |
| 122 | +One common issue is the use of core.async blocking operations inside go blocks. core.async includes a debugging facility to detect this situation (other kinds of blocking operation cannot be detected so this covers only part of the problem). To enable go checking, set the Java system property `clojure.core.async.go-checking=true`. This property is read once, at namespace load time, and should be used in development or testing, not in production. |
| 123 | + |
| 124 | +When go checking is active, invalid blocking calls in a go block will throw in go block threads. By default, these will likely throw to the go block thread's uncaught exception handler and be printed, but you can use `Thread/setDefaultUncaughtExceptionHandler` to change the default behavior (or depending on your system, you may have one already that routes to logging). |
| 125 | + |
| 126 | +## More information |
| 127 | + |
| 128 | +See the following for more information: |
| 129 | + |
| 130 | +* [Rationale](https://clojure.github.io/core.async/rationale.html) |
| 131 | +* [API](https://clojure.github.io/core.async) |
| 132 | +* [Source](https://github.com/clojure/core.async) |
0 commit comments