Skip to content

Commit

Permalink
update state machine definitions, type hints for new aws sdk builder …
Browse files Browse the repository at this point in the history
…class, simplify reloaded, update readme, release 0.5.8
  • Loading branch information
spieden committed Apr 2, 2018
1 parent bb970f1 commit 5e0d37b
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 170 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ At the REPL:
```clojure
(require '[stepwise.core :as stepwise])

(stepwise/create-state-machine :adder
(stepwise/ensure-state-machine :adder
{:start-at :add
:states {:add {:type :task
:resource :activity/add
Expand All @@ -57,7 +57,7 @@ At the REPL:

## Development Workflow

Stepwise enables a rapidly cycling development workflow for Step Functions. State machine registrations are immutable, which is a virtue but does mean you need to create a new one for each minor change during development. Also, activity task polls are long and cannot be interrupted, demanding registration of new activities (or a JVM restart) to prevent stealing by stale bytecode. Stepwise provides a single function, `stepwise.reloaded/run-execution`, that uses fresh state machine and activity task registrations to run a state machine execution wherein code changes are immediately reflected.
Stepwise enables a rapidly cycling development workflow for Step Functions. State machine definition updates are eventually consistent, so new state machines need to be created on each cycle during development. Also, activity task polls are long and cannot be interrupted, demanding registration of new activities (or a JVM restart) to prevent stealing by stale bytecode. Stepwise provides a single function, `stepwise.reloaded/run-execution`, that uses fresh state machine and activity task registrations to run a state machine execution wherein code changes are immediately reflected.

Example:

Expand All @@ -71,16 +71,16 @@ Example:
{:x 41 :y 1})
=>
{:output 42,
:state-machine-arn "arn:aws:states:us-west-2:123456789012:stateMachine:adder_SNAPSHOT002",
:start-date #inst"2017-06-27T19:32:23.451-00:00",
:stop-date #inst"2017-06-27T19:32:23.594-00:00",
:state-machine-arn "arn:aws:states:us-west-2:256212633204:stateMachine:adder-1522697821734",
:start-date #inst"2018-04-02T19:37:02.061-00:00",
:stop-date #inst"2018-04-02T19:37:02.183-00:00",
:input {:x 41,
:y 1,
:state-machine-name "adder_SNAPSHOT002",
:execution-name "0ff41703-dd9f-4f88-932c-c30f3c5e707b"},
:arn "arn:aws:states:us-west-2:123456789012:execution:adder_SNAPSHOT002:0ff41703-dd9f-4f88-932c-c30f3c5e707b",
:state-machine-name "adder-1522697821734",
:execution-name "93f1d268-b2ff-4261-bf53-8ff92d7bc2c2"},
:arn "arn:aws:states:us-west-2:256212633204:execution:adder-1522697821734:93f1d268-b2ff-4261-bf53-8ff92d7bc2c2",
:status "SUCCEEDED",
:name "0ff41703-dd9f-4f88-932c-c30f3c5e707b"}
:name "93f1d268-b2ff-4261-bf53-8ff92d7bc2c2"}
```

