jrkinley edited this page Apr 29, 2013 · 4 revisions

HBase Storm Bolts

The storm-hbase connector provides three Bolt implementations:

  • HBaseBolt: A Bolt for transforming Tuples into Put requests and sending them to an HBase table. Works in both single and batch mode. In single mode each Put is sent straight to HBase and therefore requires an RPC per request. In batch mode HBase's client-side write buffer is enabled: Put requests are buffered until the buffer is full, at which point all of the buffered Puts are flushed to HBase together (one batched call per region server involved, rather than one RPC per Put). Batch mode is enabled by default and is recommended for high-throughput streams.

  • HBaseCountersBolt: A Bolt for transforming Tuples into HBase counter Increment requests. This is useful for Storm topologies that collect statistics. Note that this is a non-transactional Bolt and therefore cannot achieve exactly-once processing semantics. Because Storm's guaranteed message processing replays failed Tuples, there is a chance of over-counting: a Tuple that fails after updating the HBase counter but before it is successfully acked in Storm is replayed and counted again.

  • HBaseCountersBatchBolt: A transactional version of HBaseCountersBolt for incrementing idempotent counters in HBase. This Bolt is intended for use in transactional topologies, which enable you to achieve exactly-once processing semantics (https://github.com/nathanmarz/storm/wiki/Transactional-topologies). It stores each counter together with the ID of the latest transaction (txid) that updated it in the HBase table. The counter is only incremented if the txid in the table differs from the txid of the Tuple being processed. If the txids differ, Storm's strong ordering of transactions guarantees that the current Tuple is not yet represented in the counter, so the counter is incremented and its stored txid updated. If the txids are the same, the current Tuple is already represented in the counter, so it is skipped: the Tuple must have failed after previously incrementing the counter but before reporting success back to Storm, and was therefore replayed.
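The txid check described for HBaseCountersBatchBolt can be illustrated with a minimal in-memory sketch. This is not the bolt's code: `TxidCounterSketch` and its methods are hypothetical, a `HashMap` stands in for the HBase table, and each `increment` call represents one batch's total for one counter column. Transaction IDs are modelled as `BigInteger`, the type Storm's transactional API uses for them.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory sketch of the txid check described for
 * HBaseCountersBatchBolt. A HashMap stands in for the HBase row; the class
 * and method names are illustrative, not the connector's API.
 */
class TxidCounterSketch {
    /** A counter value plus the txid of the last batch that updated it. */
    static final class CounterCell {
        long count;
        BigInteger txid; // null until the first update
    }

    private final Map<String, CounterCell> cells = new HashMap<>();

    /**
     * Adds a batch's total to the counter unless the stored txid equals this
     * batch's txid, which means the batch was already applied and is a replay.
     * Returns true if the counter was incremented.
     */
    boolean increment(String column, long amount, BigInteger txid) {
        CounterCell cell = cells.computeIfAbsent(column, k -> new CounterCell());
        if (txid.equals(cell.txid)) {
            return false; // replayed batch: already represented in the counter
        }
        cell.count += amount;
        cell.txid = txid;
        return true;
    }

    long get(String column) {
        CounterCell cell = cells.get(column);
        return cell == null ? 0L : cell.count;
    }

    public static void main(String[] args) {
        TxidCounterSketch counters = new TxidCounterSketch();
        counters.increment("data:clicks", 3, BigInteger.ONE);        // batch 1: applied
        counters.increment("data:clicks", 3, BigInteger.ONE);        // batch 1 replayed: skipped
        counters.increment("data:clicks", 2, BigInteger.valueOf(2)); // batch 2: applied
        System.out.println(counters.get("data:clicks"));             // prints 5
    }
}
```

The replayed batch leaves the counter unchanged, which is what makes the increment idempotent. In HBase the counter and its txid are separate cells in the same row, so how they are written together is a detail this sketch does not model.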

All implementations are generic and configurable. TupleTableConfig is used to configure the Bolts with the following attributes:

  • tableName: The HBase table name to connect to
  • tupleRowKeyField: The Tuple field to be used as the row key
  • tupleTimestampField: The Tuple field to be used as the row's timestamp (optional)
  • batch: Whether to enable HBase's client-side write buffer, i.e. batch mode (enabled by default)
  • writeBufferSize: The size of the client-side write buffer in bytes (optional; overrides the value in hbase-site.xml, which defaults to 2097152 bytes, i.e. 2 MB)
  • writeToWAL: Whether to write to HBase's write-ahead log (WAL) (enabled by default)
  • columnFamilies: A map of column families to column qualifiers. The column qualifier names should correspond to the Tuple field names to be put into the table
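To make the interaction between batch and writeBufferSize concrete, here is a toy model of a client-side write buffer. It is a sketch under simplifying assumptions, not HBase's implementation: `WriteBufferSketch` is hypothetical, no HBase calls are made, and a Put's size is approximated by the length of a "row=value" string, whereas the HBase client accounts for each Put's heap size.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of batch mode: Puts accumulate in a client-side buffer
 * and are sent together once the buffered size reaches writeBufferSize.
 * Sizes are approximated by the UTF-8 length of "row=value".
 */
class WriteBufferSketch {
    private final long writeBufferSize;
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private int flushes = 0; // stands in for the number of batched writes sent

    WriteBufferSketch(long writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
    }

    void put(String row, String value) {
        String put = row + "=" + value;
        buffer.add(put);
        bufferedBytes += put.getBytes(StandardCharsets.UTF_8).length;
        if (bufferedBytes >= writeBufferSize) {
            flush(); // buffer full: send everything at once
        }
    }

    /** Sends any buffered Puts explicitly. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushes++;
        buffer.clear();
        bufferedBytes = 0;
    }

    int flushCount() { return flushes; }

    int pending() { return buffer.size(); }

    public static void main(String[] args) {
        WriteBufferSketch batch = new WriteBufferSketch(20);
        for (int i = 0; i < 5; i++) {
            batch.put("row" + i, "v"); // each "rowN=v" is 6 bytes
        }
        System.out.println("flushes=" + batch.flushCount() + ", pending=" + batch.pending()); // flushes=1, pending=1
        batch.flush();
        System.out.println("flushes=" + batch.flushCount() + ", pending=" + batch.pending()); // flushes=2, pending=0
    }
}
```

Five Puts cost only two sends here. With batch set to false there is no buffering, so every Put is its own round trip, which is why batch mode is recommended for high-throughput streams.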

Qualifying Counters

By default HBaseCountersBolt and HBaseCountersBatchBolt use the value of the configured Tuple field as the counter's column qualifier name. For example, given the following Tuples:

Tuple1 = shorturl:http://bit.ly/LsaBa, date:20120816
Tuple2 = shorturl:http://bit.ly/LsaBa, date:20120816
Tuple3 = shorturl:http://bit.ly/LsaBa, date:20120816

And the TupleTableConfig configuration:

tupleRowKeyField = "shorturl"
columnFamilies = {CF:"data", CQ:"date"}

You will get the following counter in your HBase table:

ROW                    CF      CQ          COUNTER
http://bit.ly/LsaBa    data    20120816    3

Alternatively, if you specify a CQ in TupleTableConfig that doesn't exist in the Tuple, for example "clicks", the given CQ name is used for the counter:

ROW                    CF      CQ        COUNTER
http://bit.ly/LsaBa    data    clicks    3
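The qualifier rule can be expressed in a few lines. In this hypothetical sketch (`CounterQualifierSketch` is not part of the connector), Tuples are modelled as `Map<String, String>` and each Tuple adds 1 to its counter; it reproduces both tables above. `Map.of` and `List.of` require Java 9 or later.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of the qualifier rule for counter bolts: if the
 * configured CQ names a Tuple field, the field's value is the qualifier;
 * otherwise the CQ itself is. Tuples are modelled as Map<String, String>.
 */
class CounterQualifierSketch {
    static String qualifier(Map<String, String> tuple, String configuredCq) {
        return tuple.containsKey(configuredCq) ? tuple.get(configuredCq) : configuredCq;
    }

    /** Counts Tuples per "row family:qualifier" key, one increment per Tuple. */
    static Map<String, Long> count(List<Map<String, String>> tuples,
                                   String rowKeyField, String family, String configuredCq) {
        Map<String, Long> counters = new TreeMap<>();
        for (Map<String, String> tuple : tuples) {
            String key = tuple.get(rowKeyField) + " " + family + ":" + qualifier(tuple, configuredCq);
            counters.merge(key, 1L, Long::sum);
        }
        return counters;
    }

    public static void main(String[] args) {
        Map<String, String> t = Map.of("shorturl", "http://bit.ly/LsaBa", "date", "20120816");
        List<Map<String, String>> tuples = List.of(t, t, t); // Tuple1..Tuple3 above
        System.out.println(count(tuples, "shorturl", "data", "date"));   // {http://bit.ly/LsaBa data:20120816=3}
        System.out.println(count(tuples, "shorturl", "data", "clicks")); // {http://bit.ly/LsaBa data:clicks=3}
    }
}
```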

Running the example topologies

  • HBaseExampleTopology: A non-transactional topology that demonstrates how to use HBaseBolt to put streaming data into HBase.
  • HBaseCountersTopology: A non-transactional topology that demonstrates how to use HBaseCountersBolt to increment counters in HBase.
  • HBaseCountersBatchTopology: A transactional topology that demonstrates how to use HBaseCountersBatchBolt to increment idempotent counters in HBase.

The example topologies are based on the URL shortener project (Hush) from the book HBase: The Definitive Guide.

Example data

The HBaseExampleTopology and HBaseCountersTopology topologies use the same Spout (TestSpout), which emits one of the following Tuples at random:

shortid                      url                                user        date
"http://bit.ly/ZK6t"         "www.arsenal.com/home"             "kinley"    "20120816"
"http://bit.ly/LsaBa"        "www.baltimoreravens.com/"         "kinley"    "20120816"
"http://bit.ly/2VL7eA"       "www.49ers.com/"                   "kinley"    "20120816"
"http://bit.ly/9ZJhuY"       "www.buccaneers.com/index.html"    "kinley"    "20120816"
"http://atmlb.com/7NG4sm"    "baltimore.orioles.mlb.com/"       "kinley"    "20120816"

The HBaseCountersBatchTopology uses the backtype.storm.testing.MemoryTransactionalSpout to output batches of the Tuples above.

Create HBase table

Create the HBase table (assumes you have HBase installed and configured):

hbase> create 'shorturl', {NAME => 'data', VERSIONS => 3}, {NAME => 'daily', VERSIONS => 1, TTL => 604800}, {NAME => 'weekly', VERSIONS => 1, TTL => 2678400}, {NAME => 'monthly', VERSIONS => 1, TTL => 31536000}
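HBase column family TTLs are given in seconds, so the three TTLs in this command correspond to 7, 31 and 365 days. A quick check using the standard `java.util.concurrent.TimeUnit` class (`TtlSketch` is just an illustrative name):

```java
import java.util.concurrent.TimeUnit;

/** Shows how the TTL values (in seconds) in the create command map to days. */
class TtlSketch {
    static long ttlSeconds(long days) {
        return TimeUnit.DAYS.toSeconds(days);
    }

    public static void main(String[] args) {
        System.out.println("daily   TTL = " + ttlSeconds(7));   // 604800
        System.out.println("weekly  TTL = " + ttlSeconds(31));  // 2678400
        System.out.println("monthly TTL = " + ttlSeconds(365)); // 31536000
    }
}
```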

Build .tar.gz

To build a tarball that includes the storm-hbase .jar and its dependencies for running the examples:

$ mvn assembly:assembly

Deploy the .tar.gz

Copy the .tar.gz to a machine that you typically use to run the HBase client (e.g. one that has the required hbase-site.xml) and unpack it:

$ scp target/storm-hbase-[version].tar.gz user@[host]:/home/user
$ ssh user@[host]
$ tar -xvzf storm-hbase-[version].tar.gz

Running HBaseExampleTopology

$ java -cp "storm-hbase-[version]/lib/*:/path/to/hbase/conf" backtype.storm.contrib.hbase.examples.HBaseExampleTopology

The /path/to/hbase/conf directory should contain your hbase-site.xml.

With the following TupleTableConfig configuration:

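// Table "shorturl"; the Tuple field "shortid" supplies each row key
TupleTableConfig config = new TupleTableConfig("shorturl", "shortid");
// Single mode: send each Put immediately instead of buffering
config.setBatch(false);
// Store these Tuple fields as columns in the "data" family
config.addColumn("data", "url");
config.addColumn("data", "user");
config.addColumn("data", "date");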

Your 'shorturl' table should look something like this:

hbase> scan 'shorturl'
ROW                        COLUMN+CELL
http://atmlb.com/7NG4sm    column=data:date, timestamp=1345115796927, value=20120816
http://atmlb.com/7NG4sm    column=data:url, timestamp=1345115796927, value=baltimore.orioles.mlb.com/
http://atmlb.com/7NG4sm    column=data:user, timestamp=1345115796927, value=kinley
http://bit.ly/2VL7eA       column=data:date, timestamp=1345115796935, value=20120816
http://bit.ly/2VL7eA       column=data:url, timestamp=1345115796935, value=www.49ers.com/
http://bit.ly/2VL7eA       column=data:user, timestamp=1345115796935, value=kinley
http://bit.ly/9ZJhuY       column=data:date, timestamp=1345115796937, value=20120816
http://bit.ly/9ZJhuY       column=data:url, timestamp=1345115796937, value=www.buccaneers.com/index.html
http://bit.ly/9ZJhuY       column=data:user, timestamp=1345115796937, value=kinley
http://bit.ly/LsaBa        column=data:date, timestamp=1345115796929, value=20120816
http://bit.ly/LsaBa        column=data:url, timestamp=1345115796929, value=www.baltimoreravens.com/
http://bit.ly/LsaBa        column=data:user, timestamp=1345115796929, value=kinley
http://bit.ly/ZK6t         column=data:date, timestamp=1345115796930, value=20120816
http://bit.ly/ZK6t         column=data:url, timestamp=1345115796930, value=www.arsenal.com/home
http://bit.ly/ZK6t         column=data:user, timestamp=1345115796930, value=kinley
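The mapping from a Tuple to the cells in this scan can be sketched as follows. This is a hypothetical illustration, not HBaseBolt's code: `TupleToCellsSketch` is an invented name, Tuples are modelled as `Map<String, String>`, the field names `shortid` and `user` are assumed from the configuration above, and skipping columns with no matching field is a choice made only for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the Tuple-to-Put mapping behind the scan above:
 * the row key comes from the configured row key field and each configured
 * family:qualifier takes the value of the Tuple field with that name.
 */
class TupleToCellsSketch {
    static List<String> toCells(Map<String, String> tuple, String rowKeyField,
                                Map<String, List<String>> columnFamilies) {
        String row = tuple.get(rowKeyField);
        List<String> cells = new ArrayList<>();
        for (Map.Entry<String, List<String>> family : columnFamilies.entrySet()) {
            for (String qualifier : family.getValue()) {
                String value = tuple.get(qualifier);
                if (value != null) { // sketch choice: skip columns with no matching field
                    cells.add(row + " column=" + family.getKey() + ":" + qualifier + ", value=" + value);
                }
            }
        }
        return cells;
    }

    public static void main(String[] args) {
        // Assumed field names, inferred from the TupleTableConfig above
        Map<String, String> tuple = Map.of(
                "shortid", "http://bit.ly/ZK6t",
                "url", "www.arsenal.com/home",
                "user", "kinley",
                "date", "20120816");
        Map<String, List<String>> columnFamilies = new LinkedHashMap<>();
        columnFamilies.put("data", List.of("url", "user", "date"));
        toCells(tuple, "shortid", columnFamilies).forEach(System.out::println);
    }
}
```

HBase keeps qualifiers within a row sorted, which is why the scan lists data:date before data:url and data:user regardless of the order in which the columns were configured.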

Running HBaseCountersTopology

$ java -cp "storm-hbase-[version]/lib/*:/path/to/hbase/conf" backtype.storm.contrib.hbase.examples.HBaseCountersTopology

The /path/to/hbase/conf directory should contain your hbase-site.xml.

With the following TupleTableConfig configuration:

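// Table "shorturl"; the Tuple field "shortid" supplies each row key
TupleTableConfig config = new TupleTableConfig("shorturl", "shortid");
// Disable batch mode (HBase's client-side write buffer)
config.setBatch(false);
// "clicks" is not a Tuple field, so it is used as the qualifier name as-is
config.addColumn("data", "clicks");
// "date" is a Tuple field, so its value (e.g. 20120816) becomes the qualifier
config.addColumn("daily", "date");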

Your 'shorturl' table should look something like this:

hbase> scan 'shorturl'
ROW                        COLUMN+CELL
http://atmlb.com/7NG4sm    column=daily:20120816, timestamp=1345115849682, value=\x00\x00\x00\x00\x00\x00\x05\x8C
http://atmlb.com/7NG4sm    column=data:clicks, timestamp=1345115849682, value=\x00\x00\x00\x00\x00\x00\x05\x8C
http://bit.ly/2VL7eA       column=daily:20120816, timestamp=1345115849679, value=\x00\x00\x00\x00\x00\x00\x05\xA1
http://bit.ly/2VL7eA       column=data:clicks, timestamp=1345115849679, value=\x00\x00\x00\x00\x00\x00\x05\xA1
http://bit.ly/9ZJhuY       column=daily:20120816, timestamp=1345115849665, value=\x00\x00\x00\x00\x00\x00\x05\xFC
http://bit.ly/9ZJhuY       column=data:clicks, timestamp=1345115849665, value=\x00\x00\x00\x00\x00\x00\x05\xFC
http://bit.ly/LsaBa        column=daily:20120816, timestamp=1345115849674, value=\x00\x00\x00\x00\x00\x00\x05\x94
http://bit.ly/LsaBa        column=data:clicks, timestamp=1345115849674, value=\x00\x00\x00\x00\x00\x00\x05\x94
http://bit.ly/ZK6t         column=daily:20120816, timestamp=1345115849678, value=\x00\x00\x00\x00\x00\x00\x05v
http://bit.ly/ZK6t         column=data:clicks, timestamp=1345115849678, value=\x00\x00\x00\x00\x00\x00\x05v

To see the counter value, run the following in the HBase shell:

hbase> get_counter 'shorturl', 'http://atmlb.com/7NG4sm', 'daily:20120816'
COUNTER VALUE = 1420

hbase> get_counter 'shorturl', 'http://atmlb.com/7NG4sm', 'data:clicks'
COUNTER VALUE = 1420
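The scan prints counter cells as raw bytes: HBase stores counters as 8-byte big-endian longs, and the shell renders printable bytes as ASCII characters (so the value ending in `\x05v` ends with byte 0x76, the character `v`). The sketch below decodes such values with `java.nio.ByteBuffer`, which is big-endian by default; inside an HBase client, `org.apache.hadoop.hbase.util.Bytes.toLong` does the same job. `CounterBytesSketch` is an illustrative name.

```java
import java.nio.ByteBuffer;

/** Decodes an 8-byte big-endian counter value as printed by the HBase shell scan. */
class CounterBytesSketch {
    static long decode(byte[] value) {
        return ByteBuffer.wrap(value).getLong(); // ByteBuffer is big-endian by default
    }

    public static void main(String[] args) {
        // value=\x00\x00\x00\x00\x00\x00\x05\x8C for row http://atmlb.com/7NG4sm
        byte[] atmlb = {0, 0, 0, 0, 0, 0, 0x05, (byte) 0x8C};
        System.out.println(decode(atmlb)); // 1420, matching get_counter
        // value=\x00\x00\x00\x00\x00\x00\x05v for row http://bit.ly/ZK6t ('v' is 0x76)
        byte[] zk6t = {0, 0, 0, 0, 0, 0, 0x05, 'v'};
        System.out.println(decode(zk6t)); // 1398
    }
}
```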

Running HBaseCountersBatchTopology

$ java -cp "storm-hbase-[version]/lib/*:/path/to/hbase/conf" backtype.storm.contrib.hbase.examples.HBaseCountersBatchTopology

The /path/to/hbase/conf directory should contain your hbase-site.xml.

With the following TupleTableConfig configuration:

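// Table "shorturl"; the Tuple field "shortid" supplies each row key
TupleTableConfig config = new TupleTableConfig("shorturl", "shortid");
// Disable batch mode (HBase's client-side write buffer)
config.setBatch(false);
// "clicks" is not a Tuple field, so it is used as the qualifier name as-is
config.addColumn("data", "clicks");
// "date" is a Tuple field, so its value (e.g. 20120816) becomes the qualifier
config.addColumn("daily", "date");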

Your 'shorturl' table should look something like this:

hbase> scan 'shorturl'
ROW                        COLUMN+CELL
http://atmlb.com/7NG4sm    column=daily:20120816, timestamp=1345935584520, value=\x00\x00\x00\x00\x00\x00\x00\x01
http://atmlb.com/7NG4sm    column=daily:20120816_txid, timestamp=1345935584523, value=\x02
http://atmlb.com/7NG4sm    column=data:clicks, timestamp=1345935584528, value=\x00\x00\x00\x00\x00\x00\x00\x01
http://atmlb.com/7NG4sm    column=data:clicks_txid, timestamp=1345935584531, value=\x02
http://bit.ly/2VL7eA       column=daily:20120816, timestamp=1345935584434, value=\x00\x00\x00\x00\x00\x00\x00\x01
http://bit.ly/2VL7eA       column=daily:20120816_txid, timestamp=1345935584439, value=\x01
http://bit.ly/2VL7eA       column=data:clicks, timestamp=1345935584444, value=\x00\x00\x00\x00\x00\x00\x00\x01
http://bit.ly/2VL7eA       column=data:clicks_txid, timestamp=1345935584446, value=\x01
http://bit.ly/9ZJhuY       column=daily:20120816, timestamp=1345935584535, value=\x00\x00\x00\x00\x00\x00\x00\x01
http://bit.ly/9ZJhuY       column=daily:20120816_txid, timestamp=1345935584538, value=\x02
http://bit.ly/9ZJhuY       column=data:clicks, timestamp=1345935584542, value=\x00\x00\x00\x00\x00\x00\x00\x01
http://bit.ly/9ZJhuY       column=data:clicks_txid, timestamp=1345935584544, value=\x02
http://bit.ly/LsaBa        column=daily:20120816, timestamp=1345935584450, value=\x00\x00\x00\x00\x00\x00\x00\x04
http://bit.ly/LsaBa        column=daily:20120816_txid, timestamp=1345935584452, value=\x01
http://bit.ly/LsaBa        column=data:clicks, timestamp=1345935584456, value=\x00\x00\x00\x00\x00\x00\x00\x04
http://bit.ly/LsaBa        column=data:clicks_txid, timestamp=1345935584458, value=\x01
http://bit.ly/ZK6t         column=daily:20120816, timestamp=1345935584462, value=\x00\x00\x00\x00\x00\x00\x00\x04
http://bit.ly/ZK6t         column=daily:20120816_txid, timestamp=1345935584465, value=\x01
http://bit.ly/ZK6t         column=data:clicks, timestamp=1345935584469, value=\x00\x00\x00\x00\x00\x00\x00\x04
http://bit.ly/ZK6t         column=data:clicks_txid, timestamp=1345935584471, value=\x01
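Each counter in this scan has a companion `<qualifier>_txid` cell holding the txid of the batch that last updated it, which is what the idempotency check compares against. The sketch below reads such a pair back under two assumptions that this page does not confirm: the txid column name is the counter qualifier plus `_txid` (as observed in the scan), and the txid bytes are the output of `BigInteger.toByteArray()`, which is consistent with the single-byte values `\x01` and `\x02` shown. `TxidCellsSketch` is a hypothetical name.

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;

/**
 * Hypothetical sketch for reading the batch topology's cells. ASSUMPTIONS:
 * the txid column is the counter qualifier plus "_txid" (as seen in the scan),
 * and txid bytes are BigInteger.toByteArray() output, which matches the
 * single-byte values shown but is not confirmed by this page.
 */
class TxidCellsSketch {
    static String txidQualifier(String counterQualifier) {
        return counterQualifier + "_txid";
    }

    static long decodeCounter(byte[] value) {
        return ByteBuffer.wrap(value).getLong(); // 8-byte big-endian long
    }

    static BigInteger decodeTxid(byte[] value) {
        return new BigInteger(value); // inverse of BigInteger.toByteArray()
    }

    public static void main(String[] args) {
        // Cells for row http://bit.ly/LsaBa in the "daily" family
        byte[] counter = {0, 0, 0, 0, 0, 0, 0, 0x04};
        byte[] txid = {0x01};
        System.out.println("daily:20120816 = " + decodeCounter(counter)
                + ", daily:" + txidQualifier("20120816") + " = " + decodeTxid(txid));
        // daily:20120816 = 4, daily:20120816_txid = 1
    }
}
```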