Lately, the analytics team at Ning has been rethinking the way we collect data and store it in HDFS.
The system we had in place was a combination of a few subsystems, including shell scripts (rsync run from cron, writing directly to HDFS) and a custom-built Hadoop endpoint named the ‘collector’. The collectors work as follows: clients post serialized Thrift events to a set of servers, which in turn write them to HDFS. These servers provide additional services, including data validation and bucketizing, which limits the number of files the Hadoop NameNode needs to track.
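The bucketizing step can be pictured as mapping each event to a coarse-grained HDFS path, so that many events share one file per time bucket instead of producing a file per event. The sketch below is a hypothetical illustration of the idea, not our actual collector code; the path layout and function name are assumptions.

```python
from datetime import datetime, timezone

def bucket_path(category, event_time, base="/events"):
    """Map an event to an hourly HDFS bucket (hypothetical layout).

    All events of one category from the same hour land in the same
    bucket, keeping the number of files the NameNode tracks small.
    """
    t = datetime.fromtimestamp(event_time, tz=timezone.utc)
    return f"{base}/{category}/{t:%Y/%m/%d/%H}"

# Two events 30 minutes apart share a bucket:
p1 = bucket_path("pageview", 1262304000)        # 2010-01-01 00:00 UTC
p2 = bucket_path("pageview", 1262304000 + 1800) # 2010-01-01 00:30 UTC
```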
We decided to streamline this log aggregation process and to improve the robustness of the architecture by making all subsystems use the collectors exclusively, giving us a single front door to HDFS. This forced us to re-think the way we scaled this endpoint. Historically, a load balancer (VIP) front-ended a set of collectors which could be scaled horizontally to accommodate any increase in load. There were a couple of disadvantages to this approach. As the number of clients increased, we had to provision more collectors to handle the increased number of connections even though our data transfer rate was low. The other challenge was distributing this model across multiple data centers.
To solve these issues, we needed a way to abstract the event transfer pipes between the clients and the collectors. Scribe, Facebook’s log aggregator, suited our needs very well. We added a tree of Scribe servers between the clients and the collector VIP. These intermediate Scribe servers took care of aggregating and forwarding requests from the large set of clients. The collectors were thus able to process a large number of incoming events with a fixed set of connections from the Scribe servers. The overall deployment architecture is shown in the figure below.
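An intermediate Scribe server in this tier can be configured along the following lines. This is a sketch based on Scribe's standard store types; the hostnames are placeholders, and a buffer store with a file-backed secondary is assumed so that events survive a temporary loss of the collector VIP.

```conf
# Intermediate Scribe server: aggregate local clients and
# forward everything to the collector VIP.
port=1463

<store>
category=default
type=buffer

<primary>
type=network
remote_host=collector-vip.example.com
remote_port=1463
</primary>

<secondary>
# Spool to local disk if the collectors are unreachable.
type=file
file_path=/tmp/scribe_buffer
</secondary>
</store>
```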
Even though Scribe is capable of writing to HDFS directly, we added a Scribe endpoint to our collectors, which allows us to filter and transform the event data before storing it in HDFS. The primary advantage of this intermediate layer is that it lets us process the live event stream. For example, we broadcast certain events through JMS Topics that other parts of our system subscribe to.
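The shape of that processing layer might look like the following sketch: validate each incoming event, apply a transformation, persist it, and fan out selected categories to live subscribers (the stand-in for the JMS broadcast). This is a hypothetical illustration, using JSON in place of Thrift and in-memory lists in place of HDFS and JMS.

```python
import json

SUBSCRIBED = {"signup"}  # categories broadcast to live subscribers (assumed)

def process(raw, storage, broadcast):
    """Validate, transform, and route one event. Returns True if accepted."""
    try:
        event = json.loads(raw)                      # validation step
    except ValueError:
        return False                                 # reject malformed input
    event.setdefault("host", "unknown")              # example transform
    storage.append(event)                            # always persisted (HDFS)
    if event.get("category") in SUBSCRIBED:
        broadcast.append(event)                      # also pushed to a topic
    return True

stored, published = [], []
process('{"category": "signup", "user": 1}', stored, published)
process('not json', stored, published)               # dropped by validation
```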
However, we encountered a few issues with this deployment under heavy load. By default, a Scribe server forwards all of its messages to a single address, which made load balancing difficult when used in conjunction with a VIP: once a connection has been established, there is no way to rebalance the load as additional collectors are added. As a result, some of our collectors were heavily loaded while others sat idle.
To address this problem, we added functionality to Scribe that forces it to reconnect after sending a configurable number of messages: the Scribe client closes and re-opens its connection, and each new connection gives the VIP a chance to route to a different collector, spreading the load evenly over time. The graphs below show the load distributed between six collectors in a staging environment (the connection is recycled after one million events are sent).
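The core of the reconnect logic can be sketched as follows. This is a simplified illustration of the technique, not our Scribe patch (which is in C++); the class and parameter names are made up, and a fake connection stands in for the network.

```python
class RecyclingSender:
    """Send messages over a connection that is recycled after a fixed
    number of sends, so a VIP can rebalance across collectors."""

    def __init__(self, connect, max_messages=1_000_000):
        self._connect = connect      # factory returning a new connection
        self._max = max_messages     # configurable recycle threshold
        self._conn = None
        self._sent = 0

    def send(self, message):
        if self._conn is None or self._sent >= self._max:
            if self._conn is not None:
                self._conn.close()
            self._conn = self._connect()  # VIP picks a collector here
            self._sent = 0
        self._conn.send(message)
        self._sent += 1

# Demonstrate with a fake connection that counts how often it is opened:
class FakeConn:
    opened = 0
    def __init__(self): FakeConn.opened += 1
    def send(self, message): pass
    def close(self): pass

sender = RecyclingSender(FakeConn, max_messages=3)
for i in range(7):
    sender.send(i)
# 7 sends at 3 messages per connection -> 3 connections opened
```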
The graph below shows the load in our production environment. The service currently handles 2,800 messages per second (20 Mbit/s of data). The high-frequency sinusoidal components illustrate the load-balancing behavior, while the overall waveform reflects the day/night traffic cycle (the peak being around 10:00 AM PDT).
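As a quick sanity check on these figures, the two numbers together imply the average message size on the wire:

```python
# 20 Mbit/s spread over 2,800 messages/s:
bits_per_message = 20_000_000 / 2_800
bytes_per_message = bits_per_message / 8
print(round(bytes_per_message))  # ~893 bytes, i.e. just under 1 KB per event
```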
This modification has been open-sourced. See our repository on GitHub. By the way, we are currently working on open-sourcing the collectors. Stay tuned for the announcement on our Ning code blog.