155 changes: 111 additions & 44 deletions aws-flow/lib/aws/decider/data_converter.rb
@@ -1,5 +1,5 @@
#--
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Copyright (C) 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
@@ -15,14 +15,21 @@

module AWS
module Flow
# Converts an object to YAML. Exceptions are handled differently because YAML doesn't propagate backtraces
# properly, and they are very handy for debugging.
# Serializes/deserializes Ruby objects using {http://yaml.org/ YAML} format.
# *This is the default data converter for the AWS Flow Framework for Ruby*.
#
# @note There is a 32K (32,768) character limit on activity/workflow input
# and output. If the amount of data that you need to pass exceeds this
# limit, use {S3DataConverter} instead.
#
class YAMLDataConverter

# Serializes a ruby object into a YAML string.
# Serializes a Ruby object into a YAML string.
#
# @param object [Object]
# The object to serialize.
#
# @param object
# The object that needs to be serialized into a string.
# @return the object's data in YAML format.
#
def dump(object)
if object.is_a? Exception
@@ -31,10 +38,12 @@ def dump(object)
object.to_yaml
end

# Deserializes a YAML string into a ruby object.
# Deserializes a YAML string into a Ruby object.
#
# @param source
# The source YAML string that needs to be deserialized into a ruby object.
# @param source [String]
# The YAML string to deserialize.
#
# @return a Ruby object generated from the YAML string.
#
def load(source)
return nil if source.nil?
@@ -51,37 +60,56 @@ def load(source)
end
end
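
A minimal sketch of the round-trip documented above, assuming the aws-flow gem is installed and loaded via its usual `require 'aws/decider'` entry point:

    require 'aws/decider'

    converter = AWS::Flow::YAMLDataConverter.new

    # Plain objects round-trip through YAML.
    yaml = converter.dump({ name: "World" }) # => "---\n:name: World\n"
    converter.load(yaml)                     # => { name: "World" }

    # Exceptions get special handling so their backtraces survive the trip.
    error = converter.load(converter.dump(RuntimeError.new("boom")))
    error.message # => "boom"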

# S3DataConverter uses YAMLDataConverter internally to serialize and
# deserialize ruby objects. Additionally it stores objects larger than
# 32k characeters in AWS S3 and returns a serialized s3 link to be
# deserialized remotely. It caches objects locally to minimize calls to S3.
# S3DataConverter serializes/deserializes Ruby objects using
# {YAMLDataConverter}, storing the serialized data on Amazon S3. This data
# can exceed 32K (32,768) characters in size, providing a way to work past
# Amazon SWF's [input/output data limit][limits].
#
# [limits]: http://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dg-limits.html
#
# To activate it, set the `AWS_SWF_BUCKET_NAME` environment variable to the
# name of an Amazon S3 bucket to use to store workflow/activity data. The
# bucket will be created if necessary.
#
# AWS Flow Framework for Ruby doesn't delete files from S3 to prevent loss
# of data. It is recommended that users use Object Lifecycle Management in
# AWS S3 to auto delete files.
# S3DataConverter caches data on the local system. The cached version of the
# file's data is used if it is found. Otherwise, the file is downloaded from
# Amazon S3 and then deserialized to a Ruby object.
#
# `S3DataConverter` serializes Ruby objects using {YAMLDataConverter} and
# stores them in the Amazon S3 bucket specified by `AWS_SWF_BUCKET_NAME`,
# using a randomly-generated filename to identify the object's data.
#
# @note The AWS Flow Framework for Ruby doesn't delete files from S3 to
# prevent loss of data. It is recommended that you use [Object
# Lifecycle Management][olm] in Amazon S3 to automatically delete files
# after a certain period.
#
# [olm]: http://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html
#
# More information about object expiration can be found at:
# http://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectExpiration.html
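
A hedged sketch of the activation path described above. The bucket name is hypothetical, and `S3DataConverter.converter` is the framework-managed instance implied by the `def converter` hunk below:

    ENV['AWS_SWF_BUCKET_NAME'] = 'my-swf-data-bucket' # hypothetical bucket

    # With the variable set, the framework serializes workflow/activity
    # data through an S3DataConverter; unset, it falls back to
    # YAMLDataConverter.
    converter = AWS::Flow::S3DataConverter.converter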
class S3DataConverter

require 'lru_redux'

# S3Cache is a wrapper around the LruRedux cache.
# @api private
class S3Cache
attr_reader :cache

MAX_SIZE = 1000

# @api private
def initialize
@cache = LruRedux::ThreadSafeCache.new(MAX_SIZE)
end

# Cache lookup
# @api private
def [](key)
@cache[key]
end

# Cache entry
# @api private
def []=(key, value)
@cache[key] = value
end
@@ -108,6 +136,17 @@ def converter

end

# Initialize a new S3DataConverter, providing it the name of an Amazon S3
# bucket that will be used to store serialized Ruby objects. The bucket
# will be created if it doesn't already exist.
#
# @note The default data converter specified by
# {FlowConstants.default_data_converter} will be used to serialize
# and deserialize the data. Ordinarily, this is {YAMLDataConverter}.
#
# @param bucket [String]
# The Amazon S3 bucket name to use for serialized data storage.
#
def initialize(bucket)
@bucket = bucket
@cache = S3Cache.new
@@ -116,17 +155,29 @@ def initialize(bucket)
@converter = FlowConstants.default_data_converter
end
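
Direct construction, per the docs above (the bucket name is an assumption for illustration; the bucket is created if it doesn't already exist):

    converter = AWS::Flow::S3DataConverter.new('my-swf-data-bucket')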

# Serializes a ruby object into a string. If the size of the converted
# string is greater than 32k characters, the string is uploaded to an
# AWS S3 file and a serialized hash containing the filename is returned
# instead. The filename is generated at random in the following format -
# rubyflow_data_<UUID>.
# Serializes a Ruby object into a string (by default, YAML). The resulting
# data, if > 32,768 (32K) characters, is uploaded to the bucket specified
# when the `S3DataConverter` was initialized and is copied to the local
# cache.
#
# @param [Object] object
# The object to be serialized into a string. By default, the framework
# serializes the object into a YAML string using {YAMLDataConverter}.
#
# @return [String]
# The object's serialized data, if the resulting data is < 32,768 (32K)
# characters.
#
# If the resulting data is > 32K, then the file is uploaded to S3 and
# a YAML string is returned that represents a hash of the following
# form:
#
# { s3_filename: <filename> }
#
# The returned *filename* is randomly-generated, and follows the form:
#
# The format of the returned serialized hash is - { s3_filename: <filename> }
# rubyflow_data_<UUID>
#
# @param object
# The object that needs to be serialized into a string. By default it
# serializes the object into a YAML string.
#
def dump(object)
string = @converter.dump(object)
@@ -138,15 +189,20 @@ def dump(object)
ret
end
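
A sketch of the two dump outcomes described above, reusing the converter from the previous snippet; the 40,000-character payload is purely illustrative:

    small = converter.dump("ok")         # inline YAML, well under 32K
    large = converter.dump("x" * 40_000) # over 32K, so the data goes to S3

    # `large` is itself YAML for a pointer hash along the lines of:
    #   { s3_filename: "rubyflow_data_<UUID>" }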

# Deserializes a string into a ruby object. If the deserialized
# string is a ruby hash of the format { s3_filename: <filename> }, then
# it will first look for the file in a local cache. In case of a cache miss,
# it will try to download the file from AWS S3, deserialize the contents
# of the file and return the new object.
# Deserializes a string into a Ruby object. If the deserialized string is
# a Ruby hash of the format: *{ s3_filename: <filename\> }*, then the
# local cache is searched for the file's data. If the file is not cached,
# then the file is downloaded from Amazon S3, deserialized, and its data
# is copied to the cache.
#
# @param [String] source
# The source string that needs to be deserialized into a Ruby object.
# By default it expects the source to be a YAML string that was
# serialized using {#dump}.
#
# @return [Object] A Ruby object created by deserializing the YAML source
# string.
#
# @param source
# The source that needs to be deserialized into a ruby object. By
# default it expects the source to be a YAML string.
#
def load(source)
object = @converter.load(source)
ret = object
@@ -156,11 +212,15 @@ def load(source)
ret
end
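
Continuing the sketch, load undoes both outcomes; for the pointer hash it checks the local cache before calling out to S3:

    converter.load(small) # => "ok"
    converter.load(large) # => "xxx..." (the original 40,000-char string)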

# Helper method to write a string to an s3 file. A random filename is
# generated of the format - rubyflow_data_<UUID>
# Helper method to write a string to an Amazon S3 file.
#
# @param [String] string
# The string to be uploaded to Amazon S3. The file's data is uploaded to
# the bucket specified when the `S3DataConverter` was initialized, and
# is also copied into the cache.
#
# @param string
# The string to be uploaded to S3
# @return [String]
# The randomly-generated filename of the form: *rubyflow_data_<UUID>*.
#
# @api private
def put_to_s3(string)
@@ -171,9 +231,15 @@ def put_to_s3(string)
return filename
end

# Helper method to read an s3 file
# Helper method to read an Amazon S3 file.
#
# @param s3_filename
# File name to be deleted
# The file name on Amazon S3 to be read. If the file's data exists in
# the cache, the cached version is returned. Otherwise, the file is
# retrieved from the S3 bucket that was specified when `S3DataConverter`
# was initialized, and the file's data is added to the cache.
#
# @return the data in the file.
#
# @api private
def get_from_s3(s3_filename)
@@ -184,14 +250,15 @@ def get_from_s3(s3_filename)
ret = s3_object.read
@cache[s3_filename] = ret
rescue AWS::S3::Errors::NoSuchKey => e
raise "Could not find key #{s3_filename} in bucket #{@bucket} on S3. #{e}"
raise "Could not find key #{s3_filename} in bucket #{@bucket} on Amazon S3. #{e}"
end
return ret
end
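
Roughly what the private helpers above amount to in the v1 aws-sdk API that this generation of the framework targeted (bucket name hypothetical):

    require 'aws-sdk-v1'
    require 'securerandom'

    bucket   = AWS::S3.new.buckets['my-swf-data-bucket']
    filename = "rubyflow_data_#{SecureRandom.uuid}"

    bucket.objects[filename].write("serialized payload") # put_to_s3
    bucket.objects[filename].read                        # get_from_s3
    bucket.objects[filename].delete                      # delete_from_s3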

# Helper method to delete an s3 file
# Helper method to delete an Amazon S3 file
#
# @param s3_filename
# File name to be deleted
# The file name on S3 to be deleted
#
# @api private
def delete_from_s3(s3_filename)
52 changes: 27 additions & 25 deletions aws-flow/lib/aws/decider/starter.rb
@@ -39,7 +39,7 @@ module Flow
#
# @option opts [Array] *Optional* :tag_list
#
# @option opts *Optional* :data_converter
# @option opts [] *Optional* :data_converter
#
# Usage -
#
@@ -138,7 +138,7 @@ def self.start_workflow(workflow = nil, input, opts)
# Starts an Activity or a Workflow Template execution using the
# default workflow class FlowDefaultWorkflowRuby
#
# @param [String or AWS::Flow::Templates::TemplateBase] name_or_klass
# @param [String, AWS::Flow::Templates::TemplateBase] name_or_klass
# The Activity or the Workflow Template that needs to be scheduled via
# the default workflow. This argument can either be a string that
# represents a fully qualified activity name - <ActivityClass>.<method_name>
@@ -147,55 +147,57 @@ def self.start_workflow(workflow = nil, input, opts)
# @param [Hash] input
# Input hash for the workflow execution
#
# @param [Hash] opts
# @param [Hash] options
# Additional options to configure the workflow or activity execution.
#
# @option opts [true, false] :get_result
# @option options [true, false] :get_result
# *Optional* This boolean flag can be set to true if the result future
# is required. The future can be waited on by using the
# AWS::Flow::wait_for_all, AWS::Flow::wait_for_any methods or by
# calling the ExternalFuture#get method. Default value is false.
#
# @option opts [Hash] :exponential_retry
# @option options [Hash] :exponential_retry
# A hash of {AWS::Flow::ExponentialRetryOptions}. Default value is -
# { maximum_attempts: 3 }
#
# @option opts [String] *Optional* :domain
# @option options [String] *Optional* :domain
# Default value is FlowDefault
#
# @option opts [Integer] *Optional* :execution_start_to_close_timeout
# @option options [Integer] *Optional* :execution_start_to_close_timeout
# Default value is 3600 seconds (1 hour)
#
# @option opts [String] *Optional* :workflow_id
# @option options [String] *Optional* :workflow_id
#
# @option opts [Integer] *Optional* :task_priority
# @option options [Integer] *Optional* :task_priority
# Default value is 0
#
# @option opts [String] *Optional* :tag_list
# @option options [String] *Optional* :tag_list
# By default, the name of the activity task gets added to the workflow's
# tag_list
#
# @option opts *Optional* :data_converter
# Default value is {AWS::Flow::YAMLDataConverter}. To use the
# {AWS::Flow::S3DataConverter}, set the AWS_SWF_BUCKET_NAME environment
# variable name with a valid AWS S3 bucket name.
# @option options [YAMLDataConverter, S3DataConverter] *Optional* :data_converter
# The default value is {AWS::Flow::YAMLDataConverter}.
#
# @option opts *Optional* A hash of {AWS::Flow::ActivityOptions}
# To use {AWS::Flow::S3DataConverter}, set the environment variable
# `AWS_SWF_BUCKET_NAME` with a valid Amazon S3 bucket name.
#
# Usage -
# @option options [Hash] *Optional* A hash of {AWS::Flow::ActivityOptions}
#
# Usage:
#
# AWS::Flow::start("<ActivityClassName>.<method_name>", <input_hash>,
# <options_hash> )
#
# Examples:
#
# AWS::Flow::start("<ActivityClassName>.<method_name>", <input_hash>,
# <options_hash> )
# * Start an activity execution:
#
# Example -
# AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" })
#
# 1) Start an activity execution -
# AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" })
# * Start an activity execution with overridden options:
#
# 2) Start an activity execution with overriden options -
# AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" }, {
# exponential_retry: { maximum_attempts: 10 } }
# )
# AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" },
# { exponential_retry: { maximum_attempts: 10 } } )
#
def self.start(name_or_klass, input, options = {})
AWS::Flow::Templates::Starter.start(name_or_klass, input, options)
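
A hedged sketch of the :get_result option documented above, using the ExternalFuture#get method the docs mention to block on the outcome:

    future = AWS::Flow::start("HelloWorldActivity.say_hello",
                              { name: "World" },
                              { get_result: true })

    result = future.get # blocks until the activity completes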