Dispelling fears, eliminating illiteracy and destroying myths about the iron-born elephant.
Distributions: Apache, Cloudera, Hortonworks, MapR
What is Hadoop?
Hadoop is a top-level project of the Apache Software Foundation, which is why the main distribution and the central repository for all its components live under Apache Hadoop. However, this same distribution is the main cause of burnt-out nerve cells at a first encounter with the tool: by default, installing the elephant on a cluster requires preliminary setup of the machines, manual installation of packages, editing of numerous configuration files and a lot of other fiddling. Meanwhile, the documentation is often incomplete or simply out of date. For that reason, in practice one of the distributions from the following three companies is usually used:
Cloudera. The key product is CDH (Cloudera Distribution including Apache Hadoop) – a bundle of the most popular tools from the Hadoop infrastructure, managed by Cloudera Manager. The manager takes care of cluster deployment, installation of all components and their subsequent monitoring. Besides CDH, the company develops other products, for example, Impala (see below). Cloudera's distinctive trait is the drive to be first to bring new features to market, even at the expense of stability. Also, Hadoop's creator, Doug Cutting, works at Cloudera.
Hortonworks. Like Cloudera, they offer a single solution in the form of HDP (Hortonworks Data Platform). Their distinctive trait is that instead of developing their own products they invest in developing Apache projects. For example, instead of Cloudera Manager they use Apache Ambari, and instead of Impala they develop Apache Hive further. My personal experience with this distribution boils down to a couple of tests on a virtual machine, but my impression is that HDP looks more stable than CDH.
MapR. Unlike the two previous companies, whose main source of income obviously lies in consulting and partnership programs, MapR sells its own proprietary developments. Pros: a lot of optimizations, a partnership program with Amazon. Cons: the free version (M3) has limited functionality. Also, MapR is the main ideologist and the main developer of Apache Drill.
These are the most popular Hadoop distributions. If you know other good ones, please mention them in the comments.
So let's take a look at the Hadoop stack.
Foundation: Hadoop HDFS
When we talk about Hadoop, we first of all mean its file system – HDFS (Hadoop Distributed File System). The simplest way to think about HDFS is to imagine an ordinary file system, only much bigger. An ordinary FS, roughly speaking, consists of a table of file descriptors and a data area. In HDFS, the role of the table is played by a special server – the name server (NameNode), while the data are spread across the data servers (DataNode).
The rest doesn't differ all that much: the data are split into blocks (usually 64MB or 128MB), and for each file the name server stores its path, the list of its blocks and their replicas. HDFS has the classic Unix tree of directories, users with the usual triplet of permissions, and even a similar set of console commands:
# view the root directory: locally and on HDFS
ls /
hadoop fs -ls /

# assess the size of a directory
du -sh mydata
hadoop fs -du -s -h mydata

# print the contents of all files in a directory
hadoop fs -cat mydata/*
Why is HDFS so cool? First, because it is reliable: once, while moving equipment, the IT department accidentally destroyed 50% of our servers, yet only 3% of the data were lost. Second, and even more importantly, the name server exposes the location of data blocks on the machines to anyone who asks. Why that matters, see the next section.
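To make the NameNode/DataNode split concrete, here is a toy, purely in-memory sketch of the idea (class and node names are made up for illustration, not real HDFS APIs): the name server keeps only the mapping from files to blocks and replica locations, while the blocks themselves live on the data servers.

```python
# Toy model of HDFS metadata: the "NameNode" stores only
# file -> blocks -> replica locations; block contents live elsewhere.
BLOCK_SIZE = 64 * 1024 * 1024  # 64MB, the classic HDFS block size

class ToyNameNode:
    def __init__(self, datanodes, replication=3):
        self.datanodes = datanodes
        self.replication = replication
        self.files = {}  # path -> list of (block_id, [datanode, ...])

    def add_file(self, path, size):
        n_blocks = max(1, -(-size // BLOCK_SIZE))  # ceiling division
        blocks = []
        for i in range(n_blocks):
            # place each replica on a different node, round-robin style
            replicas = [self.datanodes[(i + r) % len(self.datanodes)]
                        for r in range(self.replication)]
            blocks.append((f"{path}#blk{i}", replicas))
        self.files[path] = blocks

    def locate(self, path):
        """What a client asks the name server: where are the blocks?"""
        return self.files[path]

nn = ToyNameNode(["dn1", "dn2", "dn3", "dn4"])
nn.add_file("/logs/2014-10-01.log", 200 * 1024 * 1024)  # 200MB -> 4 blocks
for block_id, replicas in nn.locate("/logs/2014-10-01.log"):
    print(block_id, replicas)
```

The point of the sketch is the second "cool" property above: any client can ask `locate()` and learn exactly which machines hold which blocks.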
Hadoop Engines: MapReduce, Spark, Tez
Thanks to this architecture, an application can use the information about where data blocks reside to launch computations (affectionately called 'workers') on those very machines and perform most of the work locally, i.e. without shipping data over the network. This is exactly the idea behind the MapReduce paradigm and its concrete implementation in Hadoop.
The classic configuration of a Hadoop cluster consists of one name server, one MapReduce master (the so-called JobTracker) and a set of worker machines, each simultaneously running a data server (DataNode) and a worker (TaskTracker).
Each Hadoop MapReduce task consists of two stages:
1. map – executed in parallel and, where possible, locally over each data block. Instead of shipping terabytes of data to the program, a small user-defined program is copied to the data servers and does everything with the data that doesn't require shuffling and moving them around.
2. reduce – complements map with aggregating operations.
In fact, there is one more stage between these two – combine, which does the same thing as reduce, but over the local data blocks. For example, suppose we have 5 terabytes of mail server logs that need to be parsed so that the error messages can be extracted. The lines are independent of one another, so parsing them can be handled by the map task. Next, combine can filter out the error-message lines at the level of a single server, and then reduce can do the same at the level of all the data. Everything that can be parallelized is parallelized, and on top of that the data exchange between servers is minimized. And even if some task fails, Hadoop will automatically restart it, picking the intermediate results back up from disk. How cool is that?
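The map/combine/reduce pipeline just described can be sketched in plain Python (the log format, the "ERROR" tag and the function names are assumptions for illustration; a real job would run these stages through Hadoop, not in one process):

```python
# A pure-Python sketch of the mail-log example: map parses each line
# locally, combine filters errors per server, reduce merges across servers.

def map_parse(line):
    # parse "<level> <message>" into a (level, message) record
    level, _, message = line.partition(" ")
    return level, message

def combine_errors(records):
    # per-server stage: keep only error records (runs on local blocks)
    return [r for r in records if r[0] == "ERROR"]

def reduce_collect(per_server_results):
    # cluster-wide stage: merge the per-server error lists
    return [r for partial in per_server_results for r in partial]

# two "servers", each holding its own block of log lines
server_blocks = [
    ["INFO mail sent", "ERROR disk full", "INFO mail sent"],
    ["ERROR connection refused", "INFO queue flushed"],
]

partials = [combine_errors([map_parse(line) for line in block])
            for block in server_blocks]
errors = reduce_collect(partials)
print(errors)
```

Note how combine shrinks the data before anything crosses the network: each server forwards only its error records, never its full block.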
The problem is that most real-world jobs are much more complicated than a single MapReduce task. In most cases we want to run parallel operations, then sequential ones, then parallel ones again, then combine several data sources and run parallel and sequential operations once more. Standard Hadoop MapReduce is designed so that all results – final and intermediate alike – are written to disk. As a result, the time spent reading from and writing to disk, multiplied by the number of times it happens over the course of the job, often exceeds the time of the computation itself by several times (and not just several – by up to 100 times).
And here is where Spark comes in. Designed by folks from Berkeley, Spark uses the same idea of data locality, but moves most of the computation into memory instead of onto disk. The key concept in Spark is the RDD (resilient distributed dataset) – a pointer to a lazy distributed data collection. Most operations on an RDD don't trigger any computation; they merely create another wrapper promising to perform certain operations when they are actually needed. Still, this is easier to show than to tell. Below is a Python script (out of the box Spark supports interfaces for Scala, Java and Python) that solves the log problem:
sc = ...  # create the context (SparkContext)
rdd = sc.textFile("/path/to/server_logs")  # create a pointer to the data
(rdd
 .map(parse_line)                      # parse the lines into a convenient format
 .filter(contains_error)               # keep only the error records
 .saveAsTextFile("/path/to/result"))   # save the results to disk
In this example the actual computation starts only on the last line: Spark sees that the result needs to be materialized and begins applying the operations to the data. There are no intermediate stages here – each line is read into memory, parsed, checked for the error tag and, if the tag is present, immediately written to disk.
This model turned out to be so effective and convenient that projects across the Hadoop ecosystem began porting their computations to Spark, and more people now work on this engine than on the obsolescent MapReduce.
But not by Spark alone. Hortonworks decided to bet on an alternative engine – Tez. Tez represents a job as a directed acyclic graph (DAG) of processing components. The scheduler launches the computation of the graph and, where necessary, dynamically reconfigures it, optimizing for the data at hand. This is a very natural model for executing complex data queries, such as SQL-like scripts in Hive, where Tez brings speedups of up to 100x. Outside of Hive, however, this engine is barely used, so it is hard to say how well it suits simpler and more widespread tasks.
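The DAG-of-operators idea behind Tez can be sketched very loosely in a few lines of Python: each vertex is a processing step, edges say whose output feeds whom, and the scheduler runs vertices in topological order. The step names and operations below are made up for illustration and have nothing to do with Tez's actual API.

```python
# A loose sketch of DAG execution: run each processing step once all of
# its dependencies have produced their outputs.
from graphlib import TopologicalSorter

steps = {
    "read":   lambda inputs: list(range(10)),
    "square": lambda inputs: [x * x for x in inputs["read"]],
    "odd":    lambda inputs: [x for x in inputs["read"] if x % 2],
    "join":   lambda inputs: inputs["square"] + inputs["odd"],
}
# each step mapped to the set of steps it depends on
deps = {"read": set(), "square": {"read"}, "odd": {"read"},
        "join": {"square", "odd"}}

results = {}
for name in TopologicalSorter(deps).static_order():
    results[name] = steps[name]({d: results[d] for d in deps[name]})
print(results["join"])
```

A real engine would additionally run independent vertices ("square" and "odd" here) in parallel and re-plan the graph at runtime; the sketch only shows the dependency-ordered execution.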
SQL on Hadoop: Hive, Impala, Shark, Spark SQL, Drill
Even though Hadoop is a full-fledged platform for developing arbitrary applications, it is most often used in the context of data storage and, specifically, SQL solutions. No surprise there: big data implies analytics, and it is easier to run analysis over tabular data. Besides, it is easier to find both tools and people for SQL databases than for NoSQL solutions. There are several SQL-oriented tools in the Hadoop infrastructure:
Hive – the first and still one of the most popular DBMSs on this platform. Its query language is HiveQL – a stripped-down SQL dialect that nonetheless allows quite complex queries over data stored in HDFS. Here we must draw a clear line between Hive versions <= 0.12 and the current 0.13: as mentioned above, in the latest version Hive switched from classic MapReduce to the new Tez engine, which sped it up dramatically and made it usable for interactive analytics. You no longer need to wait 2 minutes to count the rows in one small partition, or 40 minutes to group data by day of the week (good-bye, long coffee breaks!). Moreover, both Hortonworks and Cloudera provide ODBC drivers, so you can connect tools such as Tableau, MicroStrategy and even (heaven help us) Microsoft Excel to Hive.
Impala – Cloudera's product and Hive's main competitor. Unlike the latter, Impala never used classic MapReduce and from the start has executed queries on its own engine (written in C++, which is uncommon for Hadoop). In addition, Impala has lately been actively using caching of frequently accessed data blocks and columnar storage formats, which is very good for the performance of analytical queries. As with Hive, Cloudera ships a quite effective ODBC driver for its product.
Shark. When Spark entered the Hadoop ecosystem with its revolutionary ideas, the natural desire arose to get an SQL engine on top of it. The result was the enthusiast-driven project Shark. In Spark 1.0, however, the Spark team released the first version of their own SQL engine – Spark SQL – and from that moment Shark has been considered discontinued.
Spark SQL – a new branch of SQL development on top of Spark. Frankly, comparing it with the previous tools is not quite fair yet: Spark SQL has no separate console and no metadata store of its own, its SQL parser is still rather weak, and partitions are not supported. Apparently, its main purpose at the moment is to read data from complex formats (such as Parquet, see below) and to express logic as data models rather than program code. And frankly, that is already a lot. Quite often a processing pipeline consists of alternating SQL queries and program code; Spark SQL lets you glue these stages together painlessly, without resorting to black magic.
Hive on Spark – yes, there is such a thing too, but it will most likely not work until version 0.14.
Drill. For the sake of completeness, Apache Drill should also be mentioned. The project is still in the ASF incubator and is barely used, but judging by the signs, its main focus will be semi-structured and nested data. In Hive and Impala it is also possible to work with JSON strings, but query performance drops considerably (often by a factor of 10-20). What yet another Hadoop-based DBMS will bring is hard to say, so let us wait and see.
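The "alternating SQL queries and program code" pattern mentioned for Spark SQL can be illustrated in a self-contained way; here the standard-library sqlite3 module stands in for Spark SQL (the table name and data are made up), since the pattern itself is the same: SQL does the declarative part, ordinary code does the rest.

```python
# Illustrating the SQL-then-code pipeline pattern; sqlite3 is only a
# stand-in for a distributed SQL engine such as Spark SQL.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (level TEXT, message TEXT)")
conn.executemany("INSERT INTO logs VALUES (?, ?)", [
    ("INFO", "mail sent"),
    ("ERROR", "disk full"),
    ("ERROR", "connection refused"),
])

# stage 1: SQL handles the declarative filtering
rows = conn.execute(
    "SELECT message FROM logs WHERE level = 'ERROR'").fetchall()

# stage 2: ordinary code handles what is awkward to express in SQL
tagged = [msg.upper() for (msg,) in rows]
print(tagged)
```

In real Spark SQL the same back-and-forth works on distributed data: a query produces an RDD-backed result, which regular map/filter code can pick up, and vice versa.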
NoSQL: Hadoop HBase
Despite the popularity of SQL solutions for analytics on top of Hadoop, sometimes one has to deal with problems for which NoSQL databases are a better fit. Moreover, both Hive and Impala work better with large batches of data, while reading and writing individual records almost always means significant overhead (remember the 64MB data block size).
And here HBase comes to the rescue. HBase is a distributed, versioned, non-relational DBMS that efficiently supports random reads and writes. One could also mention that its tables are three-dimensional (row key, timestamp and qualified column name), that row keys are stored sorted in lexicographic order, and so on, but the key point is that HBase makes it possible to work with individual records in real time. And that is an important addition to the Hadoop infrastructure. Imagine, for example, that you need to store user information: profiles and an event log. The event log is a classic example of analytical data: actions, i.e. events, are recorded once and never change afterwards. Events are analyzed in batches and at intervals, for example once a day. Profiles are a different matter entirely: they have to be updated constantly, and in real time at that. So we use Hive/Impala for the event log and HBase for the profiles.
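The three-dimensional table just described can be modelled with a toy in-memory class (purely illustrative; real HBase is accessed through its own client APIs, e.g. happybase in Python, and the row/column names here are made up):

```python
# Toy model of HBase's data model: (row key, qualified column, timestamp)
# -> value, with row keys kept in lexicographic order for cheap scans.
import bisect
import time

class ToyHTable:
    def __init__(self):
        self.rows = {}         # row_key -> {column -> [(ts, value), ...]}
        self.sorted_keys = []  # row keys in lexicographic order

    def put(self, row_key, column, value, ts=None):
        if row_key not in self.rows:
            bisect.insort(self.sorted_keys, row_key)
            self.rows[row_key] = {}
        cell = self.rows[row_key].setdefault(column, [])
        cell.append((ts if ts is not None else time.time(), value))

    def get(self, row_key, column):
        """Latest version of one cell -- a random read in real time."""
        return max(self.rows[row_key][column])[1]

    def scan(self, start, stop):
        """Row keys in [start, stop) -- cheap thanks to the sorted order."""
        i = bisect.bisect_left(self.sorted_keys, start)
        j = bisect.bisect_left(self.sorted_keys, stop)
        return self.sorted_keys[i:j]

t = ToyHTable()
t.put("user#42", "profile:name", "Alice", ts=1)
t.put("user#42", "profile:name", "Alice B.", ts=2)  # newer version wins
print(t.get("user#42", "profile:name"))
```

This is exactly what makes HBase suitable for the profiles in the example above: a single cell can be updated and re-read at any moment, while old versions stay around.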
At the same time, thanks to its HDFS foundation, HBase provides reliable storage. Wait a second, haven't we just said that random-access operations in this file system are inefficient because of the large block size? That's true, and therein lies HBase's main trick. In reality, new records are first added to a sorted structure in memory, and only when that structure reaches a certain size are they flushed to disk. Consistency is maintained through a write-ahead log (WAL), which is written straight to disk but, naturally, does not have to keep its keys sorted. You can read more about this in the Cloudera blog.
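This write path can be sketched in a few lines (the flush threshold and structures are made up for illustration): every write goes both to an append-only WAL, which is a cheap sequential disk write, and to a sorted in-memory buffer, which is periodically flushed to disk as a sorted run.

```python
# Sketch of the HBase write trick: durable unsorted WAL + sorted
# in-memory buffer, flushed to disk once it grows past a threshold.
import bisect

class ToyMemStore:
    def __init__(self, flush_threshold=3):
        self.wal = []            # stands in for the write-ahead log on disk
        self.mem = []            # sorted (key, value) pairs in memory
        self.flushed_runs = []   # sorted runs already written to disk
        self.flush_threshold = flush_threshold

    def put(self, key, value):
        self.wal.append((key, value))          # 1. durable, unsorted, sequential
        bisect.insort(self.mem, (key, value))  # 2. sorted, but only in memory
        if len(self.mem) >= self.flush_threshold:
            self.flushed_runs.append(self.mem)  # 3. flush one sorted run
            self.mem = []

store = ToyMemStore()
for k in ["b", "a", "d", "c"]:
    store.put(k, k.upper())
print(store.flushed_runs, store.mem)
```

Note that the WAL preserves arrival order while the flushed run comes out sorted: after a crash, replaying the WAL is enough to rebuild the in-memory part.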
Also, you can query HBase tables directly from Hive and Impala.
Data import: Kafka
Importing data into Hadoop usually goes through several stages of evolution. At first the team decides that plain text files will do. Everyone knows how to write and read CSV files, there shouldn't be any problems! Then unprintable and non-standard characters show up (what scoundrel put them in there!), along with string-escaping issues and the like, and the team has to switch to binary formats, or at least to verbose JSON. Then a couple dozen clients appear (external or internal), and not all of them find it convenient to send files to HDFS. At this point RabbitMQ shows up. But it doesn't last long, because everyone suddenly remembers that the rabbit tries to keep everything in memory, while there is a lot of data and it cannot always be fetched quickly.
And then somebody stumbles upon Apache Kafka – a distributed message queue with high throughput. Unlike the HDFS interface, Kafka provides a simple and familiar messaging interface. Unlike RabbitMQ, it writes messages to disk and keeps them there for a configured period of time (say, two weeks), during which you can come and fetch the data. Kafka scales easily and can, in theory, sustain any volume of data.
This beautiful picture crumbles once you start using the system for real. The first thing to remember about Kafka is that everybody lies. Especially the documentation. Especially the official one. If the authors write "we support X", it often means "we would like to support X" or "we plan to support X in a future version". If they write "the server guarantees Y", it most likely means "the server guarantees Y, but only for client Z". There were cases when the documentation said one thing, the comments on a function said another, and the code itself did a third.
Kafka changes its main interfaces even in minor versions and has long been unable to make the jump from 0.8.x to 0.9. The source code itself, both structurally and stylistically, has clearly been written under the influence of the famous writer who gave this monster its name.
Yet despite all these problems, Kafka remains the only project that solves the problem of moving large amounts of data at the architectural level. So if you ever decide to take this system on, keep a few things in mind:
Kafka doesn't lie about reliability – if messages reach the server, they will stay there for the specified period of time; if your data is missing, check your code;
consumer groups don't work: regardless of configuration, all messages from a partition are delivered to all connected consumers;
the server does not store per-consumer offsets; in fact, the server has no way to identify connected consumers.
The simple recipe we gradually arrived at is to launch one consumer per partition of the queue (a topic, in Kafka terminology) and to manage the offsets manually on the client side.
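That recipe can be sketched in plain Python (the "broker" here is just an in-memory log per partition; a real deployment would use a Kafka client library instead, and all names below are made up):

```python
# Sketch of "one consumer per partition, manual offsets": the server
# keeps no per-consumer state, so the client tracks its own position.
class ToyPartitionLog:
    """Append-only message log for one partition of a topic."""
    def __init__(self):
        self.messages = []

    def append(self, msg):
        self.messages.append(msg)

    def fetch(self, offset, max_n=10):
        """Messages starting at `offset`; no per-consumer state here."""
        return self.messages[offset:offset + max_n]

class ToyConsumer:
    """One consumer bound to one partition, storing its own offset."""
    def __init__(self, partition):
        self.partition = partition
        self.offset = 0  # persisted by the client, not the server

    def poll(self):
        batch = self.partition.fetch(self.offset)
        self.offset += len(batch)  # "commit" manually after processing
        return batch

log = ToyPartitionLog()
for i in range(5):
    log.append(f"event-{i}")

consumer = ToyConsumer(log)
print(consumer.poll())  # the five events
print(consumer.poll())  # nothing new yet
```

In production the offset would be persisted somewhere durable (a file, a database) so the consumer can resume from where it stopped after a restart.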
Stream processing: Spark Streaming
If you've read this far, you are probably interested. And if you are interested, you've probably heard about lambda architecture, but I'll recap it just in case. Lambda architecture assumes duplicating the computation pipeline for batch and stream processing. The batch pipeline runs periodically over a past period (for example, over yesterday) and uses the most complete and accurate data. The stream pipeline, by contrast, computes in real time but makes no guarantee of accuracy. This can be useful, for example, when you launch a campaign and want to track its effectiveness hourly: a one-day delay is unacceptable here, while losing a couple of percent of the events is not critical.
In the Hadoop ecosystem, stream processing is the job of Spark Streaming. Out of the box, Streaming can consume data from Kafka, ZeroMQ, a socket, Twitter and others. The developer gets a convenient interface in the form of DStream – essentially a collection of small RDDs gathered from the stream over a fixed time window (say, 30 seconds or 5 minutes). All the usual RDD features are preserved.
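The micro-batch idea behind DStream can be imitated in plain Python: chop the stream into small batches by arrival time and process each batch like an ordinary collection (an RDD, in real Spark Streaming). The timestamps, log lines and 30-second interval below are illustrative; this is not the pyspark.streaming API.

```python
# A pure-Python imitation of DStream micro-batching: group events by
# 30-second windows, then process each window with ordinary collection ops.
from itertools import groupby

BATCH_INTERVAL = 30  # seconds

events = [  # (arrival_time_in_seconds, line)
    (1,  "INFO ok"), (12, "ERROR boom"), (29, "INFO ok"),
    (31, "ERROR bang"), (55, "INFO ok"), (61, "ERROR crash"),
]

# chop the stream into micro-batches of BATCH_INTERVAL seconds
batches = [
    [line for _, line in group]
    for _, group in groupby(events, key=lambda e: e[0] // BATCH_INTERVAL)
]

# process each micro-batch with the familiar collection operations
error_counts = [sum(1 for line in batch if line.startswith("ERROR"))
                for batch in batches]
print(error_counts)  # errors per 30-second window
```

This also makes the accuracy trade-off above tangible: each window's count is available as soon as the window closes, but any event arriving late simply falls into the wrong (or no) batch.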
Machine learning: Mahout, MLlib
The state of many companies can be summed up simply: everybody knows that big data is a good thing, but few understand what to do with it. And there are basically two things to do with it: turn it into knowledge (i.e. use it for decision making) and use it to improve algorithms. Analytics tools help with the former; the latter comes down to machine learning. Hadoop has two major projects for this:
Mahout – the first big library to implement many popular algorithms on top of Hadoop MapReduce. It includes algorithms for clustering, collaborative filtering and random forests, as well as some primitives for matrix factorization. Earlier this year the maintainers decided to move everything onto the Apache Spark computing core, which supports iterative algorithms far better (try pushing 30 iterations of gradient descent through the disk with standard MapReduce!).
MLlib. Unlike Mahout, which is trying to port its algorithms to the new core, MLlib has been a subproject of Spark from the start. It includes basic statistics, linear and logistic regression, SVM, k-means, SVD and PCA, as well as optimization primitives such as SGD and L-BFGS. The Scala interface uses Breeze for linear algebra, the Python interface uses NumPy. The project is actively developed and grows in functionality with every release.
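To see why iterative workloads matter here, consider the simplest possible example of what MLlib's SGD-style primitives do: plain gradient descent for 1-D linear regression. All data and the learning rate below are made up for illustration; in standard MapReduce every one of these iterations would round-trip through the disk, while in memory they are trivial.

```python
# The kind of iterative workload MLlib targets: gradient descent for
# a 1-D linear fit, keeping all state in memory between iterations.
xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.1, 3.9, 6.2, 7.8]  # roughly y = 2x

w, lr = 0.0, 0.01
for _ in range(30):  # 30 iterations, cheap when state stays in memory
    # gradient of the mean squared error with respect to w
    grad = sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)
    w -= lr * grad
print(round(w, 2))  # close to 2.0
```

Each iteration only updates one float, yet depends on the result of the previous one; that dependency chain is exactly what disk-based MapReduce handles badly and Spark handles well.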
Data formats: Parquet, ORC, Thrift, Avro
If you decide to use Hadoop to its fullest, you may find it useful to know about the main formats for storing and transferring data.
Parquet – a columnar format optimized for storing complex structures and for efficient compression. It was originally developed at Twitter and is now one of the main formats in the Hadoop infrastructure (in particular, it is actively supported by Spark and Impala).
ORC – the new optimized storage format for Hive. Here we again witness the standoff of Cloudera with Impala and Parquet versus Hortonworks with Hive and ORC. The most entertaining part is reading the performance comparisons: on Cloudera's blog Impala always wins, while on Hortonworks', as you might guess, Hive does.
Thrift – an efficient but not very convenient binary format for data transfer. Working with it requires defining a data schema and generating client code in the appropriate language, which is not always possible. Lately it has been falling out of use, but many services still rely on it.
Avro – positioned mainly as a replacement for Thrift: it does not require code generation, can pass the schema along with the data, or can simply work with dynamically typed objects.
Other: ZooKeeper, Hue, Flume, Sqoop, Oozie, Azkaban
And, finally, let’s talk a bit about other useful and useless projects.
ZooKeeper – the main instrument for coordination of all elements of Hadoop infrastructure. Most often it is used as a configuration service, though it has wider possibilities. Simple, convenient, reliable.
Hue – a web interface to Hadoop services, part of Cloudera Manager. It works poorly, throws errors and is very temperamental. It is fine for demos to non-technical specialists, but for serious work the console counterparts are a better choice.
Flume – a service for organizing data flows. For example, it can be set up to receive messages from syslog, aggregate them and automatically dump them into an HDFS directory. Unfortunately, it requires a lot of manual stream configuration and constant extension with your own Java classes.
Sqoop – a utility for fast data copying between Hadoop and RDBMSs. Fast in theory. In practice, Sqoop 1 turned out to be single-threaded and slow, and Sqoop 2 simply did not work at the time of our last test.
Oozie – a workflow scheduler. Originally designed to combine separate MapReduce jobs into a single pipeline and launch it on a schedule. It can additionally execute Hive, Java and console actions, but in the context of Spark, Impala and the rest, this list looks rather limited. Very fragile, complicated and next to impossible to debug.
Azkaban – a quite capable replacement for Oozie. It is part of the Hadoop infrastructure at LinkedIn. It supports several action types, the main one being the console command (what else do you need?), plus scheduled launches, application logs, notifications about failed jobs and more. The downsides are a certain rawness and a not always intuitive interface (try guessing on your own that jobs are submitted not through the UI but as a zip archive of text files).
I hope this Hadoop overview was helpful.