HBase is both an awe-inspiring and an intimidating technology. I have a love-hate relationship with it. I like the extensibility and flexibility, but its data model is complex, and using its extension mechanisms comes at a cost. Not to mention the operational pitfalls. However, when you need to access a huge amount of data in (soft) real-time (and are familiar with Hadoop), it could be the tool for you. Especially if you want to store partially aggregated data and need to do some further processing on request, as is often the case with analytical use cases.
To give you a concrete example of how to go about implementing such a system on top of HBase, I have built a bare-bones and very-much-not-production-ready data processing framework that uses HBase filters and coprocessors. The source code, together with a few tests that illustrate how to use the framework, is published on GitHub.
HBase architecture crash course
Take a deep breath, there is a lot to cover.
HBase is a distributed NoSQL database built on top of Hadoop. The basic block of the HBase data model is a cell, which is an addressable blob of binary data. HBase tables are comprised of rows that are identified by a binary rowkey. Each row can have a variable number of columns that hold individual cells. A column is identified by its column family and column qualifier. Column families need to be defined upfront. Column families are stored in separate files, which allows for efficient pruning when selecting just a subset of them. A column qualifier is an arbitrary binary identifier. You can define qualifiers on the fly, and each row can have a different set of them.
When iterating over columns in a row, the columns are sorted lexicographically, first by column family and then by qualifier.
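To make the addressing concrete, here is a minimal sketch using the plain HBase client (the table, family, and qualifier names are made up, and `connection` is an already open `org.apache.hadoop.hbase.client.Connection`):

```java
// A cell is addressed by (rowkey, column family, column qualifier) - all of them just bytes.
try (Table table = connection.getTable(TableName.valueOf("users"))) {
  Put put = new Put(Bytes.toBytes("user-42"))            // rowkey
      .addColumn(Bytes.toBytes("profile"),               // column family (defined upfront)
                 Bytes.toBytes("email"),                 // column qualifier (created on the fly)
                 Bytes.toBytes("user42@example.com"));   // cell value
  table.put(put);

  Result row = table.get(new Get(Bytes.toBytes("user-42")));
  byte[] email = row.getValue(Bytes.toBytes("profile"), Bytes.toBytes("email"));
  // email now holds the cell value as raw bytes
}
```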
The fact that rowkeys, column qualifiers, and cell data can be arbitrary bytes gives a lot of flexibility when it comes to schema design and data formats used.
Rows in each table are lexicographically sorted by rowkey and split into rowkey ranges. Each such range is called a region. The regions are distributed onto multiple region servers. Each of the region servers usually holds several regions, and it is not unusual that one region server hosts multiple regions of a particular table.
Still with me? Great! There are actually a lot of important details I glossed over (cell versioning, how the data is stored within a region, MVCC, …), but this should cover everything needed to understand the rest of this article.
Reading data from HBase
Getting data from HBase is relatively cumbersome. There is no native query language. The basic way to access the data is to perform a Scan. There are a lot of attributes that you can use to restrict the Scan to a subset of your data. It usually goes like this:
- You specify a range of keys,
- then select a set of column families and/or column qualifiers,
- initialize the scan and receive results row by row.
You get a ResultScanner, which implements Iterable<Result>, where each Result contains all the cells from the specified columns of a single row. In code:
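A minimal sketch against the HBase 2.x client API - the table and column names are placeholders, and `connection` is an already open Connection:

```java
Scan scan = new Scan()
    .withStartRow(Bytes.toBytes("row-0100"))                  // rowkey range start (inclusive)
    .withStopRow(Bytes.toBytes("row-0200"))                   // rowkey range end (exclusive)
    .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("data"));   // restrict to a single column

try (Table table = connection.getTable(TableName.valueOf("my-table"));
     ResultScanner scanner = table.getScanner(scan)) {
  for (Result result : scanner) {                             // one Result per row
    byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("data"));
    // do something with the cell value...
  }
}
```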
Notice that the API is pretty close to the data model, and it is not very expressive. For example, what if you only wanted rows whose rowkey starts with a byte that is an even number?
Well, you do have an iterator, so you can go ahead, iterate over all rows, and just throw away the ones with rowkeys you don’t want. However, this approach is wasteful - it results in a lot of unnecessary network IO, which comes with increased latency.
What you might want is to push the filtering to HBase region servers. That would reduce the amount of data sent to the client. Filter pushdown is a Big Data evergreen that can significantly reduce latency.
Thankfully, doing advanced filtering is possible by setting an HBase Filter using the Scan#setFilter method. The Filter interface contains multiple methods that can influence which Results will be returned to the client. For example, the following Filter returns only rows with rowkeys starting with an even byte:
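The listing below is a reconstruction of what such a filter can look like when extending FilterBase - treat it as a sketch rather than the exact code from the repository:

```java
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;

public class EvenFirstRowkeyByteFilter extends FilterBase {

  // Returning true filters the row OUT, so we drop rows whose first rowkey byte is odd.
  @Override
  public boolean filterRowKey(Cell firstRowCell) {
    byte firstByte = firstRowCell.getRowArray()[firstRowCell.getRowOffset()];
    return firstByte % 2 != 0;
  }

  // HBase serializes the filter and ships it to region servers...
  @Override
  public byte[] toByteArray() {
    return new byte[0];    // no parameters, so there is nothing to serialize
  }

  // ...and region servers deserialize it via this static factory method.
  public static Filter parseFrom(byte[] bytes) throws DeserializationException {
    return new EvenFirstRowkeyByteFilter();
  }
}
```

Note that a custom filter class also has to be available on the region servers’ classpath before it can be used.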
There is a lot going on, considering how simple the filtering logic is. Remember that our filter needs to be serialized, sent off together with the Scan specification to region servers, and then deserialized. That is what the toByteArray and parseFrom methods are for - HBase needs to know how to serialize and deserialize the parameters of our filter. The implementation for EvenFirstRowkeyByteFilter is trivial since it does not have any parameters.
Before implementing a filter, check whether HBase does not already have you covered with one of the included filters.
Knowing how to implement a custom Filter is nice and all, but it is sooo much work! We developers are spoiled rotten by our shiny Java streams, Scala collections, and even Spark, where we can simply write something like filter(rowkey -> rowkey[0] % 2 == 0) and go get a cup of coffee.
Enter…
(filter)MapReduce!
It is a framework that provides a Dataset class. It wraps a scan and provides a higher-level API with various filter and map operations, which get translated into lower-level filters that are pushed to region servers.
Given an HBase table and a scan object, the following code creates a ResultScanner with only the rows whose rowkeys start with an even byte.
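Something along these lines - a sketch only, since the constructor and the exact name of the rowkey-filter method are assumptions here; check the actual Dataset javadoc for the real signatures:

```java
// Assumed API shape: Dataset wraps a table and a scan, the byte[] -> boolean predicate is
// pushed down to region servers as an HBase Filter, and scan() returns a plain ResultScanner.
ResultScanner scanner = new Dataset(table, scan)
    .filterRowKeys(rowkey -> rowkey[0] % 2 == 0)
    .scan();

for (Result result : scanner) {
  // only rows whose first rowkey byte is even reach the client
}
```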
Check the javadoc of the Dataset class for all implemented methods. You can:
- filter whole rows based on rowkey with a byte[] -> boolean function
- filter individual cells (using the filterCells method) with a Cell -> boolean function
- mapRows using a List<Cell> -> List<Cell> function
- mapCells with a Cell -> Cell function
- mapCellValues with a byte[] -> byte[] function
If you’d like to get your hands dirty and better understand how it all works, try to implement a filterByCellValue method (accepting a byte[] -> boolean function) that can be translated into a filterCells call. And if you’d like to jump in at the deep end, try to implement a filterRows method that takes a List<Cell> -> boolean function (you will need to implement an underlying HBase filter, more on that later).
Shakespeare test suite
To showcase the Dataset API and provide a playground to try it out, I have created a test suite that uses an HBase mini-cluster, which has a table populated with the complete works of Shakespeare.
Each row contains just one cell, which holds a serialized JSON record that represents a single line of text.
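Something along these lines (only the text field matters for what follows; the other field names are illustrative):

```json
{"play": "Hamlet", "act": 1, "scene": 2, "line": 129, "text": "O that this too too solid flesh would melt"}
```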
The rowkey is composed of 4 concatenated integers: {play ID}{act}{scene}{line number}. The hierarchical rowkey design allows scanning a single play, act, or scene by simply setting the appropriate start and stop rowkeys, as shown in the sketch below.
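For instance, scanning a single act boils down to a prefix scan - a sketch, assuming each rowkey component is a 4-byte big-endian integer (which is what Bytes.toBytes(int) produces and what keeps the lexicographic ordering intact); the actual encoding in the repository may differ:

```java
// All lines of act `act` in play `playId`: every rowkey with the {playId}{act} prefix.
byte[] startRow = Bytes.add(Bytes.toBytes(playId), Bytes.toBytes(act));
byte[] stopRow  = Bytes.add(Bytes.toBytes(playId), Bytes.toBytes(act + 1));  // exclusive upper bound
Scan actScan = new Scan().withStartRow(startRow).withStopRow(stopRow);
```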
We have a data processing framework and a corpus of text. Do you know what’s coming? You guessed right! A word count example!
Shakespearean word count
Our simple word count is going to:
- Extract the text field from the record
- Tokenize the text into a Map<String, Integer> holding the counts of words
- Reduce the individual word counts together into one final Map<String, Integer>
We could push the first 2 steps to region servers using two calls to mapCellValues and do the 3rd step on the client. However, that would result in a lot of redundant data sent over the wire, and it could overwhelm the client (what if the corpus was the whole of Project Gutenberg?).
Could we push the reduce step (at least partially) to region servers?
I am sure you can guess the answer - if we could not, the framework would be called just (filter)Map.
The Dataset class has two methods that we can use - reduceCells and reduceCellValues.
The reduce operation (let’s take reduceCells for concreteness) has two stages. The first stage is executed on region servers (in parallel). It takes an initial accumulator value of type A and a function (A, Cell) -> A that reduces all the data requested from a given region server into one value of type A. The second stage is executed on the client. It accepts an initial accumulator value of type B and a function (B, A) -> B that merges the partial results from all the region servers into one final value.
A reduceRows method that would take an (A, List<Cell>) -> A function as a reducer could also be implemented.
Using our newfound reducing powers, we can do a true distributed word count! I omit the implementation of helper functions to make it at least a bit easier on the eyes. Also, think of WordCount as a type alias of Map<String, Integer>. (I know, Java 8 does not really have those, but bear with me.)
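The shape of the code is roughly the following - a sketch, since the exact reduceCellValues signature is an assumption here, and extractText, tokenize, and merge are the omitted helpers:

```java
// mapCellValues runs server-side and keeps only the "text" field of each JSON record;
// reduceCellValues then reduces per region server and merges the partial results on the client.
Map<String, Integer> totalCounts = new Dataset(table, scan)
    .mapCellValues(json -> extractText(json))                // byte[] -> byte[]
    .reduceCellValues(
        new HashMap<String, Integer>(),                      // initial accumulator per region server
        (counts, text) -> merge(counts, tokenize(text)),     // (A, byte[]) -> A, runs on region servers
        new HashMap<String, Integer>(),                      // initial accumulator on the client
        (total, partial) -> merge(total, partial));          // (B, A) -> B, merges partial results
```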
Take a look at ShakespeareDatasetSuiteElement for more (runnable) examples.
The test suite spits out a very, very naive text analysis report when run.
Under the hood
Let’s dive into how (filter)MapReduce is implemented. I’ll start with the skeleton in the closet.
Function serialization
The lambda functions given to the various map and filter methods need to be serializable, because they are supposed to be sent over the wire to HBase region servers.
The type signatures do get a little hairy. I tried to make the code a bit more readable using helper types.
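The helper types boil down to functional interfaces that also extend Serializable - a minimal sketch (the names are my own shorthand, the repository’s may differ):

```java
import java.io.Serializable;
import java.util.function.BiFunction;
import java.util.function.Function;

// A lambda assigned to one of these interfaces gets a serializable representation,
// so it can be written out with plain Java serialization and shipped to a region server.
interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

interface SerializableBiFunction<T, U, R> extends BiFunction<T, U, R>, Serializable {}
```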
The serialization itself is the most ill-advised part of the whole implementation. It uses vanilla Java serialization facilities without any regard to safety or binary compatibility on different JVMs. You have been warned.
Filtering
Filter operations are the most straightforward - it is what the Filter interface was primarily intended to do. Let’s look at RowkeyPredicateFilter, which is a generalization of the EvenFirstRowkeyByteFilter example:
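A sketch of the idea, using the SerializableFunction helper from above (the details may differ from the actual class in the repository):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;

public class RowkeyPredicateFilter extends FilterBase {

  private final SerializableFunction<byte[], Boolean> predicate;

  public RowkeyPredicateFilter(SerializableFunction<byte[], Boolean> predicate) {
    this.predicate = predicate;
  }

  // Returning true filters the row out, hence the negation.
  @Override
  public boolean filterRowKey(Cell firstRowCell) {
    return !predicate.apply(CellUtil.cloneRow(firstRowCell));
  }

  // Serialize the captured lambda with vanilla Java serialization (see the warning above).
  @Override
  public byte[] toByteArray() throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(predicate);
    }
    return bytes.toByteArray();
  }

  // Deserialize it again on the region server.
  @SuppressWarnings("unchecked")
  public static Filter parseFrom(byte[] bytes) throws DeserializationException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return new RowkeyPredicateFilter((SerializableFunction<byte[], Boolean>) in.readObject());
    } catch (IOException | ClassNotFoundException e) {
      throw new DeserializationException(e);
    }
  }
}
```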
The other predicate-based filter, CellPredicateFilter, is implemented using the filterCell method, which allows filtering of individual cells. The implementation is very similar.
Note: I will leave out the toByteArray and parseFrom methods from the following Filter examples.
Mapping with Filters
HBase filters can be used to transform rows and cells as well.
Cell transformation is supported by the Filter#transformCell method, which is a straightforward Cell -> Cell transformation. An example of a built-in filter using it is the KeyOnlyFilter, which discards the cells’ contents. It reduces network IO in cases where the values do not matter.
Transformation of whole rows is a bit more hacky - it uses the filterRowCells method, which gives you the “chance to alter the list of Cells to be submitted”. It takes a List<Cell> representing the whole row. The method itself does not have a return value, but nobody said anything about tampering with the list itself.
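A sketch of such a row-mapping filter (the class name is illustrative, and the serialization methods are left out as promised):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.filter.FilterBase;

public class RowMappingFilter extends FilterBase {

  private final SerializableFunction<List<Cell>, List<Cell>> mapper;

  public RowMappingFilter(SerializableFunction<List<Cell>, List<Cell>> mapper) {
    this.mapper = mapper;
  }

  // filterRowCells is only invoked when hasFilterRow returns true.
  @Override
  public boolean hasFilterRow() {
    return true;
  }

  // There is no return value, so we tamper with the list itself:
  // clear it and refill it with the mapped cells.
  @Override
  public void filterRowCells(List<Cell> cells) {
    List<Cell> mapped = new ArrayList<>(mapper.apply(cells));
    cells.clear();
    cells.addAll(mapped);
  }
}
```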
Filter composition
The usability of filters would be quite limited if there was not a way
to compose them. HBase provides a FilterList
class. It has two modes - MUST_PASS_ALL
and MUST_PASS_ONE
,
which roughly correspond to logical AND
and OR
. You can use multiple nested
FilterLists
to build a whole tree of filters to represent very complex
logic.
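For example, two built-in filters combined with AND semantics (a quick illustration, not code from the repository; the filter classes live in org.apache.hadoop.hbase.filter):

```java
// A row has to pass every filter in the list (logical AND).
FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL,
    new PrefixFilter(Bytes.toBytes("user-")),   // only rows whose rowkey starts with "user-"
    new KeyOnlyFilter());                       // strip the cell values, keep just the keys
Scan scan = new Scan().setFilter(filters);
```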
In my experience, using a single FilterList with MUST_PASS_ALL is relatively intuitive. However, when you have multiple nested FilterLists, some of which are MUST_PASS_ONE, things can get rather headache-y fast. Especially when you sprinkle in a few filters that transform the cells or whole rows. You have to really understand the order of calls to Filter methods during scanning. I would advise staying away even if you feel like an HBase Gandalf 🧙.
Pushing reduce server-side
HBase filters allow you to hook into the native scanning operation to run your own logic. However, you are still limited to the basic blueprint of a scan: read -> filter/transform -> return to the client.
Endpoint coprocessors allow you to implement your own communication protocol with region servers. They run directly on region servers - one instance per region.
An endpoint coprocessor Hello World would be an endpoint that counts rows. It would return the number of rows in a region when called. Sending requests to coprocessors is done using the HBase client library. The client is then responsible for handling the responses from individual regions. In this example, it could combine the partial counts from all the regions into a grand total of all the rows from all the regions.
Such a row count endpoint would be a specific example of a more general type of aggregating endpoint coprocessor, which computes a partial aggregate for each region and sends it off to the client. The benefits of such an aggregating endpoint are to reduce network IO and also parallelize the computation across all the regions.
The server-side reduce functionality of (filter)MapReduce is implemented exactly as such a general aggregating coprocessor. Let’s look at it in more detail. It has a server-side stage that does partial reduction and a client-side stage which merges the partial reduce results together.
We will need three ingredients for the server-side part - an initial value, a reduce function, and a scan that specifies the data that should be read.
HBase coprocessor RPC protocols are defined as protocol buffer services. This is the definition of our ReducerService:
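A sketch of the definition - the field layout and the ReducerResponse message are assumptions, so the real .proto in the repository may differ:

```proto
syntax = "proto2";

option java_generic_services = true;   // make protoc generate the Service base class

message ReducerRequest {
  required bytes initial_value   = 1;  // serialized initial accumulator
  required bytes reduce_function = 2;  // serialized (A, Cell) -> A reduce function
  required bytes scan            = 3;  // serialized Scan describing the data to read
}

message ReducerResponse {
  required bytes result = 1;           // serialized partial result from one region
}

service ReducerService {
  rpc reduce(ReducerRequest) returns (ReducerResponse);
}
```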
As you can see, the ReducerRequest contains all three aforementioned things in serialized form, and the response is just a single serialized value.
The service definition needs to be compiled using protoc. The result is an abstract class which we will extend to provide the actual implementation. The RegionCoprocessor interface also needs to be implemented so that HBase knows how to initialize and tear down our custom endpoint.
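A structural sketch of the endpoint, heavily simplified: ReducerProtos stands for the protoc-generated outer class, and deserializeScan, deserialize, and serialize stand in for the framework’s (Java-serialization based) helpers - none of these helper names come from the actual repository:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.regionserver.RegionScanner;

public class ReducerEndpoint extends ReducerProtos.ReducerService implements RegionCoprocessor {

  private RegionCoprocessorEnvironment env;

  // Called by HBase when the coprocessor is loaded for a region.
  @Override
  public void start(CoprocessorEnvironment env) {
    this.env = (RegionCoprocessorEnvironment) env;
  }

  // Tells HBase which RPC services this coprocessor exposes.
  @Override
  public Iterable<Service> getServices() {
    return Collections.singleton(this);
  }

  // The rpc from the .proto above: fold everything this region holds into a single value.
  @Override
  public void reduce(RpcController controller,
                     ReducerProtos.ReducerRequest request,
                     RpcCallback<ReducerProtos.ReducerResponse> done) {
    try {
      Scan scan = deserializeScan(request.getScan());
      Object accumulator = deserialize(request.getInitialValue());
      SerializableBiFunction<Object, Cell, Object> reducer = deserialize(request.getReduceFunction());

      // Scan only the data hosted by this region and reduce it cell by cell.
      try (RegionScanner scanner = env.getRegion().getScanner(scan)) {
        List<Cell> row = new ArrayList<>();
        boolean hasMore;
        do {
          hasMore = scanner.next(row);
          for (Cell cell : row) {
            accumulator = reducer.apply(accumulator, cell);
          }
          row.clear();
        } while (hasMore);
      }

      done.run(ReducerProtos.ReducerResponse.newBuilder()
          .setResult(serialize(accumulator))
          .build());
    } catch (IOException e) {
      CoprocessorRpcUtils.setControllerException(controller, e);
    }
  }
}
```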
To call the coprocessor from the client, use the Table#coprocessorService or Table#batchCoprocessorService method. A common practice is to wrap the raw HBase API with a client class to improve usability (see the ReducerClient class as an example).
I’m not going to lie, implementing a custom HBase endpoint coprocessor is hard. To do anything interesting, you need to have a pretty good understanding of how HBase regions work and have to deal with a few APIs that are not that pleasant to work with. Even better, if you mess up, you can compromise the stability of your cluster. Either by running resource intensive code and thus burdening servers or even crashing them with an unhandled exception.
Tying it all together
Now that we have all our filters and also a ReducerService with a client implementation, we can wrap it all in a higher-level API. The Dataset class handles filter initialization and composition, and delegates reduce calls to the ReducerClient.
It could do more. For example, it could implement something akin to Apache Spark’s Encoder framework that would automatically convert cell values to/from byte[]. That would allow the mapCellValues method, for example, to take a generic A -> B lambda function for types that have encoders instead of byte[] -> byte[].
But that is again just wrapping lower-level code with a more convenient API. The filters and the coprocessor underneath are the real meat and potatoes of (filter)MapReduce.
Note on schema design
As you can guess, this framework is only useful for tables designed in such a way that all the cells (or all the cells in a column family or all the cells in certain columns) hold data encoded in the same format.
It does not do justice to the flexibility that HBase provides. Each column could just as well hold a different binary format - bson, protocol buffers - and throw in some Avro for good measure!
A uniform table design like the one used here is not that unusual. For example, Apache Phoenix - a SQL engine built on top of HBase - packs all the columns in a row into a single cell instead of mapping SQL schema columns to HBase columns. They did use the latter approach but switched to the former as a storage size and performance optimization. If interested, see the Phoenix documentation for details.
HBase as a platform
You can use a combination of filters and endpoint coprocessors for data processing and observer coprocessors (more on them maybe in a future post) to implement a distributed system backed by a CP store.
Many open-source projects do just that. To mention a few:
- Apache Phoenix - distributed OLTP database
- Apache Kylin - OLAP database
- OpenTSDB - time-series database
- Titan - graph database with HBase as one of its storage backends
I also recommend the blog of Robert Yokota. His hobby seems to be building databases on top of other databases. His HBase work includes HGraphDB (graph database) and HDocDB (document database). He also likes to do similar things for Apache Kafka - he has built a Kafka-backed KV store and on top of it an SQL layer, graph database, document database and even an etcd compatible metastore.
Wrapping up
Hopefully, this is needless to say, but do not use (filter)MapReduce or any of its parts in production. The API is janky - you are forced to work with raw bytes, and everything is littered with serialization and deserialization code. It is also a huge security risk. I did not put any thought into the lambda serialization implementation - it is literally the first version that kind of worked.
Moreover, the whole idea of a framework like this is to remotely execute arbitrary code on a database cluster holding your precious data - do you really want to do that?
However, you could take the concepts presented here and basically “inline” all the lambda functions to implement your own specialized filters and coprocessors that solve your specific problems and are optimized for your data.
I believe that stretching ideas to their limits is a great way to explore them. It can inspire you to build great things and (hopefully) to stay away from the most outlandish ideas when building something that serves your customers.