Thinking@Scale Yan Qi

Clean Agile

Teamwork is everywhere in human society, and it is especially important in software projects: its value can hardly be overstated as soon as more than one person is involved. Many factors affect how well a team works, but the key ones are communication and collaboration.

In my not-so-long career as a software engineer, I have found that one of the biggest challenges preventing developers from delivering a successful software product is the communication gap between them and their business partners. Many failures could be avoided if both parties were able to sync up earlier. However, timing is not the only factor. The communication may lead nowhere if a common language is absent. Business people often use a human language, such as English, to describe what they need (the specifications), whereas developers prefer more formal languages and typically think in terms of translating the business specifications into code (e.g., acceptance tests). This difference clearly poses a challenge.

Agile tries to address the challenge faced by a small group of software developers with a feedback-driven approach. A software project is therefore composed of many small cycles, each of which aims to provide a working, deliverable product that the business partners can review, so that both sides can discuss and decide what to do next. Instead of particular rules or steps, Agile emphasizes a set of principles and values, and encourages teams to cultivate a culture out of them. The book by Robert C. Martin, ‘Clean Agile: Back to Basics’, gives a very clear explanation of these values and principles. Furthermore, it provides quite a few guidelines for applying Agile in practice. After reading the book, I made a presentation based on it, which I hope highlights the main points.

RecordBuffer - A Data Serialization Approach in DataMine

Data serialization is a basic problem that every data system has to deal with. To provide an efficient solution, a data serialization approach should arrange the data into a compact binary format that is independent of any particular application. There are several open-source data serialization systems, such as Avro, Protocol Buffers and Thrift. These projects have reasonably large communities and are rather successful in different application scenarios. They are general-purpose, follow more or less similar ideas when serializing data and providing APIs for message exchange, and usually work well. They can also work with other data formats such as Parquet to provide a variety of options.

However, they could do better when applied to data with a nested structure. For example, the in-memory record representation may consume a similar amount of memory even when only a few columns have meaningful values. In addition, it is possible to improve the deserialization performance with the help of an index.

DataMine, the data warehouse of Turn, exploits a flexible, efficient and automated mechanism to manage data storage and access. It describes the data structure in the DataMine IDL and follows a code generation approach to define the APIs for data access and schema reading. A data encoding scheme, RecordBuffer, is applied to data serialization and deserialization. RecordBuffer represents the content of a table record as a byte array. Specifically, RecordBuffer has the following structure.

  • Version No. specifies what version of schema this record uses; it is required and takes 2 bytes.
  • The number of attributes in the table schema is required and takes 2 bytes.
  • Reference section length is the number of bytes used for the reference section; it is required and takes 2 bytes.
  • Sort-key reference stores the offset of the sort-key column if it exists; it is optional and takes 4 bytes when present.
  • The number of collection-type attributes uses 1 byte for the number of collections in the table, and it is required.
  • Collection-type field references store the offsets of the collections in the table sequentially; note that the offset of an empty collection is -1.
  • The number of non-collection-type field references uses 1 byte for the number of non-collection-type columns that have the hasRef annotation.
  • Non-collection-type field references sequentially store the ID and offset pairs of the columns with the hasRef annotation, if any exist.
  • Bit mask of attributes is a series of bytes to indicate the availability of any attributes in the table.
  • Attribute values store the values of available attributes in sequence; note that the sequence should be the same as that defined in the schema.
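
The fixed part of this layout and the attribute bit mask can be sketched in a few lines of Python. This is only an illustration, not DataMine's actual implementation: the byte order (big-endian), the function names and the bit ordering inside the mask are assumptions, and the optional reference fields are left out for brevity.

```python
import struct

# Assumed fixed header: version, attribute count, reference-section length,
# each an unsigned 2-byte integer. Big-endian is an assumption of this sketch.
HEADER = struct.Struct(">HHH")


def pack_header(version, num_attributes, ref_section_len):
    """Serialize the three required 2-byte header fields."""
    return HEADER.pack(version, num_attributes, ref_section_len)


def unpack_header(buf):
    """Read (version, num_attributes, ref_section_len) from the start of buf."""
    return HEADER.unpack_from(buf, 0)


def build_bit_mask(present, num_attributes):
    """Return one bit per attribute; bit i is set when attribute i has a value."""
    mask = bytearray((num_attributes + 7) // 8)
    for i in present:
        mask[i // 8] |= 1 << (i % 8)
    return bytes(mask)


def is_present(mask, i):
    """Check whether attribute i is flagged as available in the mask."""
    return bool(mask[i // 8] & (1 << (i % 8)))


header = pack_header(version=3, num_attributes=10, ref_section_len=0)
mask = build_bit_mask({0, 2, 9}, num_attributes=10)
```

With a mask like this, a record in which only a few attributes are set pays one bit per absent attribute instead of a full value slot.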

Unlike other encoding schemes, RecordBuffer has a reference section that can hold an index or any record-specific information. An index in the reference section can locate a field (such as the sort key) directly, which simplifies data deserialization significantly. In addition, frequently accessed derived values can be stored in the reference section to speed up data analytics. This is quite useful when nested data are allowed. For example, a summary of the nested attribute values can be derived and stored in the reference section, so that the deserialization of the nested table (usually very costly) can be avoided when applying an aggregation to the attribute.
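
The benefit of an offset in the reference section can be illustrated with a toy record. The layout below is hypothetical and much simpler than RecordBuffer (one 4-byte offset, one string field, one 8-byte sort key, big-endian); it only shows how a reader can jump to the sort key without decoding the fields in front of it.

```python
import struct


def encode_record(name, sort_key):
    """Toy layout: [4-byte sort-key offset][2-byte name length][name][8-byte sort key]."""
    name_bytes = name.encode("utf-8")
    body = struct.pack(">H", len(name_bytes)) + name_bytes
    sort_key_offset = 4 + len(body)  # the key sits right after the other fields
    return struct.pack(">i", sort_key_offset) + body + struct.pack(">q", sort_key)


def read_sort_key(record):
    """Jump straight to the sort key through the reference; skip everything else."""
    (offset,) = struct.unpack_from(">i", record, 0)
    (sort_key,) = struct.unpack_from(">q", record, offset)
    return sort_key


record = encode_record("user-42", sort_key=1700000000)
key = read_sort_key(record)
```

The same idea extends to derived values: a precomputed summary stored at a known offset can be read without touching the nested data it summarizes.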

Deploy Application As Data (DaaD)

The distributed computing stack commonly uses a layered structure. A functionally independent component is defined on each layer, and different layers are connected through APIs. This structure makes it quite easy for the system to scale. One example of such a system is composed of the local OS/FS, a distributed FS, a resource management system, distributed computing frameworks, and applications. Nowadays, HDFS is often used as the distributed file system. YARN is one example of a resource management system, whereas Spark is one of the promising computing frameworks.

The key to distributed computing is to run the same code on different parts of the data and then aggregate the results into the final solution. Specifically, the data are first partitioned, replicated and distributed across the cluster nodes. When an application is submitted, the resource management system decides how many resources are allocated and where the code can run (usually on the nodes where the input data are stored, so-called data locality). The computing framework devises a job or work plan for the application, which may be made up of tasks. More often than not, a driver is started on the client side (e.g., lines in green) or by a worker (e.g., lines in orange). The driver initializes the job and coordinates the job and task execution. Each task is executed by a worker running on a node, and the result can be shuffled, sorted and copied to another node where further execution takes place.
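
The pattern of partitioning the data, running the same code on each part and aggregating the results can be mimicked on a single machine. In this minimal sketch, threads stand in for cluster workers and `run_job` plays the role of the driver; all names are made up for illustration, and replication, data locality and shuffling are not modeled.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(data, num_parts):
    """Split the input into num_parts roughly equal slices."""
    return [data[i::num_parts] for i in range(num_parts)]


def task(part):
    """The same code runs on every partition; here it sums the values."""
    return sum(part)


def run_job(data, num_parts=4):
    """Driver: plan the tasks, run them on workers, then aggregate the results."""
    parts = partition(data, num_parts)
    with ThreadPoolExecutor(max_workers=num_parts) as workers:
        partial_results = list(workers.map(task, parts))
    return sum(partial_results)


total = run_job(list(range(1, 101)))
```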

There are two popular ways to deploy the application code for execution.

  1. Deploy the code on the cluster nodes - This approach distributes the application to every node of the cluster and to the client server. In other words, all involved nodes in the system have a copy of the application code. It is not common, but in some cases it is necessary because the application depends on code running on the node. The disadvantages of this approach are obvious. First, the application and the computing system are strongly coupled, so that any change on either side could potentially cause issues for the other. Second, the code deployment becomes very tedious and error-prone. Think about the case where some nodes in the distributed environment fail during the code deployment. The state of the cluster becomes unpredictable when those failed nodes come back alive with the old version of the code.
  2. Deploy the code on the client only - A more common strategy is to deploy the application code to the client server only. When running the application, the code is first distributed to the cluster nodes with some caching mechanism, such as the distributed cache in Hadoop. This simple but effective approach decouples the application from its underlying computing framework very well. However, when the number of clients is large, the deployment can become nontrivial. Also, if the application is very large, the job may have a long initialization phase because the code needs to be distributed across the cluster.

DaaD: Deploy Application As Data

In distributed computing, the code and the data are traditionally treated differently. The data can be uploaded to the cloud and then copied and distributed by means of the file system utilities. The code deployment, however, is usually more complex. For example, the network topology of the application nodes must be well defined beforehand. A sophisticated sync-up process is often required to ensure consistency and efficiency, especially when the number of application nodes is large.

Therefore, if the code can be deployed as data (i.e., DaaD), the code deployment becomes much simpler. DaaD is a two-phase process.

  1. The application code is uploaded to the distributed file system just as common data files.
  2. When running the application, a launcher loads the code from the distributed file system, stores it in the distributed cache accessible to all nodes, and starts the execution on the nodes.
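
The two phases can be sketched on a single machine, with temporary directories standing in for the distributed file system and the distributed cache. The `launch` function, the file names and the `main` entry point are hypothetical; a real launcher would talk to the cluster's file system and cache instead of the local disk.

```python
import importlib.util
import shutil
import tempfile
from pathlib import Path


def launch(dfs_dir, app_file, cache_dir, entry_point="main"):
    """Copy app_file from the (simulated) DFS into a cache, load it, and run it."""
    cached = Path(cache_dir) / app_file
    shutil.copy(Path(dfs_dir) / app_file, cached)  # phase 2: fetch the code as data
    spec = importlib.util.spec_from_file_location(cached.stem, str(cached))
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, entry_point)()


# Phase 1: "upload" the application to the file system like any data file.
dfs = tempfile.mkdtemp()
cache = tempfile.mkdtemp()
Path(dfs, "app_v1.py").write_text("def main():\n    return 'hello from v1'\n")

result = launch(dfs, "app_v1.py", cache)
```

Because the launcher only needs a location and an entry point, it stays the same no matter which application or which version of it is run.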

Clearly, the launcher must be deployed to the client where the execution request is submitted. It should be independent of any specific application. An example can be found at DaaD1.

The improvement by the DaaD can be significant.

  • The deployment becomes much simpler. Often the code can be uploaded to the distributed file system with a single command. The replication and distribution of the code are then handled automatically by the file system utilities.

  • The launcher can be defined as a simple class or executable file, which is quite stable. It is trivial to distribute it to the application node.

  • The application code is loaded only when an execution request is issued. In other words, the code is copied and distributed lazily. Importantly, the latest version of the code is always used.

  • Having no local copy avoids the code inconsistency problem.

  • It makes it much easier for different code versions to coexist in production. Imagine the scenario where it is necessary to run the same application with different code versions for an A/B test.

Highly Efficient User Profile Management in Petabyte-Scale Hadoop-based Data Warehouse

Traditionally, user profile data are stored and managed in the data warehouse. A user profile can be updated frequently to reflect changes in user attributes and behavior. Moreover, it is critical to support fast query processing for profile analytics. These basic requirements become challenging in a big data system, as the scale of the profiles has reached a point (over petabytes) that traditional data warehousing technology can barely handle.

The challenges mainly come from two aspects. First, the updating process is costly because the daily change can be huge. Second, the query performance must keep improving because the input is growing rapidly. For example, the daily changes in a big data system can exceed terabytes, which makes data updating very expensive and often slow (high latency).

The traditional data warehouse provides a standard solution to the problem of user profile management and analytics, but the data scale it normally deals with is not sufficient. The data size in a traditional data warehouse is usually up to tens of terabytes. In a big data system, a petabyte-scale data store is quite normal, so it is difficult for the traditional data warehouse to store and manage so many user profiles efficiently.

At Turn, we came up with an integrated solution for highly efficient user profile management. Basically, we built a data warehouse on top of the Hadoop file system. The size of the data stored in the data warehouse can be on a petabyte scale. The data can be stored in row-based or columnar layouts, so that the warehouse can provide both efficient updates and fast profile analytics.


The architecture of the profile management system is mainly composed of the following components:

  1. Operational Data Store (ODS)
  2. Analytics Data Store (ADS)
  3. Parallel ETL
  4. Cluster Monitor
  5. Database
  6. Query Dispatcher
  7. Analytics Engine

Every day a huge amount of data is generated by the front-end systems, such as ad servers. The Parallel ETL (PETL) is a process running in the cluster that collects and processes these data in parallel and stores them in the Operational Data Store (ODS). The status of the PETL, such as what data have been processed, is collected by the cluster monitor and kept in the Database.

The ODS is a row-based store, as it must support fast ingestion of the incoming updates. A key-value store can be faster for data updates, but it is much slower for data analytics. In the case of profile management, the updates can be merged into the profile store (i.e., the ODS) regularly in batch mode. The cluster monitor collects the status of the ODS, such as what data have been stored and what queries have been executed, and stores it in the Database.

The Analytics Data Store (ADS) provides a better solution for data analytics. In the ADS, data are stored in columns. Compared with its row-based counterpart (i.e., the ODS), the columnar store achieves a better compression ratio and therefore a smaller data size. More importantly, only the data of interest are loaded and read when a user query is executed on the columnar store. The saving in disk I/O cost is almost optimal, so in I/O-intensive applications the ADS can achieve execution times that are orders of magnitude faster than the ODS. The cluster monitor collects the status of the ADS, such as what data have been stored and what queries have been executed, and stores it in the Database.

As the data have different layouts in the ODS and the ADS, there is a data conversion from the ODS to the ADS. The conversion result is merged with the data in the ADS. Note that the ADS may not have the latest update in the ODS because the data conversion is done in batch mode. The cluster monitor collects the status of the conversion job and stores it in the Database.
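
The change of layout during the conversion can be sketched as follows. This is a simplified illustration with made-up field names: records are plain dictionaries, and merging is reduced to appending the new values to each column, whereas a real profile store would also have to reconcile updates to existing profiles.

```python
def rows_to_columns(rows, schema):
    """Convert row-based records (as in the ODS) into per-column lists (as in the ADS)."""
    return {name: [row.get(name) for row in rows] for name in schema}


def merge_columns(existing, new, schema):
    """Append newly converted values to the columns already in the store."""
    return {name: existing.get(name, []) + new[name] for name in schema}


schema = ["user_id", "country", "clicks"]
ods_batch = [
    {"user_id": 1, "country": "US", "clicks": 3},
    {"user_id": 2, "country": "DE"},  # a missing value becomes None
]
ads = merge_columns({}, rows_to_columns(ods_batch, schema), schema)
```

A query that aggregates only `clicks` now reads a single list instead of scanning every field of every record, which is where the I/O saving of the columnar layout comes from.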

When a user query is submitted, it is first stored in the query table of the Database. The query dispatcher keeps scanning the Database to (1) decide which query to execute next based on factors such as the waiting time and the query priority, (2) decide which data store (i.e., the ODS or the ADS) to use for the query execution based on the data availability and the cluster resource availability, and (3) send the query job to the analytics engine for execution.
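
The first two decisions of the dispatcher can be sketched as below. The scoring formula, the `Query` fields and the rule for choosing a store are assumptions made for illustration; the post does not describe how Turn's dispatcher weighs these factors.

```python
from dataclasses import dataclass


@dataclass
class Query:
    query_id: int
    priority: int           # larger means more urgent (assumed convention)
    waiting_seconds: float
    needs_latest: bool      # True if the query needs updates not yet converted to the ADS


def pick_next(queries, wait_weight=0.01):
    """Step 1: choose the query with the highest priority-plus-waiting score."""
    return max(queries, key=lambda q: q.priority + wait_weight * q.waiting_seconds)


def pick_store(query, ads_has_capacity):
    """Step 2: use the columnar ADS when its data is fresh enough and it has capacity."""
    if query.needs_latest or not ads_has_capacity:
        return "ODS"
    return "ADS"


pending = [
    Query(1, priority=1, waiting_seconds=600, needs_latest=False),
    Query(2, priority=5, waiting_seconds=10, needs_latest=True),
]
chosen = pick_next(pending)
store = pick_store(chosen, ads_has_capacity=True)
```

Adding the waiting time to the score keeps low-priority queries from starving behind a steady stream of urgent ones.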

Both the ODS and the ADS have an analytics engine to execute the query job from the query dispatcher.

The Database stores all the status information, including (1) the submitted query jobs, (2) the status of the cluster, (3) the status of the data stores (the ADS and the ODS), and (4) the status of the jobs running in the cluster, including the PETL, the converter, etc.

Disaster Recovery across Data Centers

In the era of big data, data storage has become so large that recovery from a disaster, such as the power outage of a data center, is very difficult. The traditional transaction-oriented data management system relies on a write-ahead commit log to record the system state, and the recovery process works only if the log can be processed faster than the incoming change requests arrive. In other words, a commit-log-based approach hardly works for a big data system where terabytes of non-transactional daily changes are the norm.

At Turn, we use a geographically separated master-slave architecture to support high availability (HA) and disaster recovery (DR) in the large-scale Hadoop-based DWS (Data Warehouse System).

The master and the slave are located in different data centers that are geographically apart. Functionally, the slave is like a mirror of the master. Each of them is composed of a Hadoop cluster, a relational database, an analytics engine, a cluster monitor, a query dispatcher, a parallel ETL component, and a console. Not all of these components are active in the slave. For instance, the cluster monitor in the slave is on standby, whereas its analytics engine is active to accept query jobs.

Data replication happens from the master to the slave to assure the data consistency between the Hadoop clusters, and database replication reflects any change on the master database to the slave database. The master and slave are connected with a dedicated high-speed WAN (Wide Area Network).

The master-slave architecture makes DR and HA possible when one of the data centers fails. Additionally the workload balancing between master and slave optimizes the query throughput1.

Failover and Disaster Recovery

Failure is common in a large storage system. It can be due to hardware failures, software bugs, human errors, etc. The master-slave architecture makes HA and DR possible and simple in the petabyte-scale data warehouse system at Turn.

Network Failure

When the WAN fails completely, performance can degrade because all queries are dispatched to the master only. The data and database replications also stop. Fortunately, nothing special has to be done with the Hadoop cluster when the WAN comes back to normal, because the data replication process will catch up with the data missing from the slave. A sync-up operation is triggered to synchronize the slave database with the master database.

Hadoop Cluster Failure

If the Hadoop cluster in the slave fails, the DWS keeps working because the master has everything untouched. It is a little more complex when the master loses its Hadoop cluster, as this implies a master-slave swap. Specifically, all components of the slave become active and take over the interrupted processes. For instance, the parallel ETL becomes active first and starts to ingest data. Importantly, the DWS keeps accepting and running user queries during the swap.

When the failed Hadoop cluster comes back, the data replication process helps to identify and copy the difference from the current master to the slave. Depending on the downtime and the data loss caused by the failure, it can take days to complete the entire recovery. However, certain queries can already be executed in the recovering cluster. Furthermore, based on the query history in the relational database, the hot spots of the data can be recovered first.
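
Recovering the hot spots first amounts to ordering the missing data by how often recent queries read it. The sketch below assumes that the query history is available as a list of input dataset names per query; the function and dataset names are invented for the example.

```python
from collections import Counter


def recovery_order(query_history, missing_datasets):
    """Order the datasets to copy so that the most frequently queried come first."""
    hits = Counter(ds for inputs in query_history for ds in inputs)
    # sorted() is stable, so datasets with equal counts keep their given order.
    return sorted(missing_datasets, key=lambda ds: -hits[ds])


history = [["clicks", "users"], ["clicks"], ["impressions", "clicks"], ["users"]]
order = recovery_order(history, ["impressions", "users", "clicks", "logs"])
```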

Data Center Failure

It is rare but fatal if one of the data centers fails. At Turn, the DWS tolerates a failure of either the master or the slave. When the slave data center is down, the performance is degraded because only one of the Hadoop clusters is available and the entire workload is moved to the working one. The data recovery is trivial because the replication processes will catch up on the difference once the failure is fixed.

If the master data center fails, the standby services in the slave become active right away. There is a chance that data or queries are lost if the failure happens in the middle of the data replication. However, the loss can be mitigated if the replication is scheduled to run more frequently. After all services are active, the slave becomes the master. When the failed data center is recovered, it runs as a slave. The data replication is started to transfer the difference over. Before the database replication starts, a database sync-up process is required to give the new slave the same content in its relational database.

  1. Any query job can be executed in either Hadoop cluster, as long as its input is available. Therefore, it is possible to balance the workload between clusters. Specifically, given a query submission, the query dispatcher first checks the input availability on both clusters. If the input is available in both, the query dispatcher assigns the query to the cluster that is less busy. In most cases, the query result is small and the cost of reading it back from the slave is negligible.
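
The balancing rule in this note can be written down directly. In the sketch below, the cluster descriptions and the use of the number of running jobs as the measure of "busy" are assumptions; the note does not say how load is measured.

```python
def assign_cluster(query_inputs, clusters):
    """Return the name of the least busy cluster that holds all inputs, or None."""
    candidates = [c for c in clusters if set(query_inputs) <= c["available"]]
    if not candidates:
        return None
    return min(candidates, key=lambda c: c["running_jobs"])["name"]


clusters = [
    {"name": "master", "available": {"clicks", "users"}, "running_jobs": 12},
    {"name": "slave", "available": {"clicks"}, "running_jobs": 3},
]
target = assign_cluster(["clicks"], clusters)
```

Here a query over `clicks` goes to the slave because both clusters hold the input and the slave is less loaded, while a query that also needs `users` can only run on the master.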