The Analytics team at Ning has recently been working on a low latency feed from our platform. With more and more features being offered to our Network Creators, we need to be able to get quick feedback from our products and systems: if a deployment introduces an overall page rendering slowdown across Ning networks for instance, we want to know about it in a matter of minutes, not hours.
Our platform is instrumented using our open-source Thrift collectors which aggregate and dump data into HDFS. A periodic process imports the data from Hadoop to our Data Warehouse (Netezza), and we’ve built a variety of dashboards to visualize trends over time.
Historically, this pipeline had about an hour of delay, meaning that our dashboards were usually an hour stale. The reason is twofold. First of all, collectors are configured to buffer events locally, in order to create larger files in HDFS (we don’t use file append): larger files means less bookkeeping needed by the namenode. Second, the event loader from HDFS to Netezza is scheduled to run on an hourly basis.
To understand this second point, let’s recall how the collector writes events to HDFS. When events are written to Hadoop, the final path name is determined by the timestamp when the event was fired: each of our Thrift event has an eventDate property, either set by the client sending the message, or automatically generated by the receiving collector. For instance, if an event was fired at 11:52:12am on 01/12/11, it will end up in /events/<MyEvent>/2011/01/12/11/<some unique name>.thrift.
Back to the Netezza loader, we implemented it as follow: it gets a list of files from HDFS, reads it (and decode it from Thrift), and pipes it into nzload – Netezza’s standard import tool. These decoded events end up in a stage table during the import process, and are atomically loaded into our production tables when all files have been read.
The tricky part here is to avoid loading duplicates. Two events can be exactly the same but not necessarily be duplicates. How do you detect it? One way would be to do some bookkeeping on the file names that have been processed. Unfortunately, this is tricky as external processes may be modifying these files: for instance, we have a combiner script, that runs on a daily basis and look for small files to combine into larger ones.
One easy way to go around this problem is to load only complete hours (e.g. /events/<MyEvent>/2011/01/12/11), as you are guaranteed (assuming you take into account any other delay in the platform) that no new files are added to this directory after 12:00 or so (we give a bit of room for miscellaneous delays and retries).
In such a world, how can we reuse this pipeline for a more real-time feed? One option is to use the ActiveMQ hook that the collectors have. Unfortunately, our internal customers want to use Netezza, as our BI infrastructure is already hooked into it.
We came up with solution by optimizing the current pipeline piece by piece.
First, we created a new pool of real-time collectors which buffer events locally for a very short period of time. We configured the threshold to be one minute, meaning that the events will sit locally on the disk for at most 60 seconds. On the client side, we use the open-source eventtracker library (which shares the same persistent queue library under the hood) and configured it to send events asynchronously every 60 seconds as well. With such a configuration, events end up in HDFS at most two minutes after they have been fired. In practice, we’ve seen a shorter delay on average.
The image below is a screenshot from the action-core, our open-source HDFS browser. It displays one event, with 21 fields, the first one being the date when the event was fired. The file name indicates when the file was written to HDFS: 01:48:21. The total delay is about 27 seconds.
Second, we had to optimize the Netezza loader. We kept the same approach by working on hours, but instead of waiting for an hour to complete, we load the current hour every three minutes in a stage table. A transaction will then delete every event in the current hour in the production table and will then load the stage. This loading step takes about two minutes.
This works (and is fast) for relatively low volume events feeds. It wouldn’t work for other feeds we have that generate a few hundred million events a day, as deletes in Netezza are slow.
A potential optimization would be to use Netezza materialized views, which combine old and fresh data in an optimized way. We are currently investigating it, in order to generalize this approach to larger feeds.
The screenshot below shows a query to Netezza. The eventDate field in the event maps to an event_date column.
Finally, we use Tableau as our visualization tool. Tableau can issue live queries directly to Netezza to fetch the latest records. The query is blazingly fast, as we leverage Netezza zonemaps (the dashboard issues queries bounded by timestamps) to stream only a few blocks out of the disk.
The result of these optimizations: a feed of events which is about 3-7 minutes stale. That’s of course only the beginning, and we are actively working on further improving this process, as Ning is growing fast, and the Analytics team needs to accommodate more and more requests from our internal and external customers.