jrkinley edited this page Apr 29, 2013 · 6 revisions

HBase Trident

Trident is a high-level abstraction for Storm that allows you to perform stateful stream processing and low latency distributed querying. Trident allows you to perform joins, aggregations, grouping, functions, and filters on streaming data in much the same way as Pig and Cascading allow you to perform these operations in Hadoop. For more information on Trident please see the Trident tutorial.

Trident also allows you to read from and write to stateful sources, which can either be internal to the topology (e.g. kept in-memory) or stored externally in a database (see Trident state). The storm-hbase connector provides two generic Trident state implementations to persist state in HBase:

  • HBaseValueState: A Trident state implementation for putting and getting values from an HBase table. Because Trident processes Tuples in batches, and the state is often time critical, HBaseValueState flushes all Puts for a batch so that they do not get stuck in HBase's client-side write buffer. The write buffer should still be enabled so that the Puts are not sent individually.

  • HBaseAggregateState: A Trident persistentAggregate implementation for updating aggregations in HBase. It expects the Trident topology to group the Tuples by the HBase table's rowKey, columnFamily, and columnQualifier to form the keys in the state, and the aggregation result will form the values (see the HBaseAggregateState example below). As above, all Puts for a batch are flushed immediately to ensure they do not get stuck in HBase's client-side write buffer.

Configuration

TridentConfig is an extension to TupleTableConfig (as described on the HBase Storm Bolts page) and, in addition to its attributes, the following Trident specific attributes can be set:

  • stateCacheSize: Sets the size of the in-memory cache that sits above HBase to minimise the communication between Storm and HBase. Default size is 1,000.
  • stateSerializer: The serializer to use for serializing/deserializing the state value in HBase. This does not normally need to be set and will be determined based on the configured state semantics - e.g. non-transactional, transactional, or opaque.
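To illustrate what a bounded in-memory cache in front of HBase does, here is a minimal plain-Java sketch. It assumes least-recently-used eviction, which may or may not be the policy the connector actually uses; the class name BoundedCacheSketch is hypothetical and is not part of storm-hbase.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: a size-bounded cache with LRU eviction.
// Assumption: the real state cache may use a different eviction policy.
class BoundedCacheSketch<K, V> extends LinkedHashMap<K, V> {

    private final int maxSize;

    BoundedCacheSketch(int maxSize) {
        // accessOrder = true keeps entries ordered from least to most recently used.
        super(16, 0.75f, true);
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the least recently used entry once the bound is exceeded.
        return size() > maxSize;
    }

    public static void main(String[] args) {
        BoundedCacheSketch<String, Long> cache = new BoundedCacheSketch<>(2);
        cache.put("a", 1L);
        cache.put("b", 2L);
        cache.get("a");      // "a" becomes the most recently used entry
        cache.put("c", 3L);  // exceeds the bound, so "b" is evicted
        System.out.println(cache.keySet());
    }
}
```

With a cache like this, keys that are updated frequently are served from memory, and only cache misses need a read from HBase. A larger stateCacheSize trades worker memory for fewer round trips.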

Running the example topologies

  • HBaseTridentValueTopology: A Trident topology that demonstrates how to use HBaseValueState to put streaming data into HBase.
  • HBaseTridentAggregateTopology: A Trident topology that demonstrates how to use HBaseAggregateState to compute incremental counts for keys and persist their state in HBase.

The example topologies are based on the URL shortener project (Hush), taken from the HBase: The Definitive Guide book.

Create HBase table

Create the HBase table (assumes you have HBase installed and configured):

hbase> create 'shorturl', {NAME => 'data', VERSIONS => 3}, {NAME => 'daily', VERSIONS => 1, TTL => 604800}, {NAME => 'weekly', VERSIONS => 1, TTL => 2678400}, {NAME => 'monthly', VERSIONS => 1, TTL => 31536000}

Build .tar.gz

To build a tarball that includes the storm-hbase .jar and its dependencies for running the examples:

$ mvn assembly:assembly

Deploy the .tar.gz

Copy the .tar.gz to a machine that you typically use to run the HBase client (e.g. has the required hbase-site.xml) and unpack it:

$ scp target/storm-hbase-[version].tar.gz [email protected]:/home/user
$ ssh [email protected]
$ tar -xvzf storm-hbase-[version].tar.gz

Running HBaseTridentValueTopology

$ java -cp storm-hbase-[version]/lib/*:/path/to/hbase/conf backtype.storm.contrib.hbase.examples.HBaseTridentValueTopology

The /path/to/hbase/conf directory should contain your hbase-site.xml.

With the following TridentConfig configuration:

TridentConfig config = new TridentConfig("shorturl", "shortid");
config.setBatch(false);
config.addColumn("data", "url");
config.addColumn("data", "user");
config.addColumn("data", "date");

Your 'shorturl' table should look something like this:

hbase> scan 'shorturl'
ROW                        COLUMN+CELL                                                                                                                                            
http://atmlb.com/7NG4sm    column=data:date, timestamp=1349709372291, value=20120816                                                                                              
http://atmlb.com/7NG4sm    column=data:url, timestamp=1349709372291, value=baltimore.orioles.mlb.com/                                                                             
http://atmlb.com/7NG4sm    column=data:user, timestamp=1349709372291, value=kinley                                                                                                
http://bit.ly/2VL7eA       column=data:date, timestamp=1349709372258, value=20120816                                                                                              
http://bit.ly/2VL7eA       column=data:url, timestamp=1349709372258, value=www.49ers.com/                                                                                         
http://bit.ly/2VL7eA       column=data:user, timestamp=1349709372258, value=kinley                                                                                                
http://bit.ly/9ZJhuY       column=data:date, timestamp=1349709372291, value=20120816                                                                                              
http://bit.ly/9ZJhuY       column=data:url, timestamp=1349709372291, value=www.buccaneers.com/index.html                                                                          
http://bit.ly/9ZJhuY       column=data:user, timestamp=1349709372291, value=kinley                                                                                                
http://bit.ly/LsaBa        column=data:date, timestamp=1349709372258, value=20120816                                                                                              
http://bit.ly/LsaBa        column=data:url, timestamp=1349709372258, value=www.baltimoreravens.com/                                                                               
http://bit.ly/LsaBa        column=data:user, timestamp=1349709372258, value=kinley                                                                                                
http://bit.ly/ZK6t         column=data:date, timestamp=1349709372258, value=20120816                                                                                              
http://bit.ly/ZK6t         column=data:url, timestamp=1349709372258, value=www.arsenal.com/home                                                                                   
http://bit.ly/ZK6t         column=data:user, timestamp=1349709372258, value=kinley

Running HBaseTridentAggregateTopology

The HBaseTridentAggregateTopology demonstrates how to use HBaseAggregateState with Trident's each, project, groupBy, and persistentAggregate functions to calculate daily, weekly, and monthly counters for each key in a stream and use HBase to persist their state.

StateFactory state = HBaseAggregateState.transactional(config);
TridentTopology topology = new TridentTopology();
topology
    .newStream("spout", spout)
    .each(new Fields("shortid", "date"), new DatePartitionFunction(),
        new Fields("cf", "cq"))
    .project(new Fields("shortid", "cf", "cq"))
    .groupBy(new Fields("shortid", "cf", "cq"))
    .persistentAggregate(state, new Count(), new Fields("count"));

Given the example Tuples:

shortid                    url                              user      date    
"http://bit.ly/ZK6t",      "www.arsenal.com/home",          "kinley", "20120816"
"http://bit.ly/LsaBa",     "www.baltimoreravens.com/",      "kinley", "20120816"
"http://bit.ly/2VL7eA",    "www.49ers.com/",                "kinley", "20120816"
"http://bit.ly/9ZJhuY",    "www.buccaneers.com/index.html", "kinley", "20120816"
"http://atmlb.com/7NG4sm", "baltimore.orioles.mlb.com/",    "kinley", "20120816"
"http://bit.ly/ZK6t",      "www.arsenal.com/home",          "kinley", "20120817"
"http://bit.ly/ZK6t",      "www.arsenal.com/home",          "kinley", "20120817"
"http://bit.ly/ZK6t",      "www.arsenal.com/home",          "kinley", "20120817"
"http://bit.ly/LsaBa",     "www.baltimoreravens.com/",      "kinley", "20120820"
"http://bit.ly/LsaBa",     "www.baltimoreravens.com/",      "kinley", "20120820"
"http://bit.ly/LsaBa",     "www.baltimoreravens.com/",      "kinley", "20120903"

First, because we need to compute daily, weekly, and monthly counters for each key, we need to explode the stream so that each period can be grouped and aggregated independently. The each function is used for this because it takes a single Tuple as input and emits zero or more Tuples as output. For example, given the Tuple:

"http://bit.ly/ZK6t", "20120816"

The DatePartitionFunction will output the following three Tuples:

"http://bit.ly/ZK6t", "20120816", "daily", "20120816"
"http://bit.ly/ZK6t", "20120816", "weekly", "201233"
"http://bit.ly/ZK6t", "20120816", "monthly", "201208"

The two new fields appended to the Tuple define the column family (CF) and column qualifier (CQ) to use when updating the key's state in HBase.
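The explode step can be sketched in plain Java as follows. This is not the DatePartitionFunction from the examples, only an illustration of the mapping from a date to the three (CF, CQ) pairs. The class name DatePartitionSketch is hypothetical, and the use of ISO-8601 week numbering is an assumption that happens to reproduce the week values shown on this page.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.IsoFields;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the date-to-(cf, cq) mapping.
class DatePartitionSketch {

    /** Returns the daily, weekly, and monthly {cf, cq} pairs for a yyyyMMdd date. */
    static List<String[]> partition(String date) {
        LocalDate d = LocalDate.parse(date, DateTimeFormatter.BASIC_ISO_DATE);

        // Assumption: weeks are numbered according to ISO-8601.
        int weekYear = d.get(IsoFields.WEEK_BASED_YEAR);
        int week = d.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR);

        List<String[]> out = new ArrayList<>();
        out.add(new String[] {"daily", date});
        out.add(new String[] {"weekly", String.format("%04d%02d", weekYear, week)});
        out.add(new String[] {"monthly", String.format("%04d%02d", d.getYear(), d.getMonthValue())});
        return out;
    }

    public static void main(String[] args) {
        // One input Tuple becomes three output Tuples, one per period.
        for (String[] pair : partition("20120816")) {
            System.out.println("\"http://bit.ly/ZK6t\", \"20120816\", \""
                    + pair[0] + "\", \"" + pair[1] + "\"");
        }
    }
}
```

In a real topology this logic would live inside a Trident function that emits one Tuple per period, rather than returning a list.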

Next, the project function keeps only the fields that are required to compute the counter increments and update the state (i.e. "shortid", "cf", and "cq").

Then, the groupBy function groups together all Tuples with the same key ("shortid", "cf", and "cq") so that the counter increments are computed per key.

Finally, the persistentAggregate function uses HBaseAggregateState and the Count aggregator to compute the counter increments for each key ("shortid", "cf", and "cq") and update their state in HBase.
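To make the four steps concrete, the following plain-Java sketch mimics them with ordinary collections instead of Trident and HBase. It is only an illustration under stated assumptions: the class name AggregateSketch and the "shortid|cf|cq" string key are hypothetical, and ISO-8601 week numbering is assumed for the weekly qualifier.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.IsoFields;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: each -> project -> groupBy -> Count with plain collections.
class AggregateSketch {

    // The example Tuples from this page, already projected to (shortid, date).
    static final String[][] EXAMPLE = {
        {"http://bit.ly/ZK6t", "20120816"},
        {"http://bit.ly/LsaBa", "20120816"},
        {"http://bit.ly/2VL7eA", "20120816"},
        {"http://bit.ly/9ZJhuY", "20120816"},
        {"http://atmlb.com/7NG4sm", "20120816"},
        {"http://bit.ly/ZK6t", "20120817"},
        {"http://bit.ly/ZK6t", "20120817"},
        {"http://bit.ly/ZK6t", "20120817"},
        {"http://bit.ly/LsaBa", "20120820"},
        {"http://bit.ly/LsaBa", "20120820"},
        {"http://bit.ly/LsaBa", "20120903"},
    };

    /** Counts Tuples per "shortid|cf|cq" key. */
    static Map<String, Long> count(String[][] tuples) {
        Map<String, Long> counts = new TreeMap<>();
        for (String[] t : tuples) {
            LocalDate d = LocalDate.parse(t[1], DateTimeFormatter.BASIC_ISO_DATE);
            // Explode: one (cf, cq) pair per period. ISO-8601 weeks are an assumption.
            String weekly = String.format("%04d%02d",
                    d.get(IsoFields.WEEK_BASED_YEAR), d.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR));
            String monthly = String.format("%04d%02d", d.getYear(), d.getMonthValue());
            String[][] cells = {{"daily", t[1]}, {"weekly", weekly}, {"monthly", monthly}};
            for (String[] c : cells) {
                // Group by (shortid, cf, cq) and increment that group's counter.
                counts.merge(t[0] + "|" + c[0] + "|" + c[1], 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        count(EXAMPLE).forEach((key, n) -> System.out.println(key + " -> " + n));
    }
}
```

For the example Tuples this produces 21 distinct keys. The key for http://bit.ly/ZK6t, monthly, 201208 ends with a count of 4, because that short URL appears once on 20120816 and three times on 20120817.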

To run the topology:

$ java -cp storm-hbase-[version]/lib/*:/path/to/hbase/conf backtype.storm.contrib.hbase.examples.HBaseTridentAggregateTopology

The /path/to/hbase/conf directory should contain your hbase-site.xml.

Your 'shorturl' table should look something like this:

hbase> scan 'shorturl'
ROW                        COLUMN+CELL                                                                                                                                            
http://atmlb.com/7NG4sm    column=daily:20120816, timestamp=1349716046299, value=[2,1]                                                                                            
http://atmlb.com/7NG4sm    column=monthly:201208, timestamp=1349716046300, value=[2,1]                                                                                            
http://atmlb.com/7NG4sm    column=weekly:201233, timestamp=1349716046297, value=[2,1]                                                                                             
http://bit.ly/2VL7eA       column=daily:20120816, timestamp=1349716046274, value=[1,1]                                                                                            
http://bit.ly/2VL7eA       column=monthly:201208, timestamp=1349716046276, value=[1,1]                                                                                            
http://bit.ly/2VL7eA       column=weekly:201233, timestamp=1349716046276, value=[1,1]                                                                                             
http://bit.ly/9ZJhuY       column=daily:20120816, timestamp=1349716046300, value=[2,1]                                                                                            
http://bit.ly/9ZJhuY       column=monthly:201208, timestamp=1349716046300, value=[2,1]                                                                                            
http://bit.ly/9ZJhuY       column=weekly:201233, timestamp=1349716046301, value=[2,1]                                                                                             
http://bit.ly/LsaBa        column=daily:20120816, timestamp=1349716046277, value=[1,1]                                                                                            
http://bit.ly/LsaBa        column=daily:20120820, timestamp=1349716046345, value=[4,2]                                                                                            
http://bit.ly/LsaBa        column=daily:20120903, timestamp=1349716046346, value=[4,1]                                                                                            
http://bit.ly/LsaBa        column=monthly:201208, timestamp=1349716046354, value=[4,3]                                                                                            
http://bit.ly/LsaBa        column=monthly:201209, timestamp=1349716046353, value=[4,1]                                                                                            
http://bit.ly/LsaBa        column=weekly:201233, timestamp=1349716046276, value=[1,1]                                                                                             
http://bit.ly/LsaBa        column=weekly:201234, timestamp=1349716046353, value=[4,2]                                                                                             
http://bit.ly/LsaBa        column=weekly:201236, timestamp=1349716046355, value=[4,1]                                                                                             
http://bit.ly/ZK6t         column=daily:20120816, timestamp=1349716046278, value=[1,1]                                                                                            
http://bit.ly/ZK6t         column=daily:20120817, timestamp=1349716046319, value=[3,3]                                                                                            
http://bit.ly/ZK6t         column=monthly:201208, timestamp=1349716046318, value=[3,4]                                                                                            
http://bit.ly/ZK6t         column=weekly:201233, timestamp=1349716046329, value=[3,4]

The value is the serialized state of the key, which in this example contains the latest transaction ID followed by the count. For instance, value=[3,4] means that the count is 4 and that it was last updated by transaction 3. Trident uses the transaction ID to guarantee exactly-once processing semantics.
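The role of the transaction ID can be sketched as follows. This is a simplified plain-Java model of transactional state semantics, not the connector's code: the class name TransactionalCountSketch and the in-memory map standing in for HBase are hypothetical. It relies on the transactional guarantees that a replayed batch keeps its transaction ID and contains the same Tuples.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of transactional state: each key stores {txid, count}.
class TransactionalCountSketch {

    // Stands in for the HBase table in this sketch.
    private final Map<String, long[]> store = new HashMap<>();

    /** Applies a batch's increment unless that transaction was already applied. */
    void apply(String key, long txid, long increment) {
        long[] current = store.get(key);
        if (current == null) {
            // First update for this key.
            store.put(key, new long[] {txid, increment});
        } else if (current[0] != txid) {
            // A new transaction: add the increment and remember its ID.
            store.put(key, new long[] {txid, current[1] + increment});
        }
        // Same txid as stored: the batch is a replay, so the update is skipped.
    }

    /** Returns a copy of {txid, count} for the key, or null if the key is unknown. */
    long[] get(String key) {
        long[] v = store.get(key);
        return v == null ? null : v.clone();
    }

    public static void main(String[] args) {
        TransactionalCountSketch state = new TransactionalCountSketch();
        String key = "http://bit.ly/ZK6t|monthly|201208";
        state.apply(key, 1, 1); // batch 1 contains one matching Tuple
        state.apply(key, 3, 3); // batch 3 contains three matching Tuples
        state.apply(key, 3, 3); // replay of batch 3 is ignored
        long[] v = state.get(key);
        System.out.println("[" + v[0] + "," + v[1] + "]");
    }
}
```

Because the replayed batch carries the transaction ID that is already stored, its increment is not applied a second time, which is what keeps the counters exact when batches are retried.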