Thinking@Scale Yan Qi     About     Feed

Highly Efficient User Profile Management in Petabyte-Scale Hadoop-based Data Warehouse

Traditionally the user profile data are stored and managed in the data warehouse. The user profile can be frequently updated to reflect the changes of user attribute and behavior. Moreover, it is critical to support a fast query processing for profile analytics. These basic requirements become challenging in the big data system, as the scale of profiles have reached a point (over petabytes) that the traditional data warehousing technology can barely handle.

The challenges mainly come from two aspects: the updating process is costly as the daily change can be huge and on the other hand the query performance must keep being improved as the input is rapidly increasing. For example the daily changes in the big data system can be more than terabytes, which makes data updating very expensive and often slow (big latency).

The traditional data warehouse provides a standard solution to the problem of user profile management and analytics, however the data scale that it normally deals with is not satisfactory. The data size in the traditional data warehouse is usually up to tens of terabytes. In the big data system, the petabyte-scale data store is quite normal, such that it is difficult for the traditional data warehouse to store and manage so many user profiles in an efficient way.

At Turn, we come up with an integrated solution to the highly efficient user profile management. Basically we build a data warehouse based on the Hadoop file systems. The size of data stored in the data warehouse can be on a petabyte scale. The data can be stored in row-based or columnar layouts in the data warehouse, such that it can provide both an efficient updating performance and a fast profile analytics.


The architecture of the profile management system is mainly composed of the following components:

  1. Operational Data Store (ODS)
  2. Analytics Data Store (ADS)
  3. Parallel ETL
  4. Cluster Monitor
  5. Database
  6. Query Dispatcher
  7. Analytics Engine

Every day there is a huge amount of data generated from the front-end system, like ad servers. The Parallel ETL (PETL) is a process running in the cluster to collect and process these data in parallel and store them in the Operational Data Store (ODS). The status of the PETL, like what data have been processed, is collected by the cluster monitor and kept in the Database.

The ODS is a row-based storage, as it must support a fast ingestion of the incoming updates. The key-value store can be faster in the data updating, however it is much slower for data analytics. In the case of profile management, the update can be merged into the profile store (i.e. ODS) regularly in batch mode. The cluster monitor collects the status of the ODS, like what data have been stored, what queries have been executed etc, and stores them in the Database.

The Analytical Data Store (ADS) provides a better solution to the data analytics. In the ADS, data are stored in columns. Comparing with its row-based counterpart (i.e., the ODS), the columnar store has a better compression ratio when storing the data, therefore has smaller data size. More importantly, only the data of interested are loaded and read when executing the user query on the columnar store. The disk I/O cost saving is almost optimal, therefore in the I/O intensive application it can achieve faster execution time in orders of magnitude comparing with the ODS. The cluster monitor collects the status of the ADS, like what data have been stored, what queries have been executed etc, and stores them in the Database.

As the data have different layouts in the ODS and the ADS, there is a data conversion from the ODS to the ADS. The conversion result is merged with the data in the ADS. Note that the ADS may not have the latest update in the ODS because the data conversion is done in batch mode. The cluster monitor collects the status of the conversion job and stores it in the Database.

When a user query is submitted, it is first stored in the query table of the Database. The query dispatcher keeps scanning the Database to (1) decide which query to execute next based on the factors, such as the waiting time, the query priority, etc, (2) decide which data store (i.e., ODS or ADS) to use for the query execution based on the data availability and the cluster resource availability, and (3) send the query job to the analytics engine for query execution.

Both the ODS and the ADS have an analytics engine to execute the query job from the query dispatcher.

The Database stores all the status information, including (1) the submitted query job, (2) the status of cluster, (3) the status of data stores (the ADS and the ODS), (4) the status of jobs running in the cluster, including the PETL, the converter, etc.

Disaster Recovery across Data Centers

In the era of BigData, the data storage becomes so large that recovery from a disaster, such as the power outage of a data center, becomes very difficult. The traditional transaction-oriented data management system relies on a write-ahead commit log to record the system state and the recovery process will work only if the log processing is faster than the incoming change requests. In other words, commit log based approach hardly works for big data system where terabytes of non-transactional daily changes are norms.

