Editor’s note: This article is part of a series examining issues related to evaluating and implementing big data analytics in business.
One of the key objectives of using a multi-processing node environment is to speed application execution by breaking up large “chunks” of work into much smaller ones that can be farmed out to a pool of available processing nodes. As long as there are no dependencies forcing any one specific task to wait to begin until another specific one ends, these smaller tasks can be executed at the same time – this is the essence of what is called “task parallelism.”
As an example, consider a telecommunications company that would like to market bundled mobile telecommunication services to a particular segment of households in which high school age children are transitioning to college and might be targeted for additional services at their future college locations. Part of the analysis involves looking for certain kinds of patterns among the call detail records of household members for households that fit the target model of the marketing campaign. The next step would be to look for other households that are transitioning into the target model and determine their suitability for the program. This is a good example of a big data analysis application that needs to scan millions, if not billions, of call detail records to look for and then match against different sets of patterns. The collections of call detail records can be “batched” into smaller sets and analyzed in parallel for intermediate results, which can later be aggregated to provide the desired insight.
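The batch-then-aggregate pattern can be sketched in a few lines of plain Python. This is a single-machine illustration only, with invented records and a simple call count standing in for the real pattern matching; in a big data environment each batch would be handed to a different processing node:

```python
from collections import Counter
from itertools import islice

def chunks(records, size):
    """Split a long sequence of records into smaller batches."""
    it = iter(records)
    while batch := list(islice(it, size)):
        yield batch

def analyze_batch(batch):
    """Compute an intermediate result for one batch: here, simply
    counting records per household ID (a stand-in for pattern matching)."""
    return Counter(record["household_id"] for record in batch)

def aggregate(partials):
    """Merge the per-batch intermediate results into one final result."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total

# Invented call detail records; each batch could run on a separate node.
records = [{"household_id": h} for h in ["A", "B", "A", "C", "A", "B"]]
partials = [analyze_batch(b) for b in chunks(records, 2)]
print(aggregate(partials))  # Counter({'A': 3, 'B': 2, 'C': 1})
```

The key point is that `analyze_batch` never needs to see the whole data set; only the small merge step does.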
And this leads to one misconception of the big data phenomenon: the heightened expectation of easily achievable, scalable high performance resulting from automated task parallelism. One would expect that this example application would run significantly faster over larger volumes of records when it can be deployed in a big data environment, and that is the concept that has inspired such great interest in the power of big data analytics.
And yet, it is not so simple to achieve these performance speed-ups. In general, one cannot assume that any arbitrarily chosen business application can be migrated to a big data platform, recompiled, and magically scale up in both execution speed and support for massive data volumes. Having determined that the business challenge is suited to a big data solution, the programmers have to envision a method by which the problem can be solved, and they have to design and develop the algorithms for making it happen.
The Framework’s Role
This means that for any target big data platform, you must have an application development framework that supports a system development lifecycle and provides a means for loading and executing the developed application. A good development framework will simplify the process of developing, executing, testing, and debugging new application code, and this framework should include:
- A programming model and development tools.
- A facility for program loading, execution, and for process and thread scheduling.
- System configuration and management tools.
The context for all of these framework components is tightly coupled with the key characteristics of a big data application—algorithms that take advantage of running lots of tasks in parallel on many computing nodes to analyze lots of data distributed among many storage nodes. Typically, a big data platform will consist of a collection (or a “pool”) of processing nodes; optimal performance is achieved when all the processing nodes are kept busy, and that means maintaining a healthy allocation of tasks to idle nodes within the pool.
Any big data application that is to be developed must map to this context, and that is where the programming model comes in. The programming model essentially describes two aspects of application execution within a parallel environment: How an application is coded, and how that code maps to the parallel environment.
The MapReduce Programming Model
We can use the Hadoop MapReduce programming model as an example. A MapReduce application is envisioned as a series of basic operations applied in sequence to subsets of a very large collection (millions, billions, or even more) of data items. These data items are logically organized in a way that enables the MapReduce execution model to allocate tasks that can be executed in parallel. The data items are indexed using a defined key into <key, value> pairs, in which the key represents some grouping criterion associated with a computed value. In the canonical MapReduce example of counting the number of occurrences of a word across a corpus of many documents, the key is the word and the value is the number of times the word is counted at each processing node.
Each processing node can be assigned a set of tasks applied to different subsets of the data, keeping interim results associated with each key. In the word count example, counting the number of occurrences of each word in a single document can be assigned as one task to a processing node; this will be done for all of the documents, and interim results for each word are created. Once all the interim results are completed, they can be redistributed so that all the interim results associated with a key can be assigned to a specific processing node that accumulates the results into a final result.
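A minimal, single-machine sketch of this flow in plain Python (no Hadoop involved) makes the phases concrete; the documents and their contents are invented for illustration:

```python
from collections import defaultdict

def map_phase(doc_id, text):
    """Map task: emit interim <word, count> pairs for one document."""
    counts = defaultdict(int)
    for word in text.lower().split():
        counts[word] += 1
    return counts.items()

def shuffle(interim_results):
    """Redistribute interim results so all pairs for a key land together."""
    grouped = defaultdict(list)
    for pairs in interim_results:
        for word, count in pairs:
            grouped[word].append(count)
    return grouped

def reduce_phase(word, counts):
    """Reduce task: accumulate interim counts for one key into a final result."""
    return word, sum(counts)

# Two invented documents; each map task could run on a separate node.
documents = {1: "big data big ideas", 2: "big plans"}
interim = [map_phase(d, text) for d, text in documents.items()]
final = dict(reduce_phase(w, c) for w, c in shuffle(interim).items())
print(final)  # {'big': 3, 'data': 1, 'ideas': 1, 'plans': 1}
```

In a real cluster the shuffle step is a network redistribution, not an in-memory dictionary, but the logical structure is the same.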
Under the right circumstances, the ability to create many smaller tasks that can execute in parallel allows you to take advantage of existing processing capacity and speed the delivery of the results. This speedup can be scaled as the data volumes to be processed increase by adding additional processing nodes and storage nodes.
More concretely, the MapReduce programming model consists of five basic operations:
1) Input data. The data is loaded into the environment and is distributed across the storage nodes, and distinct data artifacts are associated with a key value.
2) Map, in which a specific task is applied to each artifact with the interim result associated with a different key value. An analogy is that each processing node has a bucket for each key, and interim results are put into the bucket for that key.
3) Sort/shuffle, in which the interim results are sorted and redistributed so that all interim results for a specific key value are located at one single processing node. To continue the analogy, this would be the process of delivering all the buckets for a specific key to a single delivery point.
4) Reduce, in which the interim results are accumulated into a final result.
5) Output result, in which the sorted final results are written out.
These steps are presumed to run in sequence, and applications developed using MapReduce often execute a series of iterations of the sequence, in which the output results from iteration n become the input to iteration n+1. This model relies on data distribution – note the first step, in which data artifacts are distributed across the environment. If the data is already distributed (such as stored in a distributed file system), this step is not necessary, and only the assignment of the key is critical. That shows how large data sets can be accommodated via distribution, and also suggests that scalability is achievable by adding more storage nodes.
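The chaining of iterations can be sketched as follows; the halving step here is a hypothetical stand-in for any full map/shuffle/reduce pass, and the loop simply feeds each iteration's output into the next until the results stop changing:

```python
def mapreduce_iteration(data):
    """One hypothetical iteration: halve each value, stopping at 1.
    (A stand-in for a complete map/shuffle/reduce pass.)"""
    return [max(1, v // 2) for v in data]

# The output of iteration n becomes the input to iteration n+1,
# until the results converge.
data = [8, 5, 16]
while (next_data := mapreduce_iteration(data)) != data:
    data = next_data
print(data)  # [1, 1, 1]
```

Iterative MapReduce applications (such as graph algorithms or clustering) follow exactly this driver-loop pattern, with each pass reading the previous pass's output from storage.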
At the same time, the Map and Reduce steps show where the bulk of the execution parallelism comes in. During the Map phase, the same task is applied to many artifacts simultaneously, and since they are independent tasks, each one can be spawned off as an independent execution thread. The same can be said for the Reduce step as well. MapReduce is reminiscent of a functional programming model, and MapReduce applications can be developed using programming languages with library support for the execution model, which we’ll explore in a bit.
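The independence of map tasks can be illustrated with a thread pool in plain Python; the pool and the documents here are illustrative stand-ins for the distributed execution model:

```python
from concurrent.futures import ThreadPoolExecutor

def map_task(document):
    """An independent map task: count the words in one document."""
    counts = {}
    for word in document.split():
        counts[word] = counts.get(word, 0) + 1
    return counts

documents = ["big data", "big plans", "data plans"]

# Because the map tasks have no dependencies on one another, each can
# be handed to a separate worker and executed concurrently.
with ThreadPoolExecutor(max_workers=3) as pool:
    interim = list(pool.map(map_task, documents))
print(interim)  # [{'big': 1, 'data': 1}, {'big': 1, 'plans': 1}, {'data': 1, 'plans': 1}]
```

Note that no locks or coordination are needed during the map phase; synchronization only becomes necessary when the interim results are brought together.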
The ECL Programming Model
Another example of a programming model is Enterprise Control Language, or ECL, a data-centric programming language for a different open source big data platform called HPCC (High-Performance Computing Cluster), which was developed by a company called Seisint that was later acquired by Lexis-Nexis. The HPCC environment was recently spun off into a company called HPCC Systems, and a community version of the product is available for download from its website.
As opposed to the functional programming model of MapReduce, ECL is a declarative programming language that describes what is supposed to happen to the data, but does not specify how it is done. The declarative approach presumes the programmer has the expectation that the compiler and execution scheme automate the parallelization of the declarative statements, which in many cases simplifies the programming process.
ECL provides a collection of primitive capabilities that are typical for data analysis, such as sorting, aggregation and deduplication, among others. In ECL, the declarative model is the source of task parallelism: discrete, small units of work can be farmed out to waiting processing units in a cluster, and each of the programming constructs can be executed in parallel.
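The flavor of the declarative style can be approximated in Python (this is an analogy, not actual ECL syntax, and the call data is invented): the program declares the result it wants in terms of primitives like deduplication, sorting and grouped aggregation, leaving a parallel runtime free to decide how each step is executed:

```python
from itertools import groupby

# Invented (household, contact count) records, with one duplicate.
calls = [("A", 3), ("B", 1), ("A", 3), ("C", 2), ("B", 4)]

# Each primitive states *what* to produce, not *how* to iterate; a
# parallel runtime could execute each step across many nodes.
deduped = sorted(set(calls))                          # deduplication + sort
totals = {h: sum(n for _, n in grp)                   # aggregation by key
          for h, grp in groupby(deduped, key=lambda c: c[0])}
print(totals)  # {'A': 3, 'B': 5, 'C': 2}
```

In ECL proper, the equivalent primitives (such as DEDUP, SORT and rollup-style aggregation) operate on distributed datasets, and the compiler decides the parallel execution plan.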
Applying the Framework to Business
Both the MapReduce and ECL programming models are implicitly aligned with data distribution and task/thread parallelism, which frees the programmer from explicitly specifying two things: how tasks are configured and how they are assigned to the different processing nodes. That means the execution model must automatically transform the “logical” parallelism into real parallel tasks that can be executed as individual threads.
To put this in business terms, let’s revisit our telecommunications example, and concentrate on one small piece of the puzzle: characterizing households in terms of their degrees of connectivity by looking at the frequency of contacts in relation to the call detail records.
This nominally involves two tasks. First, we must analyze the frequency of telecommunication contacts (such as telephone calls, text messages and emails) among individuals known to belong to the same household by time and by location; and second, we then need to classify those households in relation to their “degree of connectivity.” The household ID becomes the key. You may want to run this application on a cluster of 100 processing nodes, but clearly there will be many orders of magnitude more household IDs to be used as key values. How do the tens of thousands (or more) microtasks get allocated across this 100-node network?
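A toy sketch of those two tasks in Python, with invented records keyed by household ID and an invented classification threshold (the real campaign would of course use far richer features than a raw contact count):

```python
from collections import Counter

def classify(contact_count):
    """Hypothetical classification of a household's degree of connectivity;
    the thresholds are invented for illustration."""
    if contact_count >= 100:
        return "highly connected"
    if contact_count >= 20:
        return "moderately connected"
    return "lightly connected"

# Each call detail record carries the household ID as its key.
call_records = [
    {"household_id": "H1"}, {"household_id": "H2"},
] + [{"household_id": "H1"}] * 25

# Task 1: analyze contact frequency per household (the map-like step).
contacts = Counter(r["household_id"] for r in call_records)

# Task 2: classify each household (the reduce-like step).
connectivity = {hid: classify(n) for hid, n in contacts.items()}
print(connectivity)  # {'H1': 'moderately connected', 'H2': 'lightly connected'}
```

With household ID as the key, the counting step parallelizes over record subsets and the classification step parallelizes over households.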
The answer is that the execution model effectively manages the workloads. Here is a summary of the process:
First, when the datasets are loaded, they will be distributed across the storage configuration. There are different configurations in which storage nodes are accessible to the processing nodes, with different access methods and corresponding access times. Often, the storage nodes will be directly connected to processing nodes.
When the processing node is co-located with the storage node, it simplifies this allocation of tasks. The task manager also arranges for the scheduling of tasks (that is, the order in which they are executed) and may also continuously monitor the status of each task to ensure that progress is being made. This basic approach is adapted in different ways by the developers of the execution model to support the corresponding programming model.
As an example, one task manager process maintains the list of tasks to be performed, and doles out those tasks based on data locality (in which the processor closest to the storage node is selected) to minimize the time associated with data latency, which slows down the computing process.
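A simplified sketch of such a locality-aware assignment, with hypothetical task and node names; real task managers also handle queuing, retries and node failure, which are omitted here:

```python
def assign_tasks(tasks, idle_nodes, locality):
    """Assign each task to a node, preferring the node that already
    holds the task's input data (data locality)."""
    assignments = {}
    idle = set(idle_nodes)
    for task in tasks:
        if not idle:
            break                      # remaining tasks wait for a free node
        preferred = locality.get(task)
        # Local read avoids network transfer; otherwise take any idle node.
        node = preferred if preferred in idle else next(iter(idle))
        idle.remove(node)
        assignments[task] = node
    return assignments

# Hypothetical map from task to the node storing its input data.
locality = {"t1": "node-a", "t2": "node-b", "t3": "node-a"}
print(assign_tasks(["t1", "t2", "t3"], {"node-a", "node-b", "node-c"}, locality))
# {'t1': 'node-a', 't2': 'node-b', 't3': 'node-c'}
```

Here t1 and t2 land on their local nodes, while t3's preferred node is busy, so it falls back to the remaining idle node at the cost of a remote read.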
The last part of making a developed application executable is the system configuration. Each of the programming models must provide a way to notify the execution model of the details of the execution environment: things like whether the program is being run on a single node (as might be done during development and testing) or on a cluster, the number of nodes in a cluster, the number of storage nodes, the type of data, as well as suggestions for optimizing the shuffle or data exchange phase in which interim results are broadcast to their next processing step.
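In the Hadoop case, such details are typically supplied through configuration files. The fragment below is a sketch of a mapred-site.xml file using two real Hadoop properties; the values shown are illustrative, not recommendations:

```xml
<!-- Sketch of a mapred-site.xml fragment; values are illustrative. -->
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>   <!-- run on a cluster rather than locally -->
  </property>
  <property>
    <name>mapreduce.job.reduces</name>
    <value>10</value>     <!-- hint for sizing the shuffle/reduce phase -->
  </property>
</configuration>
```

Setting the framework name to a local mode instead of yarn is what allows the same program to be exercised on a single node during development and testing.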
At the high level, these concepts are straightforward. The devil is in the details, and a future article will delve deeper into the inner workings of MapReduce as a way of understanding the simplicity of the model and how it can be effectively used.
David Loshin is the author of several books, including Practitioner’s Guide to Data Quality Improvement and the second edition of Business Intelligence—The Savvy Manager’s Guide. As president of Knowledge Integrity Inc., he consults with organizations in the areas of data governance, data quality, master data management and business intelligence. Email him at firstname.lastname@example.org.