diff --git a/aws-flow/lib/aws/decider/data_converter.rb b/aws-flow/lib/aws/decider/data_converter.rb
index 72feb03..1328332 100644
--- a/aws-flow/lib/aws/decider/data_converter.rb
+++ b/aws-flow/lib/aws/decider/data_converter.rb
@@ -1,5 +1,5 @@
 #--
-# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# Copyright (C) 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License").
 # You may not use this file except in compliance with the License.
@@ -15,14 +15,21 @@
 module AWS
   module Flow

-    # Converts an object to YAML. Exceptions are handled differently because YAML doesn't propagate backtraces
-    # properly, and they are very handy for debugging.
+    # Serializes/deserializes Ruby objects using {http://yaml.org/ YAML} format.
+    # *This is the default data converter for the AWS Flow Framework for Ruby*.
+    #
+    # @note There is a 32K (32,768) character limit on activity/workflow input
+    #   and output. If the amount of data that you need to pass exceeds this
+    #   limit, use {S3DataConverter} instead.
+    #
    class YAMLDataConverter

-      # Serializes a ruby object into a YAML string.
+      # Serializes a Ruby object into a YAML string.
+      #
+      # @param object [Object]
+      #   The object to serialize.
+      #
-      # @param object
-      #   The object that needs to be serialized into a string.
+      # @return the object's data in YAML format.
       #
      def dump(object)
        if object.is_a? Exception
@@ -31,10 +38,12 @@ def dump(object)
        object.to_yaml
      end

-      # Deserializes a YAML string into a ruby object.
+      # Deserializes a YAML string into a Ruby object.
       #
-      # @param source
-      #   The source YAML string that needs to be deserialized into a ruby object.
+      # @param source [String]
+      #   The YAML string to deserialize.
+      #
+      # @return a Ruby object generated from the YAML string.
       #
      def load(source)
        return nil if source.nil?
@@ -51,37 +60,56 @@ def load(source)
      end
    end

-    # S3DataConverter uses YAMLDataConverter internally to serialize and
-    # deserialize ruby objects. Additionally it stores objects larger than
-    # 32k characeters in AWS S3 and returns a serialized s3 link to be
-    # deserialized remotely. It caches objects locally to minimize calls to S3.
+    # S3DataConverter serializes/deserializes Ruby objects using
+    # {YAMLDataConverter}, storing the serialized data on Amazon S3. This data
+    # can exceed 32K (32,768) characters in size, providing a way to work past
+    # Amazon SWF's [input/output data limit][limits].
+    #
+    # [limits]: http://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dg-limits.html
+    #
+    # To activate it, set the `AWS_SWF_BUCKET_NAME` environment variable to the
+    # name of an Amazon S3 bucket to use to store workflow/activity data. The
+    # bucket will be created if necessary.
     #
-    # AWS Flow Framework for Ruby doesn't delete files from S3 to prevent loss
-    # of data. It is recommended that users use Object Lifecycle Management in
-    # AWS S3 to auto delete files.
+    # S3DataConverter caches data on the local system. The cached version of the
+    # file's data is used if it is found. Otherwise, the file is downloaded from
+    # Amazon S3 and then deserialized to a Ruby object.
+    #
+    # `S3DataConverter` serializes Ruby objects using {YAMLDataConverter} and
+    # stores them in the Amazon S3 bucket specified by `AWS_SWF_BUCKET_NAME`,
+    # using a randomly-generated filename to identify the object's data.
+    #
+    # @note The AWS Flow Framework for Ruby doesn't delete files from S3 to
+    #   prevent loss of data. It is recommended that you use [Object
+    #   Lifecycle Management][olm] in Amazon S3 to automatically delete files
+    #   after a certain period.
+    #
+    #   [olm]: http://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html
     #
-    # More information about object expiration can be found at:
-    # http://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectExpiration.html
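+    #
+    # The following example is an illustrative sketch only; the bucket name
+    # "my-swf-data-bucket" and the variable `big_ruby_object` are placeholders,
+    # not values provided by the framework:
+    #
+    # @example Round-tripping an object through S3DataConverter
+    #   converter = AWS::Flow::S3DataConverter.new("my-swf-data-bucket")
+    #   data = converter.dump(big_ruby_object) # uploads to Amazon S3 when > 32K
+    #   copy = converter.load(data)            # reads from the local cache or S3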
    class S3DataConverter

      require 'lru_redux'

      # S3Cache is a wrapper around the LruRedux cache.
+      # @api private
      class S3Cache
        attr_reader :cache

        MAX_SIZE = 1000

+        # @api private
        def initialize
          @cache = LruRedux::ThreadSafeCache.new(MAX_SIZE)
        end

        # Cache lookup
+        # @api private
        def [](key)
          @cache[key]
        end

        # Cache entry
+        # @api private
        def []=(key, value)
          @cache[key] = value
        end
@@ -108,6 +136,17 @@ def converter
      end

+      # Initialize a new S3DataConverter, providing it the name of an Amazon S3
+      # bucket that will be used to store serialized Ruby objects. The bucket
+      # will be created if it doesn't already exist.
+      #
+      # @note The default data converter specified by
+      #   {FlowConstants.default_data_converter} will be used to serialize
+      #   and deserialize the data. Ordinarily, this is {YAMLDataConverter}.
+      #
+      # @param bucket [String]
+      #   The Amazon S3 bucket name to use for serialized data storage.
+      #
      def initialize(bucket)
        @bucket = bucket
        @cache = S3Cache.new
@@ -116,17 +155,29 @@ def initialize(bucket)
        @converter = FlowConstants.default_data_converter
      end

-      # Serializes a ruby object into a string. If the size of the converted
-      # string is greater than 32k characters, the string is uploaded to an
-      # AWS S3 file and a serialized hash containing the filename is returned
-      # instead. The filename is generated at random in the following format -
-      # rubyflow_data_<UUID>.
+      # Serializes a Ruby object into a string (by default, YAML). The resulting
+      # data, if > 32,768 (32K) characters, is uploaded to the bucket specified
+      # when the `S3DataConverter` was initialized and is copied to the local
+      # cache.
+      #
+      # @param [Object] object
+      #   The object to be serialized into a string. By default, the framework
+      #   serializes the object into a YAML string using {YAMLDataConverter}.
+      #
+      # @return [String]
+      #   The file's serialized data if the resulting data is < 32,768 (32K)
+      #   characters.
+      #
+      #   If the resulting data is > 32K, then the file is uploaded to S3 and
+      #   a YAML string is returned that represents a hash of the following
+      #   form:
+      #
+      #       { s3_filename: <filename> }
+      #
+      #   The returned *filename* is randomly-generated, and follows the form:
       #
-      # The format of the returned serialized hash is - { s3_filename: <filename> }
+      #       rubyflow_data_<UUID>
       #
-      # @param object
-      #   The object that needs to be serialized into a string. By default it
-      #   serializes the object into a YAML string.
       #
      def dump(object)
        string = @converter.dump(object)
@@ -138,15 +189,20 @@ def dump(object)
        ret
      end

-      # Deserializes a string into a ruby object. If the deserialized
-      # string is a ruby hash of the format { s3_filename: <filename> }, then
-      # it will first look for the file in a local cache. In case of a cache miss,
-      # it will try to download the file from AWS S3, deserialize the contents
-      # of the file and return the new object.
+      # Deserializes a string into a Ruby object. If the deserialized string is
+      # a Ruby hash of the format: *{ s3_filename: <filename> }*, then the
+      # local cache is searched for the file's data. If the file is not cached,
+      # then the file is downloaded from Amazon S3, deserialized, and its data
+      # is copied to the cache.
+      #
+      # @param [String] source
+      #   The source string that needs to be deserialized into a Ruby object.
+      #   By default it expects the source to be a YAML string that was
+      #   serialized using {#dump}.
+      #
+      # @return [Object] A Ruby object created by deserializing the YAML source
+      #   string.
       #
-      # @param source
-      #   The source that needs to be deserialized into a ruby object. By
-      #   default it expects the source to be a YAML string.
       #
      def load(source)
        object = @converter.load(source)
        ret = object
@@ -156,11 +212,15 @@ def load(source)
        ret
      end

-      # Helper method to write a string to an s3 file. A random filename is
-      # generated of the format - rubyflow_data_<UUID>
+      # Helper method to write a string to an Amazon S3 file.
+      #
+      # @param [String] string
+      #   The string to be uploaded to Amazon S3. The file's data is uploaded to
+      #   the bucket specified when the `S3DataConverter` was initialized, and
+      #   is also copied into the cache.
       #
-      # @param string
-      #   The string to be uploaded to S3
+      # @return [String]
+      #   The randomly-generated filename of the form: *rubyflow_data_<UUID>*.
       #
       # @api private
      def put_to_s3(string)
@@ -171,9 +231,15 @@ def put_to_s3(string)
        return filename
      end

-      # Helper method to read an s3 file
+      # Helper method to read an Amazon S3 file.
+      #
       # @param s3_filename
-      #   File name to be deleted
+      #   The file name on Amazon S3 to be read. If the file's data exists in
+      #   the cache, the cached version is returned. Otherwise, the file is
+      #   retrieved from the S3 bucket that was specified when `S3DataConverter`
+      #   was initialized, and the file's data is added to the cache.
+      #
+      # @return the data in the file.
       #
       # @api private
      def get_from_s3(s3_filename)
@@ -184,14 +250,15 @@ def get_from_s3(s3_filename)
          ret = s3_object.read
          @cache[s3_filename] = ret
        rescue AWS::S3::Errors::NoSuchKey => e
-          raise "Could not find key #{s3_filename} in bucket #{@bucket} on S3. #{e}"
+          raise "Could not find key #{s3_filename} in bucket #{@bucket} on Amazon S3. #{e}"
        end
        return ret
      end

-      # Helper method to delete an s3 file
+      # Helper method to delete an Amazon S3 file.
+      #
       # @param s3_filename
-      #   File name to be deleted
+      #   The file name on S3 to be deleted
       #
       # @api private
      def delete_from_s3(s3_filename)
diff --git a/aws-flow/lib/aws/decider/starter.rb b/aws-flow/lib/aws/decider/starter.rb
index d067622..e8e7c49 100644
--- a/aws-flow/lib/aws/decider/starter.rb
+++ b/aws-flow/lib/aws/decider/starter.rb
@@ -39,7 +39,7 @@ module Flow
    #
    # @option opts [Array] *Optional* :tag_list
    #
-    # @option opts *Optional* :data_converter
+    # @option opts [YAMLDataConverter, S3DataConverter] *Optional* :data_converter
    #
    # Usage -
    #
@@ -138,7 +138,7 @@ def self.start_workflow(workflow = nil, input, opts)
    # Starts an Activity or a Workflow Template execution using the
    # default workflow class FlowDefaultWorkflowRuby
    #
-    # @param [String or AWS::Flow::Templates::TemplateBase] name_or_klass
+    # @param [String, AWS::Flow::Templates::TemplateBase] name_or_klass
    #   The Activity or the Workflow Template that needs to be scheduled via
    #   the default workflow. This argument can either be a string that
    #   represents a fully qualified activity name - <ActivityClass>.<method_name>.
@@ -147,55 +147,57 @@ def self.start_workflow(workflow = nil, input, opts)
    # @param [Hash] input
    #   Input hash for the workflow execution
    #
-    # @param [Hash] opts
+    # @param [Hash] options
    #   Additional options to configure the workflow or activity execution.
    #
-    # @option opts [true, false] :get_result
+    # @option options [true, false] :get_result
    #   *Optional* This boolean flag can be set to true if the result future
    #   if required. The future can be waited on by using the
    #   AWS::Flow::wait_for_all, AWS::Flow::wait_for_any methods or by
    #   calling the ExternalFuture#get method. Default value is false.
    #
-    # @option opts [Hash] :exponential_retry
+    # @option options [Hash] :exponential_retry
    #   A hash of {AWS::Flow::ExponentialRetryOptions}. Default value is -
    #   { maximum_attempts: 3 }
    #
-    # @option opts [String] *Optional* :domain
+    # @option options [String] *Optional* :domain
    #   Default value is FlowDefault
    #
-    # @option opts [Integer] *Optional* :execution_start_to_close_timeout
+    # @option options [Integer] *Optional* :execution_start_to_close_timeout
    #   Default value is 3600 seconds (1 hour)
    #
-    # @option opts [String] *Optional* :workflow_id
+    # @option options [String] *Optional* :workflow_id
    #
-    # @option opts [Integer] *Optional* :task_priority
+    # @option options [Integer] *Optional* :task_priority
    #   Default value is 0
    #
-    # @option opts [String] *Optional* :tag_list
+    # @option options [String] *Optional* :tag_list
    #   By default, the name of the activity task gets added to the workflow's
    #   tag_list
    #
-    # @option opts *Optional* :data_converter
-    #   Default value is {AWS::Flow::YAMLDataConverter}. To use the
-    #   {AWS::Flow::S3DataConverter}, set the AWS_SWF_BUCKET_NAME environment
-    #   variable name with a valid AWS S3 bucket name.
+    # @option options [YAMLDataConverter, S3DataConverter] *Optional* :data_converter
+    #   The default value is {AWS::Flow::YAMLDataConverter}.
    #
-    # @option opts *Optional* A hash of {AWS::Flow::ActivityOptions}
+    #   To use {AWS::Flow::S3DataConverter}, set the environment variable
+    #   `AWS_SWF_BUCKET_NAME` with a valid Amazon S3 bucket name.
    #
-    # Usage -
+    # @option options [Hash] *Optional* A hash of {AWS::Flow::ActivityOptions}
+    #
+    # Usage:
+    #
+    #     AWS::Flow::start("<ActivityClass>.<method_name>", <input>,
+    #       <options>)
+    #
+    # Examples:
    #
-    #    AWS::Flow::start("<ActivityClass>.<method_name>", <input>,
-    #      <options>)
+    # * Start an activity execution:
    #
-    # Example -
+    #       AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" })
    #
-    # 1) Start an activity execution -
-    #    AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" })
+    # * Start an activity execution with overridden options:
    #
-    # 2) Start an activity execution with overriden options -
-    #    AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" }, {
-    #       exponential_retry: { maximum_attempts: 10 } }
-    #    )
+    #       AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" },
+    #         { exponential_retry: { maximum_attempts: 10 } } )
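+    #
+    # * Start an activity execution and wait on its result. This is an
+    #   illustrative sketch only; it assumes that blocking the calling thread
+    #   on the result future is acceptable:
+    #
+    #       future = AWS::Flow::start("HelloWorldActivity.say_hello",
+    #         { name: "World" }, { get_result: true })
+    #       result = future.get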
    #
    def self.start(name_or_klass, input, options = {})
      AWS::Flow::Templates::Starter.start(name_or_klass, input, options)
diff --git a/aws-flow/lib/aws/decider/state_machines.rb b/aws-flow/lib/aws/decider/state_machines.rb
index ac002f0..95e6927 100644
--- a/aws-flow/lib/aws/decider/state_machines.rb
+++ b/aws-flow/lib/aws/decider/state_machines.rb
@@ -81,22 +81,27 @@ def consume(symbol)
    class CompleteWorkflowStateMachine
      extend DecisionStateMachineDFA
      attr_reader :id
+
+      # @api private
      def consume(symbol)
        return @decision = nil if symbol == :handle_initiation_failed_event
        return if symbol == :handle_decision_task_started_event
        raise "UnsupportedOperation"
      end
+
+      # @api private
+      def done?
+        ! @decision.nil?
+      end
+
      # Creates a new `CompleteWorkflowStateMachine`.
      #
      # @param id
      #   The decider ID.
      #
-      # @param attributes
+      # @param decision
      #
-
-      def done?
-        ! @decision.nil?
-      end
+      # @api private
      def initialize(id, decision)
        @id = id
        @decision = decision
@@ -104,6 +109,7 @@ def initialize(id, decision)
      end
      init(:created)

+      # @api private
      def get_decision
        return @decision
      end
@@ -115,12 +121,13 @@ class DecisionStateMachineBase
      extend DecisionStateMachineDFA
      attr_reader :id

+      # @api private
      def initialize(id)
        @id = id
        @current_state = :created
      end
-
+      # @api private
      def handle_started_event(event)
        @state_history << :handle_started_event
      end
@@ -146,6 +153,7 @@ def handle_started_event(event)
      ]
      self_transitions(:handle_decision_task_started_event)

+      # @api private
      def done?
        @current_state == :completed || @current_state == :completed_after_cancellation_decision_sent
      end
@@ -162,6 +170,7 @@ class ActivityDecisionStateMachine < DecisionStateMachineBase
      #
      # @param attributes
      #
+      # @api private
      def initialize(decision_id, attributes)
        @attributes = attributes
        super(decision_id)
@@ -171,6 +180,8 @@ def initialize(decision_id, attributes)
        [:cancelled_after_initiated, :handle_decision_task_started_event, :cancellation_decision_sent],
        [:cancellation_decision_sent, :handle_cancellation_failure_event, :initiated]
      ]
+
+      # @api private
      def get_decision
        case @current_state
        when :created
@@ -180,6 +191,7 @@ def get_decision
        end
      end

+      # @api private
      def create_schedule_activity_task_decision
        options = @attributes[:options]
        attribute_type = :schedule_activity_task_decision_attributes
@@ -200,6 +212,7 @@ def create_schedule_activity_task_decision
        result
      end

+      # @api private
      def create_request_cancel_activity_task_decision
        { :decision_type => "RequestCancelActivityTask",
          :request_cancel_activity_task_decision_attributes => {:activity_id => @attributes[:decision_id]} }
@@ -215,6 +228,7 @@ def initialize(decision_id, attributes)
        super(decision_id)
      end

+      # @api private
      def create_start_timer_decision
        {
          :decision_type => "StartTimer",
@@ -227,6 +241,7 @@ def create_start_timer_decision
          }
        }
      end
+      # @api private
      def create_cancel_timer_decision
        {
          :decision_type => "CancelTimer",
@@ -236,6 +251,7 @@ def create_cancel_timer_decision
          }
        }
      end
+      # @api private
      def get_decision
        case @current_state
        when :created
@@ -245,6 +261,7 @@ def get_decision
        end
      end

+      # @api private
      def done?
        @current_state == :completed || @cancelled
      end
@@ -264,6 +281,7 @@ def initialize(decision_id, attributes)
        super(decision_id)
      end

+      # @api private
      def get_decision
        case @current_state
        when :created
@@ -271,6 +289,7 @@ def get_decision
        end
      end

+      # @api private
      def create_signal_external_workflow_execution_decison
        extra_options = {}
        [:input, :control, :run_id].each do |type|
@@ -308,11 +327,14 @@ def create_signal_external_workflow_execution_decison

    # @api private
    class ChildWorkflowDecisionStateMachine < DecisionStateMachineBase
      attr_accessor :run_id, :attributes
+
+      # @api private
      def initialize(decision_id, attributes)
        @attributes = attributes
        super(decision_id)
      end

+      # @api private
      def create_start_child_workflow_execution_decision
        options = @attributes[:options]
        workflow_name = options.workflow_name || options.prefix_name
@@ -341,6 +363,7 @@ def create_start_child_workflow_execution_decision
        result
      end

+      # @api private
      def create_request_cancel_external_workflow_execution_decision
        result = {
          :decision_type => "RequestCancelExternalWorkflowExecution",
@@ -351,6 +374,7 @@ def create_request_cancel_external_workflow_execution_decision
        }
      end

+      # @api private
      def get_decision
        case @current_state
        when :created
diff --git a/aws-flow/lib/aws/flow/future.rb b/aws-flow/lib/aws/flow/future.rb
index b037c5e..bcb2f16 100644
--- a/aws-flow/lib/aws/flow/future.rb
+++ b/aws-flow/lib/aws/flow/future.rb
@@ -141,19 +141,24 @@ def broadcast
    # Represents the result of an asynchronous computation. Methods are
    # provided to:
    #
-    # * retrieve the result of the computation, once it is complete ({ExternalFuture#get}).
-    # * check if the computation is complete ({ExternalFuture#set?})
-    # * execute a block when computation is complete ({ExternalFuture#on_set})
+    # * retrieve the result of the computation, once it is complete
+    #   ({AWS::Flow::Core::ExternalFuture#get}).
+    #
+    # * check if the computation is complete
+    #   ({AWS::Flow::Core::ExternalFuture#set?})
+    #
+    # * execute a block when computation is complete
+    #   ({AWS::Flow::Core::ExternalFuture#on_set})
    #
    # The result of a Future can only be retrieved when the computation has
-    # completed. {ExternalFuture#get} blocks execution, if necessary, until the
-    # ExternalFuture is ready.
+    # completed. {AWS::Flow::Core::ExternalFuture#get} blocks execution, if
+    # necessary, until the `ExternalFuture` is ready.
    #
-    # Unlike {Future}, {ExternalFuture#get} doesn't block Fibers. Instead it
-    # blocks the current thread by waiting on a ruby {ConditionVariable}. The
-    # condition variable is signalled when the future is set, which allows the
-    # thread to continue execution when the result is ready. This lets us use
-    # the future outside of an {AsyncScope}
+    # Unlike {Future}, {AWS::Flow::Core::ExternalFuture#get} doesn't block
+    # Fibers. Instead it blocks the current thread by waiting on a Ruby
+    # `ConditionVariable`. The condition variable is signalled when the future
+    # is set, which allows the thread to continue execution when the result is
+    # ready. This lets us use the future outside of an {AsyncScope}.
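+    #
+    # A minimal illustrative sketch (the extra thread and the `sleep` call are
+    # for demonstration only and are not part of the framework):
+    #
+    #     future = AWS::Flow::Core::ExternalFuture.new
+    #     Thread.new { sleep 1; future.set("done") }
+    #     future.get  # blocks the calling thread until #set, then returns "done"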
    #
    class ExternalFuture < Future

@@ -189,10 +194,10 @@ def method_missing(method, *args, &block)
    end

-    # Wrapper around a ruby {Mutex} and {ConditionVariable} to avoid
+    # Wrapper around a Ruby `Mutex` and `ConditionVariable` to avoid
    # writing the synchronization lines repeatedly.
    # {ExternalConditionVariable#wait} will block the thread until
-    # {ConditionVariable} @cond is signalled
+    # `ConditionVariable` @cond is signalled
    #
    class ExternalConditionVariable

@@ -208,7 +213,7 @@ def wait(timeout=nil)
      @mutex.synchronize { @cond.wait(@mutex, timeout) }
    end

-    # Pass all messages to the encapsulated {ConditionVariable}
+    # Pass all messages to the encapsulated `ConditionVariable`
    def method_missing(method, *args)
      @cond.send(method, *args)
    end
diff --git a/aws-flow/lib/aws/templates/activity.rb b/aws-flow/lib/aws/templates/activity.rb
index 4896bde..7bf1b97 100644
--- a/aws-flow/lib/aws/templates/activity.rb
+++ b/aws-flow/lib/aws/templates/activity.rb
@@ -51,15 +51,15 @@ def run(input, context)
    end

    # Initializes an activity template
-    # @param {String} name
-    # @param {Hash} options
+    # @param [String] name
+    # @param [Hash] opts
    def activity(name, opts = {})
      AWS::Flow::Templates.send(:activity, name, opts)
    end

    # Initializes an activity template
-    # @param {String} name
-    # @param {Hash} options
+    # @param [String] name
+    # @param [Hash] opts
    def self.activity(name, opts = {})
      ActivityTemplate.new(name, opts)
    end
@@ -89,17 +89,17 @@ def run(input, context)
    end

    # Initializes a result activity template
-    # @param {String} key
+    # @param [String] key
    #   A unique key that identifies the result of an activity execution
-    # @param {Hash} options
+    # @param [Hash] opts
    def result(key, opts = {})
      AWS::Flow::Templates.send(:result, key, opts)
    end

    # Initializes a result activity template
-    # @param {String} key
+    # @param [String] key
    #   A unique key that identifies the result of an activity execution
-    # @param {Hash} options
+    # @param [Hash] opts
    def self.result(key, opts = {})
      ResultActivityTemplate.new(key, opts)
    end
diff --git a/aws-flow/lib/aws/templates/result.rb b/aws-flow/lib/aws/templates/result.rb
index 0eede20..fdaad1e 100644
--- a/aws-flow/lib/aws/templates/result.rb
+++ b/aws-flow/lib/aws/templates/result.rb
@@ -8,7 +8,7 @@ module Templates
    # or actual results themselves back to the user
    class ResultWorker

-      # Wrapper around a ruby {Hash} to provide synchronization around making
+      # Wrapper around a Ruby hash to provide synchronization around making
      # changes to the encapsulated hash.
      class SynchronizedHash
        attr_reader :hash