In Turn, we exploit a geographically apart master-slave architecture to support high availability (HA) and disaster recovery (DR) in the large scale Hadoop-based DWS (Data Warehouse System).

The master and the slave are located in the different data centers that are geographically apart. Functionally, the slave is like a mirror of the master. The master or slave is composed of a Hadoop cluster, a relational database, an analytics engine, a cluster monitor, a query dispatcher, a parallel ETL component, and a console. Not all of these components in the slave are active. For instance, the cluster monitor in the slave is standing by, whereas its analytics engine should be active to accept query jobs.

Data replication happens from the master to the slave to assure the data consistency between the Hadoop clusters, and database replication reflects any change on the master database to the slave database. The master and slave are connected with a dedicated high-speed WAN (Wide Area Network).

The master-slave architecture makes DR and HA possible when one of the data centers fails. Additionally the workload balancing between master and slave optimizes the query throughput1.

Failover and Disaster Recovery

Failure is common in the large storage system. It could be due to hardware failure, software bug, human error etc. The master-slave architecture makes the HA and DR possible and simple in the petabyte-scale data warehouse system at Turn.

Network Failure

When the WAN fails completely, a performance degrading can occur as all queries will be dispatched to the master only. The data and database replications also stop. Fortunately nothing special has to be done with the Hadoop cluster when the WAN comes back to normal, because the data replication process will catch up with the missing data of the slave. A sync-up operation is triggered to synchronize the slave database with the master database.

Hadoop Cluster Failure

In case of the Hadoop cluster failure in the slave, the DWS keeps working as the master has everything untouched. It is a little complex when the master loses its Hadoop cluster, as it implicates a master-slave swapping. Particularly, all components of the slave become active and take over the interrupted processes. For instance, parallel ETL becomes active first, starting to ingest data. Importantly during the swapping, the DWS keeps accepting and running the user query submission.

When the failed Hadoop cluster comes back, the data replication process can help to identify and copy the difference from the current master to the slave. Depending upon the downtime and data loss caused by the failure, it can take up to days to complete the entire recovery. However, certain queries can be executed in the recovering cluster. Furthermore based on the query history in the relational database, the hot-spots of data can be recovered first.

Data Center Failure

It is rare but fatal if one of the data centers fails. At Turn the DWS tolerates a failure of either the master or the slave. When the slave data center is down, the performance is degraded because only one of the Hadoop clusters is available and all workload are moved to the working one. The data recovery is trivial because the replication processes will catch up the difference once the failure is fixed.

If the master data center fails, the stand-by services in the slave will become active right away. There is a chance that data or query may lose if the failure happens in the middle of the data replication. However, the loss can be mitigated if the replication is scheduled to run more frequently. After all services are active, the slave becomes the master. When the failed data center is recovered, it will run as a slave. The data replication is issued to transfer the difference over. Before the database replication starts, a database sync-up process is required to make the new slave have the same content in its relational database.

  1. Any query job can be executed in either Hadoop cluster, as long as its input is available. Therefore, it is possible to balance the workload between clusters. Specifically given a query submission, query dispatcher first checks the input availability on both clusters. If the input is available in both, the query dispatcher assigns the query to the cluster which is less busy. In most case, the query result is small and the cost of reading it back from the slave is negligible. 

Efficient Distributed Copy across Data Centers

DistCp is a tool used for large inter/intra-cluster data transfer. It uses Map-Reduce to effect its distribution, error handling and recovery, and reporting. 1

Currently DistCp is mainly used for transferring data across different HDFSs (HaDoop File Systems). The HDFSs can sit in the same data center where the data flow through LAN (Local Area Network) or in the different data centers connected by WAN (Wide Area Network). Basically, DistCp issues Remote Procedure Calls (RPCs) to the name nodes of both source and destination, to fetch and compare file statuses, to make a list of files to copy. The PRC is often very expensive if the name nodes are located in the different data centers, for example, it could be up to 200x slower than the case within the same date center in our experience. DistCp may issue the same RPCs more than once, dragging down the entire performance even further. DistCp doesn’t support regular expression as input. If the user wants to filter and copy files from different folders, she has to either calculate a list of file paths beforehand or execute multiple DistCp jobs. Moreover DistCp allows preserving the file attributes during the data transfer. However, it does not preserve time stamp of file, which is quite important in some applications.

To address these problems with DistCp, we introduced an enhanced version of distributed copy tool, DistCp+2. Particularly DistCp+ makes it easier and faster to transfer a large amount of data across data centers. Comparing with DistCp, DistCp+ introduces improvement in the following aspects:

Support Regular Expression

A regular expression is a sequence of characters that forms a search pattern. It has been wildly used in the text processing utilities, for example the command grep in Unix. The regular expression used by DistCp+ is based on the syntax of regular expression in Java 3 with minor changes. To use the regular expression option with DistCp+ you must specify 2 parameters: a root URI and a path filter. The path filter follows normal regular expression rules but treats all ‘/’ tokens in a special way. The ‘/’ token is used as a delimiter and the regular expression provided is split into multiple sub-expressions around this token. Each sub-expression is used as a separate path filter for a specific depth relative to the URI with the leftmost sub-expression being used first.

For example, assuming you specify “/logs/” as the root URI and provide a regular expression of “server1|server2/today|yesterday”. The regular expression will be split into the 2 sub-expressions “server1|server2” and “today|yesterday”. Then DistCp+ will traverse the file system starting at the root (“/logs/”) and use any file that matches the first sub-expression (“server1|server2”). Folders are recursively expanded, but at each new depth in the file system the next sub-expression is used as the path filter. Using this example provided, you can match such files as “/logs/server1/yesterday” and “/logs/server2/today”, but it will not match something like “/logs/yesterday”. Also note that if a folder matches the last path filter, the entire folder is used as input instead of being recursively traversed.

Cache File Status

When a DistCp job copies a large number of files especially across geographically distant data centers, it usually has a very long setup time as several RPCs are issued to collect the file status from both sides. However, the cost of RPC is very high especially when it is through WAN. DistCp repetitively issues RPCs to get individual directory or file’s status object. These RPCs either overlap with previous ones or can be combined into fewer RPCs. To reduce the cost, a cache of file status is created in the early stage, when directory level RPC is used to get all file status under the very directory in one RPC. Then RPC is necessary only if a cache miss occurs in the following stages. For tens of thousands file transferring, we observe a significant improvement on the end-to-end time cost.

Keep Time Stamp

DistCp supports to preserve the file status, including block size, replication factor, user, group and permission. However, it does not keep the time stamp (or last modified time stamp) of the file, which is important especially when we use “-update” option to skip files without any change. Checking CRC (Cyclic Redundancy Check) of the file is an alternative, but the CRC computation of files is too high to be practical for large data transfer. Comparing the file size may not be accurate as some changes do not change the size of files. Therefore the time stamp is better to decide if an update is necessary. Particularly, the file has its time stamp preserved after copied if necessary. When “-update” option is specified, DistCp+ compares the time stamps of files on different clusters to decide if the file is included.

In Turn, DistCp+ has been used to transfer data among different data centers regularly. A DistCp+ job can usually copy thousands of files from different folders and the data volume can be terabytes.

Nested Data in DataMine

After joining Turn, I started to work on DataMine, a peta-byte scale data warehouse built upon Hadoop. One of the most important features of DataMine is that it can effectively support the nested data structure.

Comparing with the traditional relational data model, the nested relational data model allows the value in a table to be a set or a hierarchical structure. While stored in the database it cannot be simply normalized, instead it is depicted in the non-first normal form (i.e., non-1NF). In other words, the constraint that all domains must be atomic is not satisfied. Clearly it is a drawback if the data needs updating frequently. Whereas the nested relational data model makes the data representation more natural and efficient, and importantly it can eliminate join operations while reading. From this point of view the nested data structure can work well with data warehouse, where OLAP (OnLine Analytical Processing) is more common than OLTP (OnLine Transaction Processing).

DataMine exploits a nested relational data model. Particularly it allows the domain of one attribute of a table to be another table. One typical use case with DataMine is to store the on-line user profiles in a table with nested tables. Each record is composed of many user attributes, such as ID, time stamp, campaign information etc. Some attributes like campaign information can be further nested tables. An example can be found below.

To enable an efficient data access or query processing, DataMine implements an unnesting operation that flattens a record into a set of records. Thus the existing relational query execution techniques can be applied. Actually the unnesting operation tends to transform the table with nested data structure from non-1NF to 1NF. For example, the table above can be unnested as the following result.

When the table sizes become very large, it is not efficient to support JOIN between tables. This can be one reason why fewer tables are strictly normalized with their sizes increasing. Keeping everything within a single table can eliminate some JOINs. On the other hand, the correlation analytics at the record level is possible. DataMine allows JOIN between nested tables within a query through implementing special LIST functions.

A table in DataMine can have billions of records, and the nested table of a record can have millions of nested table records. Scalability is always the first consideration in the design and development. Now, DataMine stores its data in the HDFS (HaDoop File System). Depending upon the requirements, the data can be in row based or column based. Columnar store is a good fit for the use cases where partial deserialization is common, whereas row-based store can keep a performance balance between reading and writing.

From my experience, many applications in the big data era share some common features:

  • Data normalization is not necessary. Nested data model is a natural choice when a hierarchy gets involved in the data structure.
  • The data are written once and read multiple times. In other words, the data updating is not often a requirement.
  • The complex data analytics can be implemented efficiently through applying JOIN operations among nested tables inside a record.

Certainly DataMine is a good fit in these applications.

Spirits in Action Never Die

Recently an article in drew my attention. It talks about PHP, a programming language created in 1994. It reminds me of those days when I studied PHP, making me think a lot about the programming languages and the ideas underneath. (Interestingly both PHP and Hadoop employ an elephant in the logo)

The first programming language that I learned is Pascal. Since then, there have been a long list of programming languages that I have ever used. Few of them were taught in my college such as Pascal, C and ASM. Others were mostly self taught, when they were regarded as necessary. For instance, when I did research on the high availability systems, I worked on some programs in Erlang, a programming language not commonly recognized.

The story of PHP might sound a little funny. PHP, as you may know, is a scripting language that allows programmers to build dynamic Web pages. Clearly, it doesn’t seem right if you apply it in the field of, let’s say, machine learning. When I was a senior at college, one of the post-doctors in the lab where I volunteered assigned me a job: to implement a machine learning algorithm in PHP. According to her, the program would be easily deployed as a Web service if it were implemented in PHP. I was too fresh to think about it again before taking action. In the following weeks, I tried my best to learn and use PHP to implement the algorithm. However, no matter how hard I tried, the program I got did not work: it was too slow to get completed. In the end I had to re-implement the algorithm in C, which proved to be the right choice.

The bright side of this story is that the effort I took on PHP was not a waste. I learned how to build a website with PHP. I am also impressed by its simplicity and flexibility. PHP may not be perfect, but it is simple and powerful enough in most cases. Furthermore, learning how PHP works also deepened my understanding of web programming. Other lessons that I learned about programming also include:

  • Every programming language has its pros and cons. There is no such a thing as ‘silver bullet’.
  • It is necessary to have a thorough understanding of problem in the field in order for choosing the right programming language.
  • There is a huge gap between knowing a language and using it well. No shortcut! But keeping practicing can always lead us closer to be a master.

The programming language is quite different from the human language. Rather than often a communication vehicle, the programming language is more like a tool for solving computational problems. Obviously as there can be more than one solution or different ways to attack the problem, no programming language would be the ONE. In some scenario, one may be better than the other, but in others not necessarily. Sometimes it is amusing to see people arguing against each other as to which programming language is the best. In many cases, people simply ignore the problems the languages try to solve, instead they would rather pay more attention to the features or functionalities. Remember I was one of them once upon a time, when I started to learn Java. I thought Java would replace C++ or C some day as it is a write-once-run-anywhere language. However, the day never comes because Java and C++ are excellent only in some particular fields.

Instead of focusing on the programming languages, I believe it is more helpful to think of the problem. Why are there more than one programming languages to attack the same problem? Is it a tricky problem? What is the challenge behind? Is there anything we can do to improve any exiting tool? Not only does thinking in this way pull us out of the pointless argument, but leads us to better ourselves.