Naturally your state machine and handlers will not be defined inline like this, so pair this call with something like [tools.namespace](https://github.com/clojure/tools.namespace) or [Cursive](https://cursive-ide.com/)'s native code reloading to rapidly try out changes to your namespaces.
Expand Down Expand Up @@ -164,7 +164,7 @@ You can of course use keywords for your custom error names, including namespaced

### Activity Task Resources

Activity task resources can be specified as keywords and `stepwise.core/create-state-machine` will register appropriately named activities for you and substitute in their ARNs. For example:
Activity task resources can be specified as keywords and `stepwise.core/ensure-state-machine` will register appropriately named activities for you and substitute in their ARNs. For example:

```
JSON
Expand Down
5 changes: 3 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
(defproject uwcpdx/stepwise "0.5.8-SNAPSHOT"
(defproject uwcpdx/stepwise "0.5.8"
:profiles {:dev {:dependencies [[org.clojure/tools.namespace "0.2.11"]
[org.clojure/clojure "1.9.0"]
[org.clojure/spec.alpha "0.1.143"]
[pjstadig/humane-test-output "0.8.1"]
[org.slf4j/slf4j-simple "1.7.25"]
[com.gfredericks/test.chuck "0.2.7"]
[org.clojure/clojure "1.9.0"]
; TODO pending jdk 9 compat https://github.com/pallet/alembic/pull/16
#_[alembic "0.3.2"]
[org.clojure/test.check "0.9.0"]]
Expand All @@ -15,7 +16,7 @@
:username :env/CLOJARS_USERNAME
:password :env/CLOJARS_PASSWORD
:sign-releases false}}
:dependencies [[uwcpdx/bean-dip "0.7.4" :exclusions [joda-time]]
:dependencies [[uwcpdx/bean-dip "0.7.5"]
[org.clojure/tools.logging "0.4.0"]
[org.clojure/data.json "0.2.6"]
[org.clojure/core.async "0.4.474"]
Expand Down
15 changes: 8 additions & 7 deletions src/stepwise/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@
([arn definition role-arn]
(update-state-machine (get-default-client) arn definition role-arn))
([^AWSStepFunctionsClient client arn definition role-arn]
(->> {:arn arn
:definition (.toPrettyJson ^StateMachine (mdl/map->StateMachine definition))
:role-arn role-arn}
mdl/map->UpdateStateMachineRequest
(.updateStateMachine client)
mdl/UpdateStateMachineResult->map
::mdl/update-date)))
(do (->> #::mdl {:arn arn
:definition-json (.toPrettyJson ^StateMachine
(mdl/map->StateMachine definition))
:role-arn role-arn}
mdl/map->UpdateStateMachineRequest
(.updateStateMachine client)
mdl/UpdateStateMachineResult->map)
arn)))

(defn delete-activity
([arn] (delete-activity (get-default-client) arn))
Expand Down
19 changes: 12 additions & 7 deletions src/stepwise/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,24 @@
[clojure.set :as sets]
[clojure.core.async :as async]
[clojure.tools.logging :as log]
[clojure.walk :as walk]))
[clojure.walk :as walk])
(:import (com.amazonaws.services.stepfunctions.model StateMachineAlreadyExistsException)))

(defn denamespace-keys [response]
(walk/prewalk (sgr/renamespace-keys (constantly true) nil)
response))

(defn create-state-machine [name definition]
(defn ensure-state-machine [name definition]
(let [definition (sgr/desugar definition)
activity-name->arn (activities/ensure-all (activities/get-names definition))
definition (activities/resolve-kw-resources activity-name->arn definition)]
(client/create-state-machine (arns/make-name name)
definition
(iam/ensure-execution-role))))
definition (activities/resolve-kw-resources activity-name->arn definition)
name-ser (arns/make-name name)
arn (arns/get-state-machine-arn name-ser)
execution-role (iam/ensure-execution-role)]
(try
(client/create-state-machine name-ser definition execution-role)
(catch StateMachineAlreadyExistsException _
(client/update-state-machine arn definition execution-role)))))

(defn describe-execution [arn]
(-> (client/describe-execution arn)
Expand Down Expand Up @@ -73,7 +78,7 @@
(map (fn [activity-name]
[activity-name (arns/get-activity-arn activity-name)]))
(keys task-handlers))
; was tripping call throttles
; TODO was tripping call throttles
#_(activities/ensure-all (keys task-handlers))]
(workers/boot (-> task-handlers
(sets/rename-keys activity->arn)
Expand Down
70 changes: 16 additions & 54 deletions src/stepwise/model.clj
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,7 @@
(defmethod bd/builder-override [Catcher ::error-equals] [_ ^Catcher$Builder builder error-equals]
(.errorEquals builder (into-array String (map name error-equals))))

(defmethod bd/builder-override [Catcher ::result-path] [_ ^Catcher$Builder builder ^String input-path]
(.resultPath builder input-path))

(def-builder-translation Catcher #{::error-equals ::result-path ::transition})
(def-builder-translation Catcher #{::error-equals [::result-path String] ::transition})

(defmethod bd/->map-val ::error-equals [_ error-equals]
(into #{} error-equals))
Expand Down Expand Up @@ -306,8 +303,8 @@
(.outputPath builder output-path))

(def-builder-translation ChoiceState
#{::choices ::comment ::default-state-name ::input-path ::output-path
::terminal-state?}
#{::choices ::comment ::default-state-name [::input-path String]
[::output-path String] ::terminal-state?}
::terminal-state?)

(def-builder-translation FailState #{::cause ::comment ::error})
Expand All @@ -316,27 +313,12 @@
(defmethod bd/builder-override [PassState ::transition] [_ ^PassState$Builder builder transition]
(.transition builder (map->Transition transition)))

(defmethod bd/builder-override [PassState ::input-path] [_ ^PassState$Builder builder ^String input-path]
(.inputPath builder input-path))

(defmethod bd/builder-override [PassState ::result-path] [_ ^PassState$Builder builder ^String result-path]
(.resultPath builder result-path))

(defmethod bd/builder-override [PassState ::output-path] [_ ^PassState$Builder builder ^String output-path]
(.outputPath builder output-path))

(def-builder-translation PassState
#{::comment ::input-path ::output-path [::result String] ::result-path
::transition})

(defmethod bd/builder-override [SucceedState ::input-path] [_ ^SucceedState$Builder builder ^String input-path]
(.inputPath builder input-path))

(defmethod bd/builder-override [SucceedState ::output-path] [_ ^SucceedState$Builder builder ^String output-path]
(.outputPath builder output-path))
#{::comment [::input-path String] [::output-path String] [::result String]
[::result-path String] ::transition})

(def-builder-translation SucceedState
#{::comment ::input-path ::output-path ::terminal-state?}
#{::comment [::input-path String] [::output-path String] ::terminal-state?}
::terminal-state?)

(defmethod bd/builder-override [TaskState ::transition] [_ ^TaskState$Builder builder transition]
Expand All @@ -356,30 +338,17 @@
(defmethod bd/->bean-val ::heartbeat-seconds [_ heartbeat-seconds]
(int heartbeat-seconds))

(defmethod bd/builder-override [TaskState ::output-path] [_ ^TaskState$Builder builder ^String output-path]
(.outputPath builder output-path))

(defmethod bd/builder-override [TaskState ::result-path] [_ ^TaskState$Builder builder ^String result-path]
(.resultPath builder result-path))

(defmethod bd/builder-override [TaskState ::input-path] [_ ^TaskState$Builder builder ^String input-path]
(.inputPath builder input-path))

(def-builder-translation TaskState
#{::catchers ::comment ::heartbeat-seconds ::input-path ::output-path
::resource ::result-path ::retriers ::timeout-seconds ::transition})
#{::catchers ::comment ::heartbeat-seconds [::input-path String]
[::output-path String] [::result-path String] ::resource ::retriers
::timeout-seconds ::transition})

(defmethod bd/builder-override [WaitState ::transition] [_ ^WaitState$Builder builder transition]
(.transition builder (map->Transition transition)))

(defmethod bd/builder-override [WaitState ::input-path] [_ ^WaitState$Builder builder ^String input-path]
(.inputPath builder input-path))

(defmethod bd/builder-override [WaitState ::output-path] [_ ^WaitState$Builder builder ^String output-path]
(.outputPath builder output-path))

(def-builder-translation WaitState
#{::comment ::input-path ::output-path ::transition ::wait-for})
#{::comment [::input-path String] [::output-path String] ::transition
::wait-for})

(declare state-kw->map->Bean)

Expand Down Expand Up @@ -410,18 +379,9 @@
(defmethod bd/builder-override [ParallelState ::transition] [_ ^ParallelState$Builder builder transition]
(.transition builder (map->Transition transition)))

(defmethod bd/builder-override [ParallelState ::output-path] [_ ^ParallelState$Builder builder ^String output-path]
(.outputPath builder output-path))

(defmethod bd/builder-override [ParallelState ::input-path] [_ ^ParallelState$Builder builder ^String input-path]
(.inputPath builder input-path))

(defmethod bd/builder-override [ParallelState ::result-path] [_ ^ParallelState$Builder builder ^String result-path]
(.resultPath builder result-path))

(def-builder-translation ParallelState
#{::branches ::catchers ::comment ::input-path ::output-path ::result-path
::retriers ::transition})
#{::branches ::catchers ::comment [::input-path String]
[::output-path String] [::result-path String] ::retriers ::transition})

(def state-kw->map->Bean
{::choice map->ChoiceState$Builder
Expand Down Expand Up @@ -460,7 +420,9 @@
(bd/def-translation CreateActivityResult #{[:activity-arn ::arn] ::creation-date})

(bd/def-translation CreateStateMachineRequest #{::name [::definition StateMachine] ::role-arn})
(bd/def-translation UpdateStateMachineRequest #{[:state-machine-arn ::arn] :definition ::role-arn})
(bd/def-translation UpdateStateMachineRequest #{[:state-machine-arn ::arn]
[:definition ::definition-json]
::role-arn})

(defmethod bd/->bean-val ::definition [_ definition]
(map->StateMachine definition))
Expand Down
123 changes: 34 additions & 89 deletions src/stepwise/reloaded.clj
Original file line number Diff line number Diff line change
@@ -1,97 +1,42 @@
(ns stepwise.reloaded
(:require [stepwise.client :as client]
[stepwise.arns :as arns]
[stepwise.model :as mdl]
[stepwise.core :as core]
[clojure.string :as strs]
[stepwise.activities :as activities]
[clojure.set :as sets]
[stepwise.sugar :as sgr]
[stepwise.iam :as iam]
[stepwise.workers :as workers])
(:import (com.amazonaws.services.stepfunctions.model StateMachineDeletingException
StateMachineDoesNotExistException
ExecutionDoesNotExistException)
(java.util UUID)))

(def max-cycles-per-minute 120)
(def version-delimiter "_SNAPSHOT")
(def version-delimiter-re (re-pattern version-delimiter))

(defn get-next-version [current-arns]
(let [arn (-> current-arns sort reverse first)
last-ver (some-> arn
(strs/split version-delimiter-re)
second
(Integer/parseInt))]
(if last-ver
(if (>= last-ver max-cycles-per-minute)
0
(+ last-ver 1))
0)))

(def version-format
(str "%0" (count (str max-cycles-per-minute)) "d"))

(defn deversion-name [nm]
(when (re-find version-delimiter-re nm)
(first (strs/split (name nm)
version-delimiter-re))))

(defn version-name [nm version]
(keyword (namespace nm)
(str (name nm)
version-delimiter
(format version-format version))))

(defn purge-machines [arns]
(doseq [arn arns]
(doseq [execution (try (::mdl/executions (client/list-executions arn))
(catch StateMachineDoesNotExistException _))]
(try (client/stop-execution (::mdl/arn execution))
(catch ExecutionDoesNotExistException _)))
(try (client/delete-state-machine arn)
(catch StateMachineDeletingException _))))

(defn purge-activities [arns]
(doseq [arn arns]
(client/delete-activity arn)))

(defn get-family-arns [machine-name]
(into #{}
(comp (filter #(= (deversion-name (::mdl/name %))
(arns/make-name machine-name)))
(map ::mdl/arn))
(::mdl/state-machines (client/list-state-machines))))
[stepwise.arns :as arns])
(:import (java.util UUID)))

(defn machine-name->snapshot [machine-name]
(str (when-let [mns (namespace machine-name)]
(str mns "-"))
(name machine-name)
"-"
(str (System/currentTimeMillis))))

(defn activity-name->snapshot [snapshot-name activity-names]
(into {}
(map (fn [activity-name]
[activity-name (keyword (str snapshot-name
(when-let [ans (namespace activity-name)]
(str "-" ans)))
(name activity-name))]))
activity-names))

(defn run-execution [machine-name definition task-handlers input]
(let [machine-arns (get-family-arns machine-name)
version (get-next-version machine-arns)
machine-name (version-name machine-name version)
activity-names (activities/get-names definition)
member-activity? (into #{} (map arns/make-name) activity-names)
; TODO paginate
activity-arns (into #{}
(comp (filter #(member-activity? (deversion-name (::mdl/name %))))
(map ::mdl/arn))
(::mdl/activities (client/list-activities)))]

(purge-machines machine-arns)
(purge-activities activity-arns)

(let [activity->snapshot (into {}
(map #(vector % (version-name % version)))
activity-names)
_ (core/create-state-machine machine-name
(activities/resolve-kw-resources activity->snapshot
definition))
workers (core/start-workers (sets/rename-keys task-handlers
activity->snapshot))
result (core/run-execution machine-name
{:input input
:execution-name (str (UUID/randomUUID))})]
(core/kill-workers workers)
result)))

; TODO clean-up function to purge all snapshot machines/activities for a machine
(let [snapshot-name (machine-name->snapshot machine-name)
activity-names (activities/get-names definition)
activity->snapshot (activity-name->snapshot snapshot-name activity-names)
definition (activities/resolve-kw-resources activity->snapshot
definition)
snapshot-arn (core/ensure-state-machine snapshot-name definition)
workers (core/start-workers (sets/rename-keys task-handlers
activity->snapshot))
result (core/run-execution snapshot-name
{:input input
:execution-name (str (UUID/randomUUID))})]
(core/kill-workers workers)
(doseq [[_ snapshot-name] activity->snapshot]
(client/delete-activity (arns/get-activity-arn snapshot-name)))
(client/delete-state-machine snapshot-arn)
result))

2 changes: 1 addition & 1 deletion test/stepwise/dev_repl.clj
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
machine-id (keyword "dev-repl" rand-ns)
activity-kw (keyword "dev-repl" rand-ns)
workers (core/start-workers namespace {activity-kw (fn [{:keys [a b]}] (throw (ex-info "hi" {:error :blamo})))})]
(core/create-state-machine namespace
(core/ensure-state-machine namespace
machine-id
{:start-at :foo
:states {:foo {:type :task
Expand Down

0 comments on commit 5e0d37b

Please sign in to comment.