December 12, 2016

HDFS vs HBase in PySpark 2.0

It’s been a challenging period at Tribe. In case you don’t know, I am in charge of selecting and implementing the architecture for an ad exchange, with a focus on latency and performance.

As fun as it sounds, it is incredibly challenging: every tooling decision not only affects current performance, but, given the sheer amount of data, instantly creates technical debt (data migrations are always a pain, but when dealing with big data they are a big pain, hehe).

One of the decisions I had to make was how to store the final data. This decision is probably one of the most critical, as the structure and platform chosen define how the data is read and how it is written.

Interlude - AdTech stuff

Our platform receives a user that has clicked on a [Smartlink]() ad. You know those ads that, when you click on them, start redirecting you through multiple URLs? Those are called smartlinks in the biz. What is basically happening is that your browser is being bounced between different ad agencies, each deciding whether to redirect you to one of their clients (advertisers) or to pass you along to another ad agency.

When the user lands on the final advertiser and hopefully converts (conversion meaning the user has performed the action the advertiser is willing to pay for, whether that is an app installation, a purchase, or a subscription), the advertiser sends a postback (an HTTP POST request to a prearranged endpoint) to the agency, so the agency can record it and charge the advertiser at the end of the month.
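
To make the postback idea concrete, here is a purely illustrative sketch; the endpoint URL and parameter names are made up, since the real ones are agreed upon between the agency and the advertiser:

# Hypothetical postback fired by the advertiser once the user converts.
# The URL and the parameter names are illustrative, not a real API.
import requests

requests.post(
    "https://tracking.example-agency.com/postback",
    data={
        "opportunity_id": "aoadwomawodi-awdawdd-awda",  # id the agency attached to the click
        "event": "conversion",
    },
)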

Back to DataLand

This basically means that when we receive a user, we create an opportunity object. An opportunity looks like this:

{
  "created_at": 14805156161,
  "event_id": "aoadwomawodi-awdawdd-awda",
  "user_agent": "Internet Explorer 8",
  "ip": "192.168.1.1",
  "device": {
    "os": "Android",
    "model": "Samsung"
  }
}

Afterwards, once (if) we receive a postback, we have to update the opportunity to register the conversion event:

{
  "created_at": 14805156161,
  "opportunity_id": "aoadwomawodi-awdawdd-awda",
  "user_agent": "Internet Explorer 8",
  "ip": "192.168.1.1",
  "device": {
    "os": "Android",
    "model": "Samsung"
  },
  "clicked": 1
}

Given that most opportunities don’t convert (most ads aren’t clicked, and most users do not convert), we need a system that, after a reasonable amount of time, assumes the conversion is never going to take place and registers the conversion as failed for the opportunity:

{
  "created_at": 14805156161,
  "opportunity_id": "aoadwomawodi-awdawdd-awda",
  "user_agent": "Internet Explorer 8",
  "ip": "192.168.1.1",
  "device": {
    "os": "Android",
    "model": "Samsung"
  },
  "clicked": 0
}

What this means from a technical point of view is that we are dealing with a massive amount of real-time streaming data. This data needs to be stored in such a way that it can be read relatively easily to train models.

But, at the same time, every single datapoint is going to be updated once.

This leaves us with the following options for data storage:

  • HDFS - The good ol’ reliable Hadoop Distributed File System. It just works. However, it is designed for building append-only data lakes. To use HDFS, I would write the opportunities to one path, write the conversions to another path as a simple list of opportunity_id values, and then, at train time, do a massive join (see the sketch after this list).

  • HBase (on top of HDFS). HBase is another Apache Foundation project. In a nutshell, it brings CRUD to HDFS. It has a very interesting column-family structure and creates indexes on top of HDFS. This allows users to update HDFS-backed records, at the cost of slower read times.
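
To make the HDFS-only option concrete, here is a rough sketch of what the train-time join could look like. It assumes an existing SparkSession called spark, illustrative HDFS paths, and the field names from the JSON examples above:

# Rough sketch of the HDFS-only approach: opportunities and conversions are
# written to separate paths, and the label is rebuilt with a left outer join.
# The paths and the existing `spark` session are assumptions for illustration.
from pyspark.sql import functions as F

opportunities = spark.read.json("hdfs:///data/opportunities/")
conversions = spark.read.json("hdfs:///data/conversions/")

# the conversions path only needs to carry the ids of opportunities that converted
conv_ids = conversions.select("opportunity_id").withColumn("clicked", F.lit(1))

labelled = (
    opportunities
    .join(conv_ids, opportunities.event_id == conv_ids.opportunity_id, "left_outer")
    .fillna({"clicked": 0})  # no postback after the cutoff means no conversion
)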

If I took the HBase approach, things would be easier, wouldn’t they? I could just write the opportunities to HBase and then update the records once I received the postback.

Well, not really. I have found that HBase has some cons as of 2016:

- The documentation is lacking. And by documentation I mean updated tutorials. I have had my fair share of experience installing tools on my machine, and setting up HBase in pseudo-distributed mode on top of an existing Zookeeper has been a challenge, mostly because of the many different `hbase-site.xml` configurations you can find around.

- HBase does not support nested schemas, at least not officially. Given that I wanted schema flexibility, this was a big con for me.

- PySpark HBase support is non-existent. Period. I was baffled when I saw [this commit](https://github.com/apache/spark/commit/a680562a6f87a03a00f71bad1c424267ae75c641) on the Spark project. In that commit, a contributor removes, among other things, the examples of PySpark connecting to HBase, and includes this in the commit message:

 *… Also, remove a couple of outdated examples. HBase has had Spark bindings for a while and is even including them in the HBase distribution in the next version, making the examples obsolete. The same applies to Cassandra, which seems to have a proper Spark binding library already…*

 This is yet another example of Python being a second-class citizen in the Spark world. As of 2016, **there is no official way of connecting PySpark to HBase**. This is ridiculous. HBase is a mature project (and a top-level Apache project, as is Spark), and it adds much-needed functionality to the distributed computing world.

Which brings me to…

Interlude - How to connect PySpark 2.0.0 and HBase.

Ain’t easy. This is how I got HBase read/write support in PySpark 2.0.0:

  1. Set up Spark, HDFS, HBase and (maybe) Zookeeper. There are different ways of setting up each of these services (standalone, distributed, and pseudo-distributed, generally), and each use case will require a different one.
  2. Create the table in HBase. Easy peasy, just do:

    echo "create 'TABLE_NAME', 'COLUMN_FAMILY'" | hbase shell

  3. Run Spark jobs the proper way.

I found out that the only HBase/Spark integration that worked for me was Hortonworks’ shc. Even though they do not officially support Spark 2.0.0 right now, the 2.0 branch works.

To use shc with PySpark you need to add the following arguments to the Spark call (whether it is spark-submit or pyspark):

--packages com.databricks:spark-avro_2.11:3.0.1,com.hortonworks:shc:1.0.0-2.0-s_2.11, where 2.0 is your Spark major version (2.0.0 in my case) and 2.11 is your Scala version.

--repositories http://repo.hortonworks.com/content/groups/public/, which adds the Hortonworks repo so Spark knows where to find the package.

--files $HBASE_HOME/conf/hbase-site.xml, where HBASE_HOME is the path to your HBase installation.

So an example spark-submit job would look like:

spark-submit --packages com.databricks:spark-avro_2.11:3.0.1,com.hortonworks:shc:1.0.0-2.0-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/ --files /usr/local/Hbase/conf/hbase-site.xml script.py arg1 arg2

In PySpark, to write a dataframe to HBase, you first need to define a catalog. It looks like this:

catalog = """{{
    "table":{{"namespace":"default", "name":"{TABLE_NAME}"}},
    "rowkey":"key",
    "columns":{{
        "key":{{"cf":"rowkey", "col":"key", "type":"string"}},
        "column1":{{"cf":"{COLUMN_FAMILY}", "col":"column1", "type":"string"}},
        "column2":{{"cf":"{COLUMN_FAMILY}", "col":"column2", "type":"string"}}
        ... OTHER COLUMN DEFINITIONS HERE
    }}
}}""".format(TABLE_NAME=TABLE_NAME, COLUMN_FAMILY=COLUMN_FAMILY)

With TABLE_NAME and COLUMN_FAMILY matching those you defined when you created the table.

Now, once you have set up the HBase data catalog, you can write a dataframe to HBase like this:

(df
  .write
  .options(catalog=catalog)
  .format('org.apache.spark.sql.execution.datasources.hbase')
  .save()
)

And read from HBase like this:

df_reloaded = (
          spark.read
          .options(catalog=catalog)
          .format('org.apache.spark.sql.execution.datasources.hbase')
          .load()
)

One caveat, as of now, is that shc in PySpark only supports strings, so other data types need to be cast on the way into and out of HBase.
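
As an illustration, using the columns from the opportunity objects above (created_at and clicked are the non-string ones) and the df/df_reloaded dataframes from the previous snippets, the casting could look roughly like this:

# Illustrative casting around shc's strings-only limitation; column names follow
# the earlier JSON examples and `df`/`df_reloaded` come from the snippets above.
from pyspark.sql import functions as F

# before writing: non-string columns become strings
df_for_hbase = (
    df
    .withColumn("created_at", F.col("created_at").cast("string"))
    .withColumn("clicked", F.col("clicked").cast("string"))
)

# after reading back: restore the original types
df_typed = (
    df_reloaded
    .withColumn("created_at", F.col("created_at").cast("long"))
    .withColumn("clicked", F.col("clicked").cast("int"))
)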

Benchmarking

Given how critical this decision was for the project, I wanted to make sure the final call was based on the best benchmarks I could find. Well, I could not find any good benchmarks to use as a proxy for my use case, so I set up my own.

At a high level, these were the benchmark’s tasks (a rough sketch of tasks 5 to 7 follows the list):

  1. Load the opportunities into the final storage (HBase or HDFS).
  2. Load the conversions into the final storage (HBase or HDFS).
  3. Merge the opportunities and the conversions into a single dataframe. This meant doing a join in the HDFS-only approach.
  4. Data cleaning and casting tasks.
  5. Multiple counts by different features with different cardinalities.
  6. Multiple filters by different features with different cardinalities.
  7. Machine learning pipeline training.
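
For reference, here is a rough, purely illustrative sketch of what tasks 5 to 7 looked like in PySpark. The feature names are made up, and merged stands for the dataframe produced in task 3:

# Illustrative version of benchmark tasks 5-7; `merged` is the dataframe from
# task 3 and the feature/column names are stand-ins, not the real schema.
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler

# 5. counts by features of different cardinalities
counts_by_os = merged.groupBy("os").count().collect()
counts_by_model = merged.groupBy("model").count().collect()

# 6. filters by features of different cardinalities
android_opportunities = merged.filter(F.col("os") == "Android").count()

# 7. a simple ML pipeline trained on the labelled data
train = merged.withColumn("label", F.col("clicked").cast("double"))
pipeline = Pipeline(stages=[
    StringIndexer(inputCol="os", outputCol="os_idx"),
    VectorAssembler(inputCols=["os_idx"], outputCol="features"),
    LogisticRegression(featuresCol="features", labelCol="label"),
])
model = pipeline.fit(train)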

The benchmarks were done with a largish dataset (250MM records), on a two-node testing cluster.

Results

In the end, the results were pretty clear. Given how unusual our use case is (one update per record), HBase is not the solution. Regardless of the schemas tested (flat vs. nested), a simple HDFS join outperformed HBase both on writes (obvious) and on complex reads (not so obvious).

The poor HBase support in PySpark right now, combined with the superior performance of the plain HDFS approach, made it a no-brainer for us. HDFS it is (was).

Thanks for reading! If you have any questions or comments on this article, please do not hesitate to reach out to me.
