
Dataflow

This namespace is designed to compose simple core.async processes. It’s probably suitable for micro-services.

The configuration and data model are inspired by Onyx.

Motivation

When using core.async there is usually little need for the logistics and ceremony of managing channels, puts and takes, and, in most cases, lifecycle, since all of these follow naturally from the topology.

A process's topology is usually emergent rather than explicitly stated, so topology, business logic, and low-level async APIs end up mixed together in the same code.

The idea is to separate the topology of the process from its logic as much as possible by providing a data language that describes the data flow. Functions and other vars are resolved when “compiling” the model.

Data Model

  • Edges: core.async channels
  • Vertices: processing units, connected by channels. Can be pipes, drivers, sinks.

    The graph is described in terms of two collections:

    • Edges: data describing only the channels, including buffer types, buffer functions, size and transducers.
    • Nodes: data describing a pipeline between two channels, mult, producer or consumer.

Buffers

{::buffer/type ::buffer/blocking
 ::buffer/size 8}

{::buffer/type ::buffer/sliding
 ::buffer/size 8}

{::buffer/type ::buffer/dropping
 ::buffer/size 8}

Channels

{::chan/name :a
 ::chan/type ::chan/simple}

{::chan/name :b
 ::chan/type ::chan/sized
 ::chan/size 8}

{::chan/name :c
 ::chan/type ::chan/buffered
 ::chan/buffer {::buffer/type ::buffer/blocking
                ::buffer/size 8}}
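
As a rough guide, the channel and buffer descriptions above presumably correspond to plain core.async calls along the following lines. This is a sketch of the correspondence, not the library's compiler: the var names simple, sized, buffered and received are made up for illustration, and the mappings for ::chan/simple, ::chan/sized and ::buffer/blocking are assumptions.

```clojure
(require '[clojure.core.async :as a])

;; ::chan/simple -> an unbuffered channel (assumed mapping)
(def simple (a/chan))

;; ::chan/sized with ::chan/size 8 -> a fixed buffer of 8 (assumed mapping)
(def sized (a/chan 8))

;; ::chan/buffered -> a channel built from an explicit buffer, where the
;; ::buffer/type picks the constructor (the first mapping is assumed):
;;   ::buffer/blocking -> a/buffer
;;   ::buffer/sliding  -> a/sliding-buffer
;;   ::buffer/dropping -> a/dropping-buffer
(def buffered (a/chan (a/sliding-buffer 2)))

;; A sliding buffer never blocks the writer; when full it drops the oldest item.
(doseq [x [1 2 3]]
  (a/>!! buffered x))
(a/close! buffered)

;; Drain whatever is left in the buffer into a vector.
(def received (a/<!! (a/into [] buffered)))
```

With a sliding buffer of size 2, only the two most recent puts remain once the channel is closed and drained.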

Extension

Buffers

(defmethod buffer/-type ::your-new-type [_] ::spec-for-your-type)

(defmethod buffer/-compile ::your-new-type
  [{:keys [:buffer/arg1 :buffer/arg2]}]
  (your-buffer-fn arg1 arg2))

;;; Example from buffer namespace

(defmethod -compile ::dropping [{:keys [::size]}] (a/dropping-buffer size))
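
The -type methods above return the name of a spec, which looks like the clojure.spec multi-spec pattern. The following self-contained sketch shows how that pattern validates a map according to its type key. Every name in it (::type, ::size, ::fixed, ::fixed-spec, the local -type multimethod) is hypothetical and does not come from the buffer namespace.

```clojure
(require '[clojure.spec.alpha :as s])

;; Hypothetical attribute specs, for illustration only.
(s/def ::type keyword?)
(s/def ::size pos-int?)
(s/def ::fixed-spec (s/keys :req [::type ::size]))

;; The multimethod dispatches on ::type and returns the spec to apply.
(defmulti -type ::type)
(defmethod -type ::fixed [_] ::fixed-spec)

;; multi-spec ties the dispatch and the per-type specs together.
(s/def ::buffer (s/multi-spec -type ::type))

(def ok?      (s/valid? ::buffer {::type ::fixed ::size 8}))
(def bad-size (s/valid? ::buffer {::type ::fixed ::size 0}))
(def unknown  (s/valid? ::buffer {::type ::nope  ::size 8}))
```

Under this pattern, registering a new type is a matter of adding one defmethod; maps whose type has no registered method simply fail validation.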

Channels

(defmethod chan/-type ::your-new-type [_] ::spec-for-your-type)

(defmethod chan/-compile ::your-new-type
  [{:keys [:chan/arg1 :chan/arg2]}]
  (your-chan-fn arg1 arg2))

;; Example from channel namespace

(defmethod -compile ::buffered [{:keys [::buffer]}] (a/chan (buffer/-compile buffer)))

Worker nodes

Worker node compilers also take an environment argument, which contains the channels.

(defmethod node/-type ::your-new-type [_] ::spec-for-your-type)

(defmethod node/-compile ::your-new-type
  [{:keys [:node/arg1 :node/arg2]} env]
  (your-node-fn arg1 arg2))

;; Example from node namespace

(defmethod -compile ::pipeline-blocking
  [{{to ::to from ::from size ::size xf ::xf} ::pipeline} env]
  (a/pipeline-blocking size (env to) xf (env from)))
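
To make the role of env concrete, here is a minimal sketch that assumes the environment is a plain map from channel name to channel instance, as the (env to) and (env from) lookups suggest. The env and result vars and the inc transducer are illustrative, not part of the library.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical environment: channel name -> channel instance.
(def env {:in  (a/chan 1)
          :out (a/chan 1)})

;; Same call shape as the ::pipeline-blocking method, with the
;; destructured values (size, to, xf, from) written out by hand.
(a/pipeline-blocking 4 (env :out) (map inc) (env :in))

;; Push one value through the pipeline and read the transformed result.
(a/>!! (env :in) 41)
(def result (a/<!! (env :out)))

;; Closing the input also closes the output once the pipeline drains.
(a/close! (env :in))
```

Because nodes refer to channels only by name, the same node description can be compiled against any environment that provides those names.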

Usage

Require dataflow namespaces

(require '[clojure.core.async :as a]
         '[clojure.spec.alpha :as s]
         '[more.async.dataflow.node :as node]
         '[more.async.dataflow.channel :as chan]
         '[more.async.dataflow.buffer :as buffer]
         '[more.async.dataflow :as flow])

Define a model

  • Define a model with channels and nodes (can be verified using spec).
  • Define the required vars.
  • Validate the model using the ::flow/model spec.
  • Try compiling the model using compile-model.

Example

(def model
  {::flow/channels
   [{::chan/name :in
     ::chan/type ::chan/sized
     ::chan/size 1}
    {::chan/name :out
     ::chan/type ::chan/sized
     ::chan/size 1}]
   ::flow/nodes
   [;; Puts an incrementing counter onto :in, once per second.
    {::node/name :producer
     ::node/type ::node/produce
     ::node/produce
     {::node/to :in
      ::node/async? true
      ::node/fn (let [a (atom 0)]
                  (fn drive []
                    (Thread/sleep 1000)
                    (swap! a inc)))}}

    ;; Moves values from :in to :out through a slow, printing transducer.
    {::node/name :pipeline
     ::node/type ::node/pipeline-blocking
     ::node/pipeline
     {::node/from :in
      ::node/to :out
      ::node/size 4
      ::node/xf (map (fn [x] (println x) (Thread/sleep 2500) x))}}

    ;; Prints every value that arrives on :out.
    {::node/name :consumer
     ::node/type ::node/consume
     ::node/consume
     {::node/from :out
      ::node/fn (fn [x] (println :OUT x))
      ::node/async? true}}]})

;; Validate the parts, then the whole model.
(s/valid? ::flow/channels (::flow/channels model))

(s/valid? ::flow/nodes (::flow/nodes model))

(s/valid? ::flow/model model)

(s/valid? ::flow/connected model)

;; Compile the model into running processes.
(def system (flow/compile-model model))

;; Shut down by closing the input channel.
(a/close! (:in (::flow/channels system)))

Status

Experimental. Looking for user reports.

Roadmap

  • [ ] Tests
  • [ ] Analyze the topology to find any dangling channels or disconnected pipes before instantiating the pipes.
  • [ ] Implement select based on alt! and/or alts!.
  • [ ] Find an idiomatic way to connect a web handler as driver.
  • [ ] Refine specs; there is currently no way to differentiate transducers from regular functions.