Just the Basics for Rookies
This session assumes absolutely no knowledge of Apache Hadoop and will provide a complete introduction to all the major aspects of the Hadoop ecosystem of projects and tools. If you are looking to get up to speed on Hadoop, trying to work out what all the Big Data fuss is about, or just interested in brushing up your understanding of MapReduce, then this is the session for you. We will cover all the basics with detailed discussion about HDFS, MapReduce, YARN (MRv2) and a broad overview of the Hadoop ecosystem including Hive, Pig, HBase, ZooKeeper and more.
Adam: I don't believe these slides were included on the USB because I didn't give them to anyone. My punishment is providing all of you my beautiful email, email@example.com. If you email me, I will put them out on Dropbox or something and share them with you. Or if you have any other Hadoop-related questions, please feel free to send me an email; I am usually pretty good about answering them.
I have been working with Hadoop for probably around three years now. I got started on a production Hadoop system, developing analytics and working with a lot of different Hadoop technologies, specifically HDFS, MapReduce, Pig, ZooKeeper, HBase, things like that. Today I am going to be going over Hadoop, just the basics for big data rookies, which is going to include a brief Hadoop overview: what it is, where it came from, and why all of you should care.
Since all of you are in this room, I hope that you all have a good idea of why you should care. We are also going to talk about the HDFS architecture and how that functions, and Hadoop MapReduce and the architecture of that system. I am going to go into the Hadoop ecosystem, a lot of the open source projects and other projects out there that augment the Hadoop core of the file system and MapReduce, and finally a MapReduce primer to walk through a job or code example of word count, which is typically the hello world of Hadoop.
I will also discuss the individual MapReduce components that one would typically develop for that. With that said, let's buckle up. I've got about 70 slides of material and 90 minutes to do it. If you have any questions, feel free to raise your hand and pipe up, or write it down and shoot me an email during the session; I won't answer it then, but afterwards I'll answer it. With that said, the Hadoop overview: the Hadoop core project is an open source Apache project, mostly written in Java, with some native libraries underneath it.
It came out of Yahoo in about 2006, started by Doug Cutting, kind of the Hadoop overlord I should say. At its core it is a distributed, fault-tolerant data storage and batch processing system. It's all about really, really huge datasets, storing big data. It was built to deal with the whole web: how do I manage and store these massive files, and how do I process these massive files? It was built off of two papers from Google that came out in 2003 and 2004, or 2004/2005, basically on the Google File System and Google MapReduce.
How are they storing and indexing their gigantic data sets? At the time they were developing a project called [inaudible 00:03:06]; they were looking at how to store and process all this data, came across these papers, and were like, sweet, let's develop an open source version for the community, and it blew up into this massive project that we have today. It provides linear scalability on commodity hardware. Commodity hardware does not mean cheap hardware.
It just means cheaper hardware. You don't need any specialized systems to run these things; typically just a Linux box and a JVM and you are pretty much good to go. Some disks and some RAM, and if you need more data storage and more processing power, simply buy another rack, buy more servers, slap them on the system and keep going. It's been adopted by many over the years: Amazon, [inaudible 00:03:39], Facebook, Google even uses it, and many, many more people have been picking up this technology over time.
As for why: the bottom line of why the system exists, and why people are really getting into it and learning it, is that it is an extremely flexible system because it is a file system at its core, it's extremely scalable, and most importantly it is very inexpensive. The software is cheap, free if you go and download it from Apache, and the servers that actually run the software are relatively cheap. Hadoop is really great at reliable storage for multi-petabyte data sets.
I believe the largest system now is around 45 petabytes of compressed data, and somewhere around 4,000 nodes is, I think, the soft limit for Hadoop; these are really, really big systems we are talking about here. It's great at batch queries and analytics: these are your deep processing, your indexes, your hourly jobs that you are going to be running. It's not an interactive type of system for executing these; you are processing petabytes of data, terabytes, gigabytes, and it's going to take some time to get results back.
What we are looking at is really batch processing here. Because it is a file system, it is really good for complex hierarchical data with very often changing schemas, as you can write your applications to be aware of changing schemas and be schema aware. It supports unstructured and structured data inside the file system, whether it be images, videos, or flat files that look like tables; whatever you can store in a file you can store on Hadoop.
The problem comes in actually processing that data, which we'll talk about later. It's not so great at changes to files, in that you can't do it. Hadoop is an append-only file system; you cannot go and flip some bytes inside a file in HDFS. You can only add data to it. It's also not very great at low latency responses, as I was talking about earlier. This is a batch system; don't expect it to come back in milliseconds or seconds [inaudible 00:05:48].
Also analyst usability. This piece really just means a lot of DBAs and database analysts are used to speaking SQL, and until very recently there was really no way to do that; you needed to develop custom applications, custom Java code, or use higher level APIs to work with Hadoop. Those kinds of specialists weren't readily available to, say, the enterprise. Now there are tools like Pivotal HAWQ, Cloudera Impala, and Hive Stinger, which we will get into a little bit later.
That is breaking down the barriers now and letting people who speak SQL and don't necessarily speak Java actually use Hadoop. What is actually stored on HDFS? It stores bytes; there is no ETL necessary or required for this system. Data lake is a term that is being thrown around now: you take all the data that you ever have and you throw it into this file system. You don't transform it on the way in, you don't touch it, you just throw it in there, and you define the structure of the data as you read it.
This is nice with the changing schema pieces: if you have new fields that come in, your applications can be aware that, hey, I am going to see new fields, let me read the structure from these bytes. It has built-in support for very common data types, like strings and longs and byte arrays, as well as formats, like a simple text file format that reads one line per record, or, say, Cassandra input formats for reading data from Cassandra, Avro formats, [inaudible 00:07:23], XML formats.
There are a lot of common data types to read any data that you put into Hadoop. The structure is extendable at the top layer, when you get into the component pieces of Hadoop, and flexible of course with the structural pieces. Some versioning information: there was a transition between the first version of Hadoop and the second one. Versions 0.20, 0.21, and 0.22 morphed into the Hadoop 1.0 line, and with that, from a programming perspective, there are two main MapReduce packages.
There is hadoop.mapred, which is deprecated, and then there is hadoop.mapreduce. Some examples you might come across in the blogosphere might be using the old deprecated package, but that is less common now since everything is being updated to the new API. Version two went alpha in May 2012. This comes with NameNode high availability [inaudible 00:08:21], which I'll talk about, as well as YARN, the next generation of MapReduce. The beta actually just came out a couple of weeks ago; very, very new technology.
Not even stable yet, I should say, but we are using it anyway. The HDFS architecture piece: to users and clients accessing HDFS, it looks like a UNIX-like file system for data storage. It supports Linux-style commands, ls, mkdir, cat, tail, the different pieces of a UNIX type system. However, underneath the hood it is actually taking that data and splitting these large files into smaller chunks of data; 64 megabytes is the default.
128 and 256 are also very common sizes, depending on how big your files are. It takes these blocks and distributes them among a number of servers and then replicates these blocks. There are two key services. There is a master NameNode, a single service that is basically a giant name server, knowing where these blocks are inside the system, and there are many, many data nodes, up to 4,000 as I was saying before, that actually store the data.
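As a rough illustration of the block splitting described above, here is a minimal sketch in Python. It only does the arithmetic; the function name `split_into_blocks` is made up for this note and is not part of any Hadoop API, and the 64 MB default is the figure quoted in the talk.

```python
def split_into_blocks(file_size_mb, block_size_mb=64):
    """Return the sizes (in MB) of the blocks a file would be cut into.

    Illustrative only: 64 MB is the default block size mentioned in the talk.
    """
    if file_size_mb < 0 or block_size_mb <= 0:
        raise ValueError("sizes must be positive")
    full_blocks, remainder = divmod(file_size_mb, block_size_mb)
    sizes = [block_size_mb] * full_blocks
    if remainder:
        # The last block only holds whatever data is left over.
        sizes.append(remainder)
    return sizes
```

For example, `split_into_blocks(256)` gives four 64 MB blocks, and `split_into_blocks(200, 128)` gives one full 128 MB block plus a 72 MB tail.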
There is another service on the side called the checkpoint node. It used to be called the secondary NameNode, but that name has since been ditched because people were starting to think that it is some sort of hot backup. It's not; it just checkpoints the file system. The NameNode itself is the single master server. It was the single point of failure in the Hadoop 1.0 line: if your NameNode crashes, you can no longer access your data in the system until you bring it back up.
With HDFS 2.0 we have NameNode high availability that takes care of that single point of failure problem. The NameNode stores the file-to-block-to-data-node locations inside the namespace, which is stored in memory, essentially just like a triple map there. All the transactions that work with it are logged to disk in a simple log file, so whenever someone makes a file, creates a directory, or deletes a directory, all of the transactions are logged to disk.
If a NameNode were to fail, it starts up and there is a namespace image on there. It will load that into memory and then start going through this log, basically modifying its namespace over time until it has read all the transaction logs, and it's safe to actually use HDFS at that point. The NameNode stores all these pieces in memory; in the event that it runs out of memory it will then start spilling to disk, at which point it becomes very, very slow to use, just not good.
Typically the NameNode service in a production system has its own standalone server and it is given all the memory on the box. The checkpoint node performs checkpoints of the NameNode image and logs. It is not a hot backup; if your NameNode fails, then your checkpoint node will do nothing. You can schedule it periodically, say every day: it will load up the most recent checkpoint, the namespace piece, much like the NameNode loads that up into memory, read through the log transactions and modify this namespace, and then save this namespace back to disk as a checkpoint.
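The image-plus-log idea above can be sketched in a few lines of Python. This is a toy model, not how the NameNode is implemented: the namespace is just a set of paths, the log is a list of hypothetical `("create", path)` and `("delete", path)` tuples, and `replay` and `checkpoint` are names invented for this note.

```python
def replay(image, edit_log):
    """Rebuild the in-memory namespace from a saved image plus a transaction log."""
    namespace = set(image)
    for operation, path in edit_log:
        if operation == "create":
            namespace.add(path)
        elif operation == "delete":
            namespace.discard(path)
        else:
            raise ValueError(f"unknown operation: {operation}")
    return namespace


def checkpoint(image, edit_log):
    """Fold the log into a new image and return it with an empty log."""
    return replay(image, edit_log), []
```

Replaying a checkpointed image with its now-empty log yields the same namespace as replaying the old image with the full log, which is why periodic checkpoints keep startup time from growing with the age of the log.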
This is useful so that a NameNode transaction log that has been running for months doesn't get massive. You can take daily checkpoints, you can back up these checkpoints as needed, put them on an [inaudible 00:11:55] share, anything like that. These are all ways that they tackled the single point of failure from way back when. The checkpoint node is still in HDFS 2.0 and just obviously take [inaudible 00:12:07] over time, but it's no longer a single point of failure.
The data node service stores these blocks on the local disks that you configured for that system; that's all it does. It responds to requests from clients for any input and output operations, for writing data and reading data. On the management side it sends frequent heartbeats to the NameNode, which just pings it and says, hey, I am alive; that is about every three seconds by default. It also sends block reports to the NameNode that say, here are all the blocks that I am holding.
The NameNode is keeping track of the system and what blocks each data node has. In the event that a data node were to fail, it knows all the blocks that resided on that data node and that it needs to replicate all of that data.
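A minimal sketch of that bookkeeping, in Python, might look like the following. The class name `HeartbeatMonitor`, its methods, and the timeout value are assumptions made for illustration; timestamps are plain numbers of seconds passed in by the caller rather than read from a clock.

```python
class HeartbeatMonitor:
    """Toy model of a master tracking heartbeats and block reports."""

    def __init__(self, timeout_seconds=600):
        # timeout_seconds is an illustrative value, not a Hadoop setting name.
        self.timeout_seconds = timeout_seconds
        self.last_seen = {}  # node name -> time of last heartbeat
        self.blocks = {}     # node name -> set of block ids it reported

    def heartbeat(self, node, now):
        self.last_seen[node] = now

    def block_report(self, node, block_ids):
        self.blocks[node] = set(block_ids)

    def dead_nodes(self, now):
        """Nodes whose last heartbeat is older than the timeout."""
        return sorted(
            node for node, seen in self.last_seen.items()
            if now - seen > self.timeout_seconds
        )

    def blocks_to_replicate(self, now):
        """Blocks that lived on a dead node and need another copy made."""
        lost = set()
        for node in self.dead_nodes(now):
            lost |= self.blocks.get(node, set())
        return lost
```

If `dn1` stops sending heartbeats while `dn2` keeps going, `blocks_to_replicate` returns exactly the blocks `dn1` last reported, which is the list the master would need to copy elsewhere.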
Male: [Inaudible 00:12:57].
Adam: HDFS handles all that under the hood. For each actual file, if you look at a data node's data directory, it's blk_ and some number.
Male: [Inaudible 00:13:14].
Adam: How do you identify them?
Male: [Inaudible 00:13:18].
Adam: The NameNode is storing that. You put a file in there called, say, my data; the NameNode is storing that name, and it chunks that up into blocks, which are files themselves, and those blocks are stored on data nodes. When you want access to that file, you say, I want to read my file called my data, and the client will make a request to the NameNode and say, where is this file? And it will come back and say, here are these blocks. I have a little diagram; you will see this later.
I just remembered: the data node also does checksums on files. If there is any data rot on your disk, it will make sure that those [inaudible 00:13:58] those blocks are safe, and it will delete blocks. It is a very fault-tolerant data storage system. How it works when it comes to writes: we have a client here, a NameNode, and four data node servers. Step one is the client is going to contact the NameNode and say, hey, I want to write some data, it is called this file, and the NameNode will respond and say, okay, write it out to these nodes.
At which point the client is going to sequentially start writing blocks out. Here I have a file called a that is essentially split into four blocks, say a 256 megabyte file split into four 64 megabyte chunks, at which point the client connects to a data node and writes a block out. Then it will connect to the next data node, as orchestrated by the NameNode, which said where to put all these blocks, and it will write out that next block.
Finally, the replication is done, which I have a slide on later; the data nodes replicate all this data in between themselves. The client is only writing its file once, and essentially two more copies are made inside HDFS; it's all distributed and replicated and blockified in there. This whole process is orchestrated by the NameNode; data nodes don't really have any responsibility except for copying blocks to other data nodes and serving requests from clients.
In the event of an actual node failure, say if data node d goes down, because these blocks are replicated in the file system a client can reconnect to, say, a1 or a2 or a3, because that block is somewhere else on the system. The NameNode will also recognize that, hey, this data node crashed, at which point it will start replicating all the blocks on that node out to the rest of the system. It's about ten minutes, but that threshold is configurable, so you can restart data nodes if you want, but [inaudible 00:15:55].
For the reading part of it, the client again is going to contact the NameNode and say, hey, I want to read some data. The NameNode responds and says, you can find all this data here; that file that you wanted to read can be found on all these data nodes and these are all the blocks that it consists of. At which point the client can then sequentially read blocks from these data nodes; it reaches out to, say, data node b to read the first block.
Then once that block is fully read it will reach out to data node c to read the second block, and once that one is read it will reach out to data node d, and so on and so forth. The way that these clients are built and implemented, they are all intended to be as data local as possible. When it receives that block report from the NameNode, it looks at all these blocks and says, okay, this block is on my local host so I can just read it.
It used to still go through the loopback interface, but they've recently put in a feature that will just read that block straight from the local file system. If the block is on the same rack it will reach out to a different node on that rack; if it is on a different rack it will reach out there. Because of the way that HDFS distributes data, a lot of clients can access it locally.
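The preference order just described (same host, then same rack, then anywhere else) can be sketched as a small ranking function. This is a simplified illustration in Python; `pick_replica` and the `(host, rack)` tuples are assumptions for this note and do not mirror the real client code.

```python
def pick_replica(client_host, client_rack, replicas):
    """Choose the closest replica of a block for a reader.

    replicas is a list of (host, rack) pairs that hold a copy of the block.
    """
    def distance(replica):
        host, rack = replica
        if host == client_host:
            return 0  # the block is on the reader's own machine
        if rack == client_rack:
            return 1  # another machine on the same rack
        return 2      # a machine on some other rack

    # min() keeps the first replica among equally close candidates.
    return min(replicas, key=distance)
```

A reader on `dn1` in `rack1` would pick its own copy if it has one, a reader elsewhere on `rack1` would pick a `rack1` copy, and a reader on an unrelated rack would fall back to whichever copy is listed first.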
Male: Is it possible for a client to [inaudible 00:17:15]?
Adam: As far as the same block is concerned, it won't read from both simultaneously, if that is what you are asking. It will connect to one and read as much data as it can, and if that were to fail, the client will basically reconnect. In this figure, say the third block there disconnected; it will pick another data node and make another connection from that, since it has that block mapping.
Male: What if I can get full set of redundant locations [inaudible 00:17:58]?
Adam: It uses all of that information and it will reconnect to the data. [Inaudible 00:18:04] block isn't there for some reason, it understands that; the client will make a request back to the NameNode if needed, but that report it keeps.
Male: [Inaudible 00:18:18].
Adam: Yes, it's a lot of metadata coming back. How the block replication works: the default is three replicas; any more than that, it will just put them anywhere, as we will see soon. Two replicas is also pretty common, just for smaller development systems where storage is usually pretty short, and if you have a five node system or a three node system, with three copies on the three node system you are basically just copying your data to each node.
Keep that in mind. It is a configurable parameter; pretty much everything in the system is configurable. You can configure Hadoop to be a rack-aware system, so it will understand that these servers reside on this rack inside your data center. Say you have a client, as symbolized by this smiley face (do you like my smiley face, okay?), on this data node. When it goes out to write a block it will write one block on the same rack.
That first block of the file is actually written to that exact data node, but all the other blocks can essentially go anywhere else on the rack. Here it's writing to the data node on the same system that it is running on; if a client is not running on a data node server it just picks a rack, essentially. At which point, after this block has finished writing, the data node will connect to another data node service and copy that block over, again orchestrated by the NameNode, which says these blocks go here.
Then after that is copied, this data node will copy that block to another rack. It's going to store two copies on one rack and one on a separate rack. This is really useful in the event that someone trips over a wire in your data center and your rack goes down, because it has blocks on other racks that it can automatically recopy and redistribute this data from.
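Here is a small sketch of the placement as the talk describes it: one copy on the writer's own node, one on another node in the same rack, and one on a different rack. It is written in Python for illustration only; the exact policy in a given Hadoop release may order the replicas differently, and `place_replicas` and the topology dictionary are assumptions for this note. Nodes are picked deterministically (first available) to keep the example easy to follow.

```python
def place_replicas(writer, topology):
    """Pick three nodes for a block, following the description in the talk.

    topology maps a rack name to the list of data nodes on that rack.
    Assumes the writer runs on a data node, its rack has at least two nodes,
    and there are at least two racks.
    """
    local_rack = next(rack for rack, nodes in topology.items() if writer in nodes)
    # Second copy: any other node on the writer's rack.
    same_rack_node = next(node for node in topology[local_rack] if node != writer)
    # Third copy: a node on some other rack, so a rack outage cannot take all copies.
    other_rack = next(rack for rack in topology if rack != local_rack)
    remote_node = topology[other_rack][0]
    return [writer, same_rack_node, remote_node]
```

With racks `rack1` (`dn1`, `dn2`) and `rack2` (`dn3`, `dn4`), a writer on `dn2` ends up with copies on `dn2`, `dn1`, and `dn3`: two on one rack and one on a separate rack.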
If you don't configure Hadoop to be a rack-aware system it will just put three copies wherever, at which point if a rack goes down you are going to possibly have some data loss, and most likely you will. A feature of HDFS 2.0 is NameNode high availability. With this there are two redundant NameNode services that you can start up in an active/passive configuration: you have an active NameNode, your data nodes are aware of both NameNodes, and they send block reports out to both NameNodes.
They use both NameNodes as they are, except one is just considered active; I'll say the write requests go through that one, but for the reading stuff it will use both NameNodes. There is either manual or automated failover. You can do manual, where if you want to take a NameNode server down for maintenance you can, as an admin, say switch over, and in an instant that passive NameNode will become the new active NameNode and all the data nodes will note that.
They will go through that NameNode for all the connections, at which point you can do whatever maintenance you need and bring the other one back online. Or there is automated failover through ZooKeeper, which I have a little bit on later, where it will basically detect if the NameNode fails, at which point there will be an automated failover to the other system. This is done through the Quorum Journal Manager.
I left it out here; that is a tool that is used for the transaction pieces and redundancy between the two NameNodes. The other big feature that came out is NameNode federation: you can have multiple independent NameNodes that use the same collection of data nodes. This is for a multi-tenant system, if you have a huge cluster but you want to separate it, so this group has this Hadoop cluster and this one has that Hadoop cluster.
You can stand up a NameNode per group, essentially, and secure it so that only this group can access this NameNode, and even though they are all sharing the same data nodes, the actual view that you see through that NameNode is totally separate. That's HDFS in a nutshell. I am going to go into the MapReduce components now, Hadoop MapReduce 1.x.
Male: [Inaudible 00:22:54].
Adam: Whatever you want. HDFS has Linux-type users and groups, and you can secure it using [inaudible 00:23:05] or other LDAP authentication. You can stand up a NameNode and basically say that the users that make up this group can go through LDAP and talk to this specific NameNode or another one. It's not one per user; you can set it up that way if you want, but it's however you want to. With Hadoop MapReduce, we are going to be talking about the first version of this and the architecture pieces.
The whole design behind this is: I am storing my data, it's distributed and replicated across my cluster, so how do I go about processing it in a nice parallel way? In traditional applications you move your data to your processing application and do it there. MapReduce is a whole new paradigm in which we are going to package our application in a jar file and ship it out to all these nodes, which are going to process the data locally on the system where it lives.
There are two main services, only two services. The job tracker is a master service that monitors and schedules all the jobs that are submitted by clients to the system. The other piece is the task tracker, a service with multiple instances that runs tasks. These task trackers are running on the same physical machines that the data node processes are running on, in order to get that data locality. A MapReduce job contains many tasks; the number of tasks is equivalent to the number of blocks when you are dealing with HDFS input, by default.
That file we had earlier that was 256 megabytes cut into four pieces: you would get four map tasks to read all of that data in parallel. Each task has one or more task attempts to actually process that data. A task might fail, whether that be an array out of bounds exception, which I get often, or a null pointer exception, or something weird happens on the machine. The job tracker will receive notification of that error and will start that task over again as another task attempt.
By default, if four task attempts that belong to the same task fail, your job fails as a whole. You can have thousands of failed task attempts as long as it is not four from the same actual task. The job tracker monitors your overall job and task progress. It keeps metrics about the jobs: bytes in, bytes out, number of records processed, number of groups processed; any custom counters you developed in your application are stored on the job tracker.
When it receives job requests from clients, it takes that code and looks at the input files that you specified there, say, I want to process myfile.txt. At that point it works with the NameNode and, just like any other client, says, hey, where are all these data blocks? The MapReduce code will look at that; it will essentially get that block report of where these blocks are in the cluster and will start issuing task attempts to task trackers that are running on the same node where that block is.
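The retry rule for task attempts can be sketched as a simple loop. This Python example is only an illustration of the idea of "four failed attempts of the same task fail the job"; `run_task`, `flaky_task`, `broken_task`, and `job_succeeds` are hypothetical names, and a real job tracker would schedule attempts on different machines rather than call a function in a loop.

```python
def run_task(task, max_attempts=4):
    """Run one task, retrying as a new attempt after each failure.

    task is a callable that receives the attempt number (1-based).
    Returns (result, attempts_used) or raises once every attempt has failed.
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return task(attempt), attempt
        except Exception as error:  # any failure counts as a failed attempt
            last_error = error
    raise RuntimeError(
        f"task failed {max_attempts} times, so the job fails"
    ) from last_error


def flaky_task(attempt):
    # Hypothetical task that fails on its first two attempts.
    if attempt < 3:
        raise IndexError("array index out of bounds")
    return "done"


def broken_task(attempt):
    # Hypothetical task that can never succeed.
    raise ValueError("always fails")


def job_succeeds(task):
    try:
        run_task(task)
        return True
    except RuntimeError:
        return False
```

Here `flaky_task` succeeds on its third attempt, so the job carries on, while `broken_task` burns through all four attempts and takes the job down with it.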
Four failed attempts is one failed job. The default scheduler that ships with it is a FIFO scheduler, which is funny because they tell you not to use that one and to use the fair scheduler or the capacity scheduler instead. The problem with FIFO order is that if a very huge job is submitted, it runs over a lot of data; if you have other jobs coming in that aren't processing as much data, they might finish very quickly.
That huge job is basically going to block all other processing until it's finished. The fair scheduler balances the distribution of tasks across multiple jobs. Overall, everything else takes longer, but those quick jobs will finish much faster rather than being blocked by the scheduling, I guess.
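To make the FIFO versus fair trade-off concrete, here is a toy simulation in Python. It assumes every task takes exactly one time step and that all jobs are submitted at the start; neither assumption holds on a real cluster, and `simulate` is a made-up function, not a model of the actual Hadoop schedulers.

```python
def simulate(jobs, slots, fair):
    """Return the time step at which each job finishes.

    jobs maps a job name to its number of equal-length tasks, in submission
    order. slots is how many tasks the cluster can run per time step.
    """
    remaining = dict(jobs)
    finished = {}
    step = 0
    while remaining:
        step += 1
        free = slots
        active = list(remaining)
        grants = {name: 0 for name in active}
        if fair:
            # Hand out slots one at a time, round-robin across unfinished jobs.
            progressed = True
            while free > 0 and progressed:
                progressed = False
                for name in active:
                    if free > 0 and grants[name] < remaining[name]:
                        grants[name] += 1
                        free -= 1
                        progressed = True
        else:
            # FIFO: earlier jobs take every slot they can use first.
            for name in active:
                grants[name] = min(free, remaining[name])
                free -= grants[name]
        for name in active:
            remaining[name] -= grants[name]
            if remaining[name] == 0:
                finished[name] = step
                del remaining[name]
    return finished
```

With a 100-task job submitted ahead of a 4-task job on 4 slots, FIFO finishes the huge job at step 25 and the quick one at step 26, while the fair variant finishes the quick job at step 2 and the huge one at step 26: the big job is slightly slower, the small one is far faster.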
Male: [Inaudible 00:27:07]
Adam: Not now, not in this one. In the YARN framework they are working on developing that, so it knows more about resource allocation. The piece that ties into a little bit of that, which you can play around with, is on the task tracker side: there is a configurable number of map and reduce slots. For each task tracker you can say, you can run four map tasks and you can run four reduce tasks, or any combination of that.
The usual rule of thumb is one task per physical [inaudible 00:27:45] of the machine. If you have more powerful machines that have, say, four sockets rather than two, you can configure that specific task tracker to run more tasks on it. It doesn't do that automatically, unfortunately. Task trackers always send heartbeats and task reports to the job tracker, and they run the map and reduce task attempts.
It runs all of these in a separate JVM from the actual task tracker; if something catastrophic happens it won't take the task tracker along with it, just that specific JVM.
Male: [Inaudible 00:28:24].
Adam: The data node is a JVM, the task tracker is a JVM, and all the tasks that are running have separate JVMs as well. There is configuration to essentially recycle JVMs. The default is one per task; if you enable reuse and a task tracker receives another map task for the same job, it will recycle, using that same JVM. I'll talk a little bit about exploiting data locality: the job tracker will schedule a task on a task tracker that is local to the block.
With three-times data replication, the job tracker basically has three to choose from at that point. If those task trackers, the three that can process that data locally, are too busy, it will pick another task tracker on the same rack as one of those three. If you have two blocks on one rack and one on another rack, you basically have two racks of task trackers to choose from, so there are lots of options there to run it. If for some reason all of those task trackers are too busy, it will just take another free task tracker at random and run it there.
That is rare, but it happens, especially for busy clusters that are running at capacity all the time. In the metrics that print out at the end of your job it will say, this is how many data-local tasks I had, this is how many rack-local tasks I had, and things like that. How it works overall: you'll have a client that will submit a job to the job tracker, and that job contains the input path, which can be files, folders, or wildcard paths, as well as things like input formats and map key/value pairs, which we'll talk about soon.
Any custom code that you've written, basically your application, then gets submitted to the job tracker. At which point it will figure out where those blocks are and it will start submitting tasks over to task trackers. Here is this file A that we are processing; this is one node, with data node A and a task tracker that we are running. All these tasks are going to be run, and all the output is then going to be spit back out to HDFS, which again will be replicated. Each of these task trackers, when it's writing output to HDFS, acts as a client, essentially.
Whenever it writes output, it will get that local block, one in the same rack and one in another rack. Once the job completes successfully it will report back metrics. You don't actually get your output back, like maybe you are used to with a SQL-type prompt. It will just say, my job is finished, at which point you can go fetch your data, you can export it to another system, and you can do whatever it is you want with it after you've processed it. In the event that a node were to fail, the job tracker will receive a notification of that and will just assign the task to another task tracker.
Much like HDFS is very fault tolerant through replication of the data, the replication is also seen here in replicating actual processes and tasks.
Male: Does the client have to wait for it to execute and finish, or can it just submit and then come back later?
Adam: Both, you can choose. The question was, when you submit your job, does the client just submit and block and wait, or can you just submit and keep processing? There are two ways to do that. You can just submit the job and the application will continue running, and you can do whatever. You will get a job object that you can check the status of, periodically pull counters out of, things like that. Or you can just wait for completion, job.waitForCompletion, and then it will halt in that application until the job comes back.
YARN, which is Yet Another Resource Negotiator, came from looking at MapReduce and what it is at its core. They wanted to add another layer on top of that to make a general framework for distributed big data or other types of applications. Looking at the job tracker, there are two main pieces of it. One is the resource manager piece: how am I going to schedule and distribute my application in my cluster?
The other piece is the actual application master, which is the application monitoring piece. So let's split them up into two pieces and convert the task tracker piece into a node manager. One of the complaints with the task tracker is that you have some configurable number of map and reduce slots; you could have all of your map slots filled and none of your reduce slots filled, and that's wasted compute power, they are just sitting there.
With the node manager they use the notion of a container, and a container is a piece of compute power, essentially. Right now it's just, how much memory do you need? Each node manager has a configurable amount of memory that you want to give for processing, and each container has a default limit. Say I have 8 gigs for processing and I want one gig containers: this node manager can run eight processes, essentially.
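The container arithmetic above is easy to sketch. The following Python class is a toy memory-only allocator; `NodeManagerSketch` and its `allocate` method are hypothetical names for illustration and do not correspond to YARN classes or configuration keys.

```python
class NodeManagerSketch:
    """Toy node that hands out memory-sized containers until it is full."""

    def __init__(self, total_mb):
        self.total_mb = total_mb  # memory set aside for containers on this node
        self.used_mb = 0

    def allocate(self, request_mb):
        """Grant a container of request_mb if it still fits; return True or False."""
        if request_mb <= 0:
            raise ValueError("a container must request some memory")
        if self.used_mb + request_mb > self.total_mb:
            return False
        self.used_mb += request_mb
        return True
```

A node with 8192 MB set aside grants eight 1024 MB requests and refuses the ninth, which matches the "8 gigs, one gig containers, eight processes" example.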
When a client requests a piece of compute power, a container, it sends along with that how much memory it is going to need. It is going to monitor that and make sure you are only using that much memory. As they develop the YARN framework they are going to put in pieces like, how much CPU power do I need, how much disk do you need? They are going to build a full resource manager framework for distributing the applications.
With MapReduce 2.0 on YARN, the MapReduce API hasn't changed. All you need to do is take the MapReduce job that you developed against the 1.x line, change it to build against YARN, rebuild, and that's good. I actually think I read that in the beta release of 2.0 you don't need to do that anymore, but I haven't touched that; you can just run your old code on YARN. The application master piece is per application, so MapReduce has a MapReduce application master.
HBase is also in the YARN framework now and it has an HBase application master. The MapReduce application master is essentially responsible for launching the job. It needs to figure out, where am I going to put tasks? I need to request containers from the YARN resource manager to distribute all my tasks. There is also another process called the MapReduce history server, which is just a web server that is used to store the history pieces of it.
All this kind of job tracking history pieces isn't in the YARN framework. It has that for applications, and a job is a type of application inside of that. Whenever you submit a job in MapReduce you are basically spinning up an application master for that job. Before I go into what MapReduce actually looks like from a developer's standpoint, I kind of want to go over the actual ecosystem. There are two core technologies that came out of this; when people are talking about Hadoop core they are talking about the distributed file system and MapReduce.
There are many, many other tools that have been cropping up over the years to counteract different limitations of HDFS and MapReduce, which I'll be going into now. We are talking about moving data. I have my Hadoop cluster; how do I migrate stuff off my old SQL systems, how do I get data into it in a nice clean way? There is a project called Sqoop, which is for moving data between a number of relational database systems and HDFS, both in and out.
You can schedule Sqoop jobs and say I want to move this table into Hadoop, and it will go off and do everything it needs to do to take that data and put it on HDFS. If you want to migrate some SQL table into there, that is the project to use. Flume is another project, which is all about streaming event data from sources to sinks. Very abstract: what is my source, whether that be a web server or [inaudible 00:36:32] or a Twitter stream, whatever that is, and what's my sink, where am I going to put that data? Is it a database, is it HDFS, is it my local file system, whatever you want.
Male: [Inaudible 00:36:45]
Adam: You can either do the oracle files or they can just be flat files, you can I guess pick which ones that you want it to be.
Male: Basically I need to [inaudible 00:37:02]
Adam: Yeah, right, like pulling a table out, you can just store it as a flat file and then process it line by line using MapReduce. In the Flume architecture there are three main components: a source, which we were just talking about, a channel for how am I going to move data from my source to my sink, and an actual sink. There are a number of sources that have been developed and ship with Flume; you can develop your own if you feel like it, it's pretty easy.
The original use was moving log files from web servers. You can write Flume agents that will sit on your web servers and basically look at log files; they can tail them and push them through a channel. The channel can either be the local file system, a more reliable option where it will write the events out to disk, or you can just use a memory channel, which is pretty common. Then there is a sink for where am I going to put that; HDFS is a good example.
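The source, channel and sink roles can be sketched with plain Python objects. This is not the Flume API; `MemoryChannel`, `tail_source` and `drain_to_sink` are hypothetical names, and a list stands in for HDFS.

```python
from collections import deque


class MemoryChannel:
    """Hypothetical in-memory channel: buffers events between source and sink."""

    def __init__(self):
        self._events = deque()

    def put(self, event):
        self._events.append(event)

    def take(self):
        # Returns None when the channel is empty.
        return self._events.popleft() if self._events else None


def tail_source(lines, channel):
    """Stand-in for a log-tailing source: pushes each line into the channel."""
    for line in lines:
        channel.put(line.rstrip("\n"))


def drain_to_sink(channel, sink):
    """Stand-in for a sink: moves every buffered event into `sink` (a list here)."""
    while True:
        event = channel.take()
        if event is None:
            break
        sink.append(event)


channel = MemoryChannel()
hdfs_like_sink = []
tail_source(["GET /index.html\n", "GET /cart\n"], channel)
drain_to_sink(channel, hdfs_like_sink)
print(hdfs_like_sink)
```

The channel is the only thing the source and the sink share, which is why swapping a memory channel for a file-backed one changes reliability without touching either end.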
I recently wrote a Flume agent that would pull from the Twitter streaming API and write to [inaudible 00:38:08] database using a JDBC connection. It was god-awful slow but it did, so I'm working on that. For higher-level APIs there are two main ones: Pig, which came out of Yahoo a little while back, and Hive, which was originally developed by Facebook, and [inaudible 00:38:31] is kind of spearheading the development of that project.
Both are open source Apache projects. Pig is described as a data flow language, which they have called Pig Latin. The shell that you interact with Pig through is called Grunt; the Eclipse plug-in is called PigPen. There are a lot of various Pig-related items, it's great. In Pig you express your processing using this data flow language; it will take that Pig script and generate one or more MapReduce jobs, which are just MapReduce jobs inside the Pig framework, that will execute on your files on HDFS.
It's really great for prototyping, or if you have hundreds of things to do, you don't want to spend all day writing MapReduce code just to, like, group by, sort and join on this. Pig has all that in its framework, using a lot of the SQL syntax we are all used to. Hive is a data warehousing solution. You can write SQL-like queries that will also generate a series of MapReduce jobs.
Both of these projects started around the same time, and they realized, hey, I'm copy-pasting a lot of my MapReduce code to do the same things over again, so why not write a higher-level language on top of that? The word count example for Pig, just to give you a little glimpse of it: over here on the left are what are called relations, so it's basically a variable you are looking at. I want to load my input; dollar signs are variables in Pig that you can use and specify on the command line.
I'm going to load input, and I didn't specify a schema here, so I'm just going to load one line per record. Next, here, for each line in A I'm going to generate something. Dollar sign zero is that first index, that's the line. You can use positional notation, and I'm going to tokenize it. TOKENIZE is a function that comes with Pig that will basically take your input and split on whitespace, commas and like underscores and a few other things.
FLATTEN is a special operator that will basically take that tokenized bag of all of the words in my line and kind of pivot it on its side into separate records. We call this field word. B is essentially a massive list of words inside my document that I've created. Next I'm going to group this giant list of words by word. This creates a MapReduce job where the key, which you'll see later, is the word and the value is essentially the word.
It is building this group; it is essentially the … and then a giant list of those, and Adam and a giant list of Adams, and things like that. The last line here: for each one of these we are going to generate two columns. This GROUP BY operator creates a thing called group, which is what you grouped by, in this sense it's word, and I'm going to run this UDF of COUNT(B).
This will basically iterate through that list of words and count how many things are inside of that bag. Then I'm going to store D into an output directory. There is a STORE command which will at that point execute your MapReduce job, submitting it to the YARN framework or the job tracker depending on what you're using. It will run all that processing and dump stuff out to HDFS. There is also a DUMP command, which will just write it to standard out for development.
Pig supports operators like DISTINCT and joins, both left outer and Cartesian products; there are a lot of tools here for actually writing MapReduce jobs. Using a higher-level language like this will get you, say, 80% of the way there for all the MapReduce processing you need.
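The four steps of the word count script walked through above (load, tokenize and flatten, group, count) can be mirrored in plain Python to show what each relation holds. This is an emulation, not Pig Latin, and the splitting rule is a simplification that only handles whitespace and commas.

```python
import re
from collections import defaultdict


def word_count(lines):
    # A: one record per input line (no schema), like the load step
    a = list(lines)
    # B: tokenize each line and flatten the tokens into one long list of words
    b = [word for line in a for word in re.split(r"[\s,]+", line) if word]
    # C: group the list by word; each group holds a bag of that word's occurrences
    c = defaultdict(list)
    for word in b:
        c[word].append(word)
    # D: for each group, emit (group key, size of the bag)
    return {group: len(bag) for group, bag in c.items()}


print(word_count(["hadoop is fun", "fun with pig"]))
```

Step C is the one that would force a MapReduce job: grouping needs every occurrence of a word in one place, which is what the shuffle provides.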
Male: Question, if you [inaudible 00:42:11]
Adam: The question is about creating multiple outputs in the series of jobs. For each step in the process, Pig has optimizers, so it will do what it can to reduce the number of MapReduce jobs overall. For each MapReduce job it's writing the output right back to HDFS. For Pig this is like a temp directory in Hadoop, so it's writing out that data and then that data is going to be used as input into the next job. It is the same way that you chain jobs together using MapReduce itself.
Male: Yeah but that part I was asking you is based on measures of something [inaudible 00:43:09]?
Adam: As far as like output?
Adam: Not really I guess you need to know your data and processing that you're doing. When it comes to tuning jobs like I would typically just run it and see like how much is input, how many tasks are created, how much output is it creating and then you can play with the parameters to make the best of it. Pig will as part of its optimization framework it will actually guess what the output is and use some of that in its optimization piece of it but that's magic underneath the hood.
Male: [Inaudible 00:43:51]
Adam: With Pig you can just run it from whatever client has access to your job tracker.
Male: [Inaudible 00:44:01]
Adam: Yes, Pig is just a drop file; I don't know if they actually install anything, you just unpack it and put it on your path. When you start running stuff, the JVM that's running Pig on your client is essentially compiling your MapReduce code, and it will build a JAR file and submit it. It is basically the same process that you would follow if you built a MapReduce job yourself as a developer, except you are expressing it through this higher-level language.
Clients need access to the job tracker essentially. Typically, as far as cluster layout is concerned, you would have a few nodes in your cluster for actual job submissions rather than submitting from outside of the cluster, but that's up to you all. For key-value stores there are two out now, HBase and [inaudible 00:44:57]. HBase was the first one that came around. They are both implementations of Google's Bigtable paper. It provides random real-time access to your big data and supports updates and deletes.
This solves the problem of how do I change my data inside HDFS. The actual architecture of this is a little bit cut off there. ZooKeeper is there; I'll talk a little bit about what ZooKeeper is, but there is an HBase master service that's running. It's kind of like a name node in that it's a single master service that knows about the region servers that are inside the cluster. These region servers are responsible for hosting regions, and regions are much like what a block is to a file in HDFS.
Then you have a store, which is basically a chunk of the region, let's say, and inside the store there is a memstore, which is all the in-memory pieces of that. Then there are these things called store files. Whenever that memory gets full, essentially, it will write a store file out to HDFS. There is this in-memory layer sitting on top of HDFS that responds to client requests. When you want to, say, fetch a record out of it, the clients … a lot of the metadata is stored in ZooKeeper, so clients don't actually have to communicate with the master.
The client can figure it out from ZooKeeper, getting the metadata for a fetch operation. The client will then connect to the region server that is hosting that piece of data. Logically the table looks like a giant list sorted by key, so the client knows that this key is the start of this region and this key is the start of that region. The client knows, hey, I'm going to fetch this key; it fits here in this sorted list, so I need to go talk to this region server to fetch that data.
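The "it fits here in this sorted list" lookup amounts to a binary search over region start keys. The sketch below assumes a made-up layout of four regions; the keys and server names are hypothetical, and the real client gets this mapping from cluster metadata rather than a hard-coded list.

```python
import bisect

# Hypothetical region layout: sorted start keys and the server hosting each region.
region_start_keys = ["", "g", "n", "t"]
region_servers = ["rs1", "rs2", "rs3", "rs4"]


def locate_region_server(row_key: str) -> str:
    """Find the region whose start key is the greatest one <= row_key."""
    index = bisect.bisect_right(region_start_keys, row_key) - 1
    return region_servers[index]


print(locate_region_server("hadoop"))
```

Because the start keys are sorted, the client needs only this small list to route a request, not any of the data itself.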
The region server at that point will check its memstore and say, hey, do I already have this key there? If it doesn't, then it will open up that file in HDFS and pull the data out of it. There is a lot of data moving in and out between the region servers, but that is what lets it support updates and deletes. When you put a delete record in there it's not actually deleting anything. Inside that memstore it knows, hey, I deleted this key-value pair.
When a client goes to fetch that record it checks its memstore and says, hey, I deleted that key-value pair. There are a number of compaction schemes that run for these store files. A lot of these updates and deletes are written out to store files, and when it runs a compaction it basically drops all the deleted data, kind of like a vacuum. That's HBase in a nutshell, and the [inaudible 00:47:47] architecture is similar to this because they are both based off of Bigtable, which looks a lot like this. There's a whole lot more to HBase that I don't want to get into. Yes, a question?
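The memstore, store file, delete marker and compaction behavior just described can be modelled in a few lines. `TinyStore` is a hypothetical teaching model, not HBase code: dictionaries stand in for the memstore and for immutable store files, and the flush threshold counts entries instead of bytes.

```python
TOMBSTONE = object()  # marker meaning "this key was deleted"


class TinyStore:
    """Hypothetical model of one store: an in-memory map plus flushed files."""

    def __init__(self, flush_threshold=2):
        self.memstore = {}
        self.store_files = []  # oldest first; each one is treated as immutable
        self.flush_threshold = flush_threshold

    def put(self, key, value):
        self.memstore[key] = value
        if len(self.memstore) >= self.flush_threshold:
            self.flush()

    def delete(self, key):
        # Nothing is removed; a delete marker is written like any other value.
        self.put(key, TOMBSTONE)

    def flush(self):
        if self.memstore:
            self.store_files.append(dict(self.memstore))
            self.memstore = {}

    def get(self, key):
        # Check memory first, then store files from newest to oldest.
        for layer in [self.memstore] + self.store_files[::-1]:
            if key in layer:
                value = layer[key]
                return None if value is TOMBSTONE else value
        return None

    def compact(self):
        # Merge all files (newest value wins) and drop deleted entries.
        merged = {}
        for store_file in self.store_files:
            merged.update(store_file)
        live = {k: v for k, v in merged.items() if v is not TOMBSTONE}
        self.store_files = [live] if live else []
```

A short usage example: after `put("a", 1)`, `put("b", 2)` and `delete("a")`, a `get("a")` returns `None` even though the old value still sits in an earlier store file; only `compact()` physically removes it.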
Male: [Inaudible 00:47:59]
Adam: The question is about the use cases for HBase, where does this fit in. HBase and [inaudible 00:48:19] are optimized for very fast fetches of pieces of data. They give you real-time access to it, whereas MapReduce is a batch processing system. If you have an application where you essentially have a massive table, billions of rows, millions of columns, but you just need to fetch this one piece of data out of it, in order to do that in MapReduce you have to read every single bit inside of it.
If you just want to get that quickly, this is where HBase fits in, because it is a giant sorted list of key-value pairs; it knows this key I'm looking for exists in this region, so I can go directly to that region to fetch that data. It has a number of optimization pieces in it, like the block cache. When you fetch a key it will actually pull a number of keys into memory. If you are looking for keys that are near that key sort-wise, it already has that stuff in memory.
Facebook actually uses it to back their entire messaging system, which I think is pretty cool. They have a really cool architecture, you can look it up. For data structure pieces, Avro is a data serialization system that's specifically designed for Hadoop. They are kind of trying to solve the question of what is a common data format for this system. Yes?
Male: [Inaudible 00:49:38]
Adam: The index is in the HFile as it's stored, so that actual index doesn't take that much; it's not storing the whole block, but for 64 megs it's a few K that it will keep … it says this key is at this offset in this file. It also has Bloom filters, if you are familiar with those, on top of this. It will check its Bloom filter, which is in memory, before it bothers going to the file, for any false negatives, things like that.
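For anyone not familiar with Bloom filters, here is a minimal sketch of the idea: a negative answer is always correct, so the file read can be skipped, while a positive answer may occasionally be wrong and still needs the file to confirm. The class, its sizes and its hashing scheme are illustrative assumptions, not how HBase implements them.

```python
import hashlib


class BloomFilter:
    """Small illustrative Bloom filter over string keys."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = [False] * size_bits

    def _positions(self, key: str):
        # Derive several bit positions by salting one hash function with a seed.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, key: str):
        for pos in self._positions(key):
            self.bits[pos] = True

    def might_contain(self, key: str) -> bool:
        # False means "definitely absent"; True means "possibly present".
        return all(self.bits[pos] for pos in self._positions(key))


bloom = BloomFilter()
bloom.add("row-42")
print(bloom.might_contain("row-42"))
```

The filter costs a fixed number of bits regardless of key size, which is why it can stay in memory even when the file it guards is large.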
Avro schemas are expressed as JSON. It can run as a server or be used as a file format to flow data in between pieces. This is really trying to solve common data formats for Pig, Hive, pretty much across the whole ecosystem. Avro is also designed for Hadoop in that it supports a number of types of compression at the block level or the record level, and it has headers in each of the files. A problem with some hierarchical data sets is that a record might be split across block boundaries, so Avro is smart enough to not do that, I should say.
If a JSON record was going to overflow into the next data block in HDFS, Avro is smart enough to write it to another block, and it will have a header on top of that: this is what my data looks like. So when MapReduce runs over it, it will read that header and just process all the data. Whereas with a header only at the very top of the file, every single task would need to read the top of the file, which is on some other system. There are a lot of optimizations in there for HDFS.
Another one is Parquet, which I don't know much about. It is a [inaudible 00:51:30] format, I believe they are looking at putting some HBases common data store and they are kind of looking at using [00:51:36] for the storage format. You can look it up, I haven't played with it yet. For scalable machine learning there is a library called Mahout: machine learning algorithms in Java for classification, clustering, pattern mining, collaborative filtering, and a lot of other pieces that are implemented in this library.
It has a very robust example framework, which is why a lot of people will just use it, but there are tons of parameters to configure for each piece. Another thing is that it is really intended as a library for you to develop these machine learning applications; it just has a lot of cool examples. For workflow management there is a project called Oozie, which is designed for Hadoop jobs: running jobs for MapReduce, streaming MapReduce, which we'll talk about in a little bit, Pig, Hive and Sqoop.
DistCp is a special type of MapReduce job to basically copy files from one Hadoop cluster to another Hadoop cluster, or just copy them in parallel. Oozie also runs any Java or shell script program, and it's a whole workflow system. It supports job dependencies and orders everything for you; cool tool. For real-time stream processing there is a project called Storm, which is not a part of the Apache Software Foundation. It was started by Twitter. It's all about streams of data coming from what is called a Spout, which is signified by this spout in this diagram.
There is also a series of execution engines called Bolts, which are just pieces of processing, whatever that may be, as you have defined it. It is very scalable and fault tolerant, with basically guaranteed processing of data. Each one of these Spouts creates an event, whether that be a tweet or a piece from a log file, and you can define your topology: from this Spout I want to go to this Bolt, which goes to this Bolt, or I want to fork out to these other Bolts.
You can string together a series of operations that, say, update your Redis instance with the latest hashtag counts. They are using this at Twitter to drive what's currently trending. This is from their live stream; they get pretty much instantaneous feedback from the streaming data. It's been benchmarked at over a million tuples processed per second per node, so it is an extremely fast real-time processing system.
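The spout-to-bolt-to-bolt shape of a topology can be imitated with chained Python generators. This is only an analogy for the data flow, not the Storm API: the function names are invented, everything runs in one process, and a `Counter` stands in for the external store holding hashtag counts.

```python
from collections import Counter


def tweet_spout(tweets):
    """Stand-in spout: emits one tuple (here, a string) per tweet."""
    for tweet in tweets:
        yield tweet


def hashtag_bolt(stream):
    """First bolt: extracts hashtags from each incoming tweet."""
    for tweet in stream:
        for token in tweet.split():
            if token.startswith("#") and len(token) > 1:
                yield token.lower()


def count_bolt(stream, counts):
    """Second bolt: updates running counts and emits the new total."""
    for tag in stream:
        counts[tag] += 1
        yield tag, counts[tag]


counts = Counter()
topology = count_bolt(
    hashtag_bolt(tweet_spout(["I love #Hadoop", "#hadoop and #storm"])), counts
)
for tag, running_total in topology:
    print(tag, running_total)
```

Each stage handles one event at a time and never sees the whole input, which is the property that separates this style from a batch MapReduce job.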
For distributed application coordination you have ZooKeeper, for which the tag line is: an effort to develop and maintain an open-source server which enables highly reliable distributed coordination. It handles all the things that you need to deal with in distributed application development, such as how do I do group leader elections, group services, how do I manage configuration for all these applications? You can go through ZooKeeper. It looks kind of like a hierarchical file system where there are a bunch of znodes. Here we have two apps and other pieces.
Each znode acts like both a folder and a file, holding up to about a megabyte of data; that is a hard, arbitrary limit that they have. It is really intended for [inaudible 00:55:01] data, and the applications can watch these nodes. Say I'm watching app_1: if any other client changes the data on that node I will receive a notification of it. This is really good for configuration. If these were all configuration parameters and I, as the administrator, want to update the configuration for all of my distributed applications, then rather than modifying an XML file and pushing it out and having to refresh and do a lot of management,
I can go to ZooKeeper and just change it; all my applications observe that new parameter and we are good to go. HBase and [inaudible 00:55:35] both use this for their configuration management as well as for knowing where regions are inside the system. If you have HBase you have ZooKeeper, yeah. For the quick architecture piece of it: usually five ZooKeeper servers is enough to manage hundreds of nodes. There is a leader server that's elected upon start-up, and a number of clients.
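The watch-and-notify pattern described a moment ago for configuration can be sketched with an in-memory stand-in. `TinyZNodeTree` is hypothetical and is not a ZooKeeper client; the paths and values are made up, and the watches are modelled as one-shot, so a client that wants further notifications has to register again.

```python
class TinyZNodeTree:
    """Hypothetical in-memory stand-in for a znode hierarchy with watches."""

    def __init__(self):
        self.data = {}
        self.watchers = {}

    def set(self, path, value):
        self.data[path] = value
        # Watches here are one-shot: they are removed before being fired.
        for callback in self.watchers.pop(path, []):
            callback(path, value)

    def get(self, path, watch=None):
        if watch is not None:
            self.watchers.setdefault(path, []).append(watch)
        return self.data.get(path)


tree = TinyZNodeTree()
tree.set("/app_1/batch_size", b"100")

seen = []
tree.get("/app_1/batch_size", watch=lambda path, value: seen.append((path, value)))

tree.set("/app_1/batch_size", b"500")  # the watching client is notified
tree.set("/app_1/batch_size", b"900")  # no watch is registered any more
print(seen)
```

The administrator only calls `set`; every application that registered a watch hears about it without a file being pushed to each machine.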
You can connect to any server. It synchronizes writes to all the other ZooKeeper servers, so these are all sharing the same namespace. Clients can read data from any server; however, all the writes need to go to the leader. Whenever I want to change anything it goes to the leader, at which point the leader will propagate that change down to the other ZooKeeper servers. The Hadoop streaming library is a really quick way to write mappers and reducers and other pieces using standard in and standard out.
For any type of tool that supports standard in and standard out you can essentially write mapper code. You then execute it on the command line; this is very similar to how you do it for any normal job. You do hadoop jar, you give it a JAR file, here it is the Hadoop streaming library, and you can specify the input, the output, the mapper class you have, this is the default mapper which you will see shortly, and the reducer. I'm just using /bin/wc. Hadoop streaming will go out, it will use this mapper, that input, that output, and will run this wc program over all of it.
I don't know if that's exactly how you execute it, so don't copy-paste it because it might not work; that's essentially the concept. You can write Python scripts and Perl scripts and a number of other pieces to execute MapReduce code. It's not the best for production; it's a little wonky to use, but if you want to prototype something, it's painful to have to open up my Java and get Eclipse set up and make a new project.
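As an illustration of the kind of Python script meant here, the sketch below is a word count written for standard in and standard out, relying on streaming's default convention of tab-separated key and value on each line and on reducer input arriving sorted by key. The single-file layout and the `map`/`reduce` command-line argument are assumptions made for this example.

```python
import sys
from itertools import groupby


def map_lines(lines):
    """Mapper: emit 'word<TAB>1' for every word on every input line."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reduce_lines(sorted_lines):
    """Reducer: input arrives sorted by key, so equal words are adjacent."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"


if __name__ == "__main__":
    # Hypothetical convention: run as `script.py map` or `script.py reduce`.
    stage = sys.argv[1] if len(sys.argv) > 1 else ""
    if stage in ("map", "reduce"):
        step = map_lines if stage == "map" else reduce_lines
        for out in step(sys.stdin):
            print(out)
```

The same pair can be tried locally without a cluster by piping a file through the map stage, then `sort`, then the reduce stage, which is a quick way to prototype before submitting anything.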
I just want to do some simple Python scripts and get it out there and see what's happening; it's really useful for that. The next piece is SQL on Hadoop. There are four that have sprung up in the past couple of years: Apache Drill, which is being spearheaded by MapR; Impala, written by Cloudera; Hive Stinger, an effort by Hortonworks to basically make the Hive project 100 times faster; and Pivotal HAWQ, which is our offering.
It is all massively parallel processing of SQL queries against HDFS data. The architectures are relatively similar, but it doesn't run any MapReduce at this point. In the HAWQ architecture you have a master server and a standby master. The master server responds to any SQL queries, inserts, updates, anything like that; it handles all the parsing and query optimizing pieces, and it will dispatch single queries out to the segment servers.
The segment servers are running on the same hosts as the data nodes, just like a task tracker or node manager is running there, to process all this data. The data is being stored in HDFS. HAWQ supports external tables, so it's pointed at files in HDFS and it will read that data. It is much more performant if you store it in HAWQ's native format. If you are familiar with the [inaudible 00:59:18] database, HAWQ is extremely new; it is basically the [inaudible 00:59:22] database but it stores data in HDFS, with a few other caveats there.
The query optimizer is very advanced. When it handles distributing the data it can pick optimal query plans, but when you are just reading files it has no idea how that data is actually physically stored, so it basically needs to read everything. That's the minute on HAWQ. Yes?
Male: [Inaudible 00:59:47]
Adam: There is the [inaudible 00:59:59] database, which stays as is; that's not going anywhere. HAWQ is architecturally similar and shares a lot of the same code with it. HAWQ can only run on Pivotal HD for a number of reasons, some of which are technical. They are working on pushing back a feature that had to be added to HDFS in order for HAWQ to get its ACID-compliant transactions, since HDFS is an append-only system. If a transaction fails you need to roll back that transaction, and you can't do that in Apache HDFS.
They added that feature there for it to work. That's a lot of projects that you need to manage. I'm missing several, sorry if I didn't mention the people who are running the Apache Hadoop projects, I had my own I didn't mention. Each one of these cropped up to solve a general limitation of Hadoop. How do I get data into this file system? Flume, Sqoop. How do we take away all this boilerplate Java code? Pig and Hive. How do I update my data in Hadoop? Can we have a common data format between all these pieces?
They've been developed over time and each one has a very specific purpose. It's good to know your ecosystem, and that is kind of why I wanted to walk through this with you. You need to pick the right tool for the job. I whipped up a little sample architecture just to give you an idea of how these all fit together. Say I have three data sources: my web server, sales on my website, and a call center that's coming through.
Three very different types of data sources and at the end there I want to have my website that has my user sitting there that wants to get some meaningful stuff out of it. I've got a couple left and that's all I need. What you'll first do is stand up Hadoop, you install HDFS, maybe I'm going to have a couple of Flume agents that are running here that are going to take my data from my web services and all my other systems and put them into HDFS.
Say I want to get some kind of real-time metrics coming out of my sales data: as they are coming in I want to update my website on what people are really interested in right now, what are they looking at, so I can target my consumers better. I'm going to put Storm in there because that's my real-time processing piece, and I'm going to have this Flume agent write directly into Storm. I'm going to have a Flume sink which is essentially a Storm spout.
That sink is going to write to this Twitter Storm spout. I'm also going to pipe this other data into HDFS. Let's say I want Storm to write out, so I have a bolt to update my website with my latest metrics, and I've got a bolt that's going to stuff this into HBase for easier access later, potentially by my call center. I built an application that can query data in HBase. This is the fast fetching piece that I was talking about, where a customer calls me, they have a problem with an order and I have to pull up that order data.
If it is just flat file sales data I can't get that without running a MapReduce job and that can take potentially minutes, that's not good. My application uses HBase to basically fetch that order number and bring it back to me in order to help out my customers faster. HBase is of course storing it in HDFS. Then I want to get some intelligence out of this so I will come up with a MapReduce job and Pig to look at the data that's in HBase. I can read HBase data; I can read HDFS data and whatever analytics you want to come up with here.
Of course I don't want to run all of these by hand, so I'm going to use Oozie to schedule my workflow and manage it, including restarts when things fail; there are a lot of good pieces for that management there. Then with SQL on Hadoop: people in the call center, they might not know SQL, they might not know Java, so they need to kind of talk SQL. I have the SQL on Hadoop offering, and then I can read data directly from HDFS to do deep analytics and other interesting things with it. I have some recent college graduate DBAs that didn't learn Java, which is very rare nowadays to teach Java in school.
That's another thing. They can also use the SQL interface into HDFS. You have got to think: where am I heading? What is my solution, and how do I plug in these different pieces of the Hadoop architecture to fit in there? It's a lot of things obviously to manage and each one has its own caveats, and some are really good at things and some aren't, and that's up to you all as the developers at the [inaudible 01:04:37] to figure out, sorry.
Male: [Inaudible 01:04:41].
Adam: Right, your question is mainly about data formats in here. Pivotal HAWQ likes to store data in its own format, but using PXF, the Pivotal Extension Framework, it can read virtually anything that you can split, like MapReduce can. It can read line-formatted data, it can read Avro, it can read sequence files, which is a special Hadoop file format, it can read … I built an extension for JSON, I built an extension for Cassandra tables, so through the extension framework you can put basically a SQL interface on pretty much any data type that can be processed in parallel.
Some of that is more difficult than others. You can do inserts in HAWQ and it will store the data as is, so that can be another piece, using Flume or a batch process and dumping stuff into the SQL on Hadoop offerings. They can power on other things, like they can read Avro and other formats. A lot of this, if you want to start joining your web server data with your call center stuff, comes down to your MapReduce input formats, which I'll talk about next: it's kind of how do I read my data?
Since I'm storing it unstructured, as is, I need to now read that data, and that's defined in your code and how you're reading it: both how do I read it, is it a block of bytes that forms an XML record, is it one line, is it an image that I'm reading in, and also how to turn that into something tangible for processing, which we'll see soon.
Male: [Inaudible 01:06:41]
Adam: I recommend, the tipping point really is I have too much data to handle, like I can't do it in either. Some of the problem is that people don't know; they think they don't have much data, but you can get so much more. Like Facebook turned on their clickstream logging and they got way more data … they had a 10 terabyte Oracle system for millions of dollars. They turned on the clickstream stuff and they were getting more data than they could load into their system. They went into this project because they were getting more data than they could handle, way more data than they could process.
By [inaudible 01:07:31] systems are pretty common, but if you are at less than five it's not that much storage, and a database would probably be a better fit. This type of technology is really for those that have a big data problem and that don't want to spend a lot scaling up their technology. If you expect to grow your system, grow your data size, then rather than scaling up vertically on your hardware you can buy servers and add them to your cluster all the time, and it's cheaper; people like that.
Male: [Inaudible 01:08:02]
Adam: More than, I mean I don't have a good number for you, tens, dozens of terabytes. When you are talking terabytes this is definitely where you need to head towards, yeah.
Male: [Inaudible 01:08:26]
Adam: The MapReduce paradigm; this is just MapReduce, I'm going to talk about kind of the head pieces of it. MapReduce is a data processing system with two key phases that are called map and reduce. You all would have never guessed that. The map phase performs a map function over input key-value pairs to generate intermediate key-value pairs. That piece of code, that piece of processing is up to you, whatever that is. Whether I'm doing a word count, whether I'm pulling stuff out of an image, whether I'm ripping images out of a PDF, whatever it is.
It has input key-value pairs and output key-value pairs. Then there is a reduce phase, which performs a reduce function over intermediate key-value groups to generate output key-value pairs. The stuff that is output from the mapper is input into the reducer as groups. These groups are created by sorting the map output on key. Before, with the Pig example, I was saying group by word; the key is a word and then essentially the value is a one.
When this sorting happens, when this grouping happens on the reduce side, it's going to get a key, which is a unique word in the whole system since it was grouped on, and the list of values that was associated with it, and we'll see that soon. This paradigm fit really well for Google because it is an extremely parallelizable algorithm. Since you are operating on key-value pairs it doesn't matter where these key-value pairs came from.
I can really distribute this application to read data blocks, create key-value pairs out of them, do some processing on them and then split this. They were using this to essentially index the internet, which is sweet. For my example here, I'm just going to walk through word count. For map inputs I have three map tasks, and each one is going to process a single line of data. In Hadoop the key for this one is the byte offset of that line, which is typically useless, and then the value is going to be that line of text itself.
This is the key-value pair I'm going to get for each call to my map function. If a block has several lines in it, the mapper is going to get one map call per line, because that is how I've defined how I'm going to read my data: how do I create key-value pairs out of this? For the map output, I'm going to take that line, chunk it up into words and then output each word with a count of one. I saw this word one time, and all of these tasks are happening in parallel over the system.
There is this middle phase here called Shuffle and Sort, which I'll talk about soon, that handles how do I actually make these groups together? A group looks like a key, here we have fun, Hadoop, [inaudible 01:11:18], and a list, essentially, of ones. This is the group that is created: this fun from here and this fun from here are going to go to this reduce task. There is no fun over here.
At that point this reduce task gets this group, and it's going to go through that list and sum up all the numbers, so I have an output word of fun and then the number two, and then this output is going to be written back to HDFS. This is it, basically; it is all key-value pairs. Whatever you can fit into this framework is good. It's obviously very, very simplified here, and Pig is running this behind the scenes, so they have their implementations of how they do distincts.
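The whole word count flow just walked through (line records keyed by byte offset, map output of word and one, shuffle and sort into groups, reduce by summing) fits in a short single-process simulation. The function names are illustrative and nothing here is the Hadoop API; a real job runs these phases as parallel tasks across the cluster.

```python
from collections import defaultdict


def record_reader(text):
    """Yield (byte offset, line) pairs, like the line-based reader described above."""
    offset = 0
    for line in text.splitlines(keepends=True):
        yield offset, line.rstrip("\n")
        offset += len(line.encode("utf-8"))


def map_fn(offset, line):
    # The offset key is ignored, as it usually is for word count.
    for word in line.split():
        yield word, 1


def shuffle_and_sort(pairs):
    """Group map output by key and hand the groups over in sorted key order."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_fn(word, counts):
    yield word, sum(counts)


def run_job(text):
    mapped = [kv for off, line in record_reader(text) for kv in map_fn(off, line)]
    return [out for key, values in shuffle_and_sort(mapped) for out in reduce_fn(key, values)]


print(run_job("hadoop is fun\nfun with hadoop\n"))
```

Only `map_fn` and `reduce_fn` are the parts a developer normally writes; the reader and the shuffle are supplied by the framework.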
This is the boilerplate of … group by is: I'm going to take whatever the group by is, set it as the key, and then that record is going to be the value. Now the different MapReduce components. In the map phase, these are all Java classes that you can implement. There's an input format, a record reader, a mapper, a combiner and a partitioner. The input format is: how do I split and specify my data, what format is it? Is it a text file, is it an [inaudible 01:12:32] file, is it a sequence file, am I reading from a Cassandra table?
The record reader is: how do I take these bytes that I'm reading from my split and turn them into key-value pairs? Is it a line, is it word by word, am I pulling in a chunk of data, am I reading an image from these bytes? This is where that kind of changing-schema stuff really comes into play. If I can get some metadata about my schema, say the header is there on that block, then I can read it. The mapper is the code: how am I doing my processing?
The combiner is a special component that I won't really go into. It's kind of like a mini reduce over that specific map output. The partitioner is: how am I going to distribute my data to these reduce tasks? In the reduce phase there is the shuffle, which is the actual copying of the key-value pairs to the reducers they were partitioned to, the sorting of the data, running the reducer for the groups, and then of course you have the corresponding output format and a record writer. How am I going to write out these key-value pairs? Is it one per line, am I writing an [inaudible 01:13:33] file, things like that.
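To make the "mini reduce" idea concrete, here is a rough sketch of what a combiner buys you for word count. It is plain Python rather than Hadoop code, and `map_task` and `combine` are illustrative names: the combiner sums counts within one map task's output so fewer pairs have to be copied to the reducers.

```python
from collections import Counter

def map_task(lines):
    """Emit (word, 1) for every word in this task's lines."""
    return [(word, 1) for line in lines for word in line.split()]

def combine(map_output):
    """Mini reduce over one map task's output: pre-sum the counts per word."""
    totals = Counter()
    for word, count in map_output:
        totals[word] += count
    return sorted(totals.items())

raw = map_task(["fun fun fun", "hadoop fun"])
combined = combine(raw)
```

In this example five raw pairs shrink to two. The reducer still has to sum the partial counts it receives from every map task, which only works because addition gives the same answer whether or not the combiner ran.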
The record writer is how these bytes will physically be written out, how these key-value pairs are written. Then there is the Writable interface, which is basically serialization for the data. It has write, which takes a DataOutput, and readFields, which takes a DataInput. There is also WritableComparable, which extends Writable and Comparable. These are the key-value pair data types that exist. There are a number of them out of the box: Boolean, bytes, long, and a lot of other pieces for those Writables.
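As a rough analogue of a custom Writable, the sketch below serializes a hypothetical two-integer type to a byte stream and reads it back. It is Python standing in for the Java interface: `IntPairWritable`, `write` and `read_fields` are illustrative names, and the big-endian 32-bit layout mirrors what Java's `DataOutput.writeInt` produces.

```python
import io
import struct

class IntPairWritable:
    """Hypothetical custom type with Writable-style write/read_fields."""

    def __init__(self, first=0, second=0):
        self.first = first
        self.second = second

    def write(self, out):
        # Serialize both fields as big-endian 32-bit signed integers.
        out.write(struct.pack(">ii", self.first, self.second))

    def read_fields(self, inp):
        # Deserialize in exactly the order that write() produced.
        self.first, self.second = struct.unpack(">ii", inp.read(8))

    def __lt__(self, other):
        # The "comparable" part: order by first, then second, so keys sort.
        return (self.first, self.second) < (other.first, other.second)

buffer = io.BytesIO()
IntPairWritable(3, 7).write(buffer)
buffer.seek(0)
copy = IntPairWritable()
copy.read_fields(buffer)
```

The key point is symmetry: whatever order and width the fields are written in, the read side has to consume them in the same order.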
You can write your own custom Writables: if you have your own special format, you can do that. InputFormat is an abstract class templated on those Writable types for the key and value. There's one function, getSplits, which will take the input that you have and figure out what the actual input splits of my job are, and it returns a list. These are those block locations that we were discussing earlier. There's also a function to create a record reader that is templated on the same data types. The record reader has an initialize function to start things off.
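A simplified picture of what a getSplits-style function computes, assuming one split per block and ignoring host locations and minimum or maximum split sizes, which a real input format also has to deal with. The function name and the numbers are illustrative.

```python
def get_splits(file_length, block_size):
    """Return (start, length) pairs, one per block-sized split."""
    splits = []
    start = 0
    while start < file_length:
        length = min(block_size, file_length - start)
        splits.append((start, length))
        start += length
    return splits

# A hypothetical 300-byte file with 128-byte blocks gives three splits.
splits = get_splits(file_length=300, block_size=128)
```

Each entry in the returned list becomes the unit of work for one map task.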
Typically for a data block you'd open the file and seek to the start of that data block, which is going to be on that same node, and that's pretty much the initialize piece. There is a nextKeyValue, kind of like an [inaudible 01:14:53], to advance to the next key-value pair; it returns true or false depending on whether there actually is data or not. Then: return the key, return the value, get the progress through the file, which is used by the job tracker for metrics to report back, and finally close the record reader. In here you are taking that input split and reading byte by byte, or however you read in that line of text.
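The record reader life cycle described above could look roughly like this for lines of text. This is a sketch in Python over an in-memory byte string, not Hadoop's line record reader; the class and method names are illustrative. It assumes the common convention that a split starting mid-line skips that partial line, while the previous split reads past its own end to finish it.

```python
import io

class LineReader:
    """Sketch of a record reader yielding (byte offset, line) for one split."""

    def __init__(self, data, start, length):
        # The "initialize" step: open the data and seek to the split start.
        self.start = start
        self.end = start + length
        self.stream = io.BytesIO(data)  # stands in for an open HDFS file
        self.stream.seek(start)
        if start != 0:
            self.stream.readline()  # skip the line owned by the previous split
        self.key = None
        self.value = None

    def next_key_value(self):
        """Advance to the next record; return False when the split is done."""
        offset = self.stream.tell()
        if offset > self.end:
            return False
        line = self.stream.readline()
        if not line:
            return False
        self.key = offset
        self.value = line.rstrip(b"\n").decode("utf-8")
        return True

    def get_progress(self):
        """Fraction of the split consumed so far, capped at 1.0."""
        if self.end == self.start:
            return 1.0
        return min(1.0, (self.stream.tell() - self.start) / (self.end - self.start))

    def close(self):
        self.stream.close()

def read_split(data, start, length):
    """Drive a reader the way a map task would and collect its records."""
    reader = LineReader(data, start, length)
    records = []
    while reader.next_key_value():
        records.append((reader.key, reader.value))
    reader.close()
    return records

data = b"Hadoop is fun\nI love Hadoop\nPig is more fun\n"
first = read_split(data, 0, 20)    # split boundary falls inside line two
second = read_split(data, 20, 24)
```

Even though the split boundary at byte 20 cuts the second line in half, every line is read exactly once: the first reader finishes the line it started, and the second reader begins at the next full line.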
Mapper: this is a class that is often extended to do anything. There is a setup and a cleanup method. Setup is for any preliminary processing, opening up reference data, loading it into memory and anything like that; cleanup is to close up anything. Then the map function. This one is called the identity mapper, which just takes the input key and input value, converts them to the output data types and writes them out. There is also a run function, where you can see setup and then, while that record reader has values, get the key, get the value and pass them along with this context object.
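The setup, map, cleanup and run shape can be sketched as a small template class. This is a Python imitation of the structure, not the Hadoop Mapper itself; `Context` here is a hypothetical stand-in that feeds records in and collects whatever is written out.

```python
class Context:
    """Hypothetical stand-in for the context: feeds records, collects output."""

    def __init__(self, records):
        self._records = iter(records)
        self.current = None
        self.output = []

    def next_key_value(self):
        self.current = next(self._records, None)
        return self.current is not None

    def write(self, key, value):
        self.output.append((key, value))

class Mapper:
    """Base class: the default map is the identity mapper."""

    def setup(self, context):
        pass  # e.g. load reference data into memory

    def map(self, key, value, context):
        context.write(key, value)

    def cleanup(self, context):
        pass  # e.g. close anything opened in setup

    def run(self, context):
        self.setup(context)
        while context.next_key_value():
            key, value = context.current
            self.map(key, value, context)
        self.cleanup(context)

class UpperCaseMapper(Mapper):
    """Example extension: override only the map function."""

    def map(self, key, value, context):
        context.write(key, value.upper())

identity_context = Context([(0, "fun"), (4, "hadoop")])
Mapper().run(identity_context)

upper_context = Context([(0, "fun"), (4, "hadoop")])
UpperCaseMapper().run(upper_context)
```

Most jobs override only `map`, and sometimes `setup` and `cleanup`; the `run` loop is rarely touched.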
Context is based on the configuration and is used to write out output and things. You extend this class to do any type of processing that you want. The partitioner is part of that reduce phase: where does this key-value pair belong? What reduce task does it go to? It gives you the key, the value and the number of partitions, which is essentially the number of reducers that you are going to have. If you have ten, you are basically going to take this key-value pair and make sure that the keys that are of the same value end up at the same reducer host.
The default is a hash partitioner, which uses the key's hash code, [inaudible 01:16:25] number of partitions. You get a number from zero to nine in the case of ten, anything like that. The same object is supposed to have the same hash code, so equal keys will end up at the same reducer. Now there's all this shuffling and sorting happening, which is driven by the partitioner; the reducer copies everything over. The reducer is another class here: setup, cleanup, reduce. It gets this key and then an iterable over all those value types.
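A hash partitioner boils down to one line of arithmetic: clear the sign bit of the key's hash code and take the remainder by the number of partitions. The sketch below imitates that in Python. It uses Java's `String.hashCode` formula as the hash, which is an assumption made for illustration; Hadoop's own key types compute their hash codes differently.

```python
def java_string_hash(text):
    """Java's String.hashCode formula, wrapped to a signed 32-bit integer."""
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def get_partition(key, num_partitions):
    """Clear the sign bit, then take the remainder by the partition count."""
    return (java_string_hash(key) & 0x7FFFFFFF) % num_partitions

words = ["fun", "hadoop", "pig", "hive", "fun"]
assignments = [(word, get_partition(word, 10)) for word in words]
```

Both occurrences of "fun" land on the same partition number, which is all the framework needs in order to bring equal keys together at one reducer.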
This is going to be the word and then basically a bunch of numbers, a list of ones, and run is pretty much the same there. Output format: get record writer, and check output specs, which is just to make sure that the job can run. In the case of Hadoop, it doesn't want to clobber its own output directory, so it will check that the output directory does not already exist. There's also an output [inaudible 01:17:18] that you don't really need to worry about; basically, if the task fails it will clean up anything that it wrote out already.
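The output check amounts to a guard that runs before the job starts. Here is a minimal sketch against the local filesystem rather than HDFS; `check_output_specs` and `OutputExistsError` are hypothetical names chosen for the example.

```python
import os
import tempfile

class OutputExistsError(Exception):
    """Hypothetical error raised when the output directory is already there."""

def check_output_specs(output_dir):
    """Refuse to run if the job would clobber an existing output directory."""
    if os.path.exists(output_dir):
        raise OutputExistsError(f"Output directory {output_dir} already exists")

with tempfile.TemporaryDirectory() as workdir:
    fresh = os.path.join(workdir, "wordcount-out")
    check_output_specs(fresh)  # passes: nothing is there yet
    os.mkdir(fresh)            # pretend a previous run wrote its output
    try:
        check_output_specs(fresh)
        rejected = False
    except OutputExistsError:
        rejected = True
```

Failing up front is cheaper than discovering the conflict after the map and reduce work has already been done.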
The record writer is a very simple interface, or abstract class I should say: write out the key-value pair and close the file. Let's look at what this actually is in the word count sense, for some very basic MapReduce … yeah?
Male: These are all set up from scratch [inaudible 01:17:49]
Adam: The answer is no, you cannot. All these pieces that I'm talking about are configured in that job manifest with your job file. When the task tracker gets it, it looks at that configuration and then uses reflection to create these classes on the fly. The problem here: we want to count the number of times each word is used inside a body of text, and we are going to use the text input format and text output format, which have corresponding line record readers and line record writers.
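The idea of building components from names in the job configuration can be illustrated with a small lookup-by-name sketch. This is Python's rough equivalent of reflection, not how the task tracker is implemented; the configuration keys and the two placeholder classes are hypothetical.

```python
class TextInputFormat:
    """Placeholder class standing in for a configured input format."""

class HashPartitioner:
    """Placeholder class standing in for a configured partitioner."""

# Hypothetical job configuration: component roles mapped to class names.
job_config = {
    "example.inputformat.class": "TextInputFormat",
    "example.partitioner.class": "HashPartitioner",
}

def instantiate(config, key):
    """Look the class up by its configured name and create an instance."""
    class_name = config[key]
    cls = globals()[class_name]
    return cls()

input_format = instantiate(job_config, "example.inputformat.class")
partitioner = instantiate(job_config, "example.partitioner.class")
```

Because the classes are named in configuration rather than hard-coded, swapping in a custom input format or partitioner is a configuration change rather than a change to the framework.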
A lot of pieces were already written by the Hadoop framework; in the Hadoop API they have the default hash partitioner and lots of pieces like that. For map, we are going to get a byte offset and the line of text, and for each word in that line we are going to write out the word and the number one. For reduce, we are going to get a word and a list of those counts, and we are just going to go through that, sum up all of them and output the word with the sum.
We need to write some mapper code to do that, so this mapper is templated on LongWritable and Text; those are the input key-value pair data types, and Text and IntWritable are the output key-value pair data types. For map we are going to get the LongWritable, which is the byte offset, and the Text, which is the line. We take that line and run it through a StringTokenizer utility to basically chop it up into words. While there are still more tokens, we set our output key, which is word, to that token and call context.write with the word and the number one, and the one is just an IntWritable static object.
We take that line of text; we don't know where it came from, we don't know what host it's on and we don't know anything about that. We are just operating on a line of text, iterating through it and outputting each word with the number one. Next up is the shuffle and sort, which in the interest of time I will breeze over. Each mapper writes out a single file that is logically partitioned by that partitioner. I have four reduce tasks, so I get partitions from zero to three: zero, one, two, three, each piece of that.
Each reducer copies its piece from all of these mappers: zero, zero, zero goes to reduce zero. It sorts these partitions, basically using a merge sort algorithm, into one specific file, and then that's read in by the reducer. It's a very complex thing; Tom White's Hadoop guide goes into detail if you really want to know about it. Taking a look at the reduce code, I'm going to get the key, which is going to be unique across the system, and an iterable of these ones, essentially.
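A heavily simplified sketch of the shuffle and sort, with two mappers and two reducers instead of four to keep it short. It is plain Python, not the framework's implementation: each mapper's output is split into sorted partitions, each reducer takes its partition from every mapper, merges the sorted runs and groups the values by key. The `by_first_letter` partitioner is a made-up deterministic rule used only for this example.

```python
import heapq
from itertools import groupby

def partition_map_output(pairs, num_reducers, partition_fn):
    """Split one mapper's output into sorted per-reducer partitions."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        partitions[partition_fn(key, num_reducers)].append((key, value))
    return [sorted(part) for part in partitions]

def shuffle(all_map_outputs, reducer_id):
    """Copy this reducer's partition from every mapper, then merge them."""
    pieces = [output[reducer_id] for output in all_map_outputs]
    merged = heapq.merge(*pieces)  # merge step over already sorted runs
    return [(key, [value for _, value in group])
            for key, group in groupby(merged, key=lambda kv: kv[0])]

def by_first_letter(key, num_reducers):
    """Hypothetical deterministic partitioner used only for this example."""
    return ord(key[0]) % num_reducers

mapper_a = partition_map_output([("fun", 1), ("hadoop", 1), ("is", 1)], 2, by_first_letter)
mapper_b = partition_map_output([("pig", 1), ("fun", 1), ("is", 1)], 2, by_first_letter)

reducer_0_input = shuffle([mapper_a, mapper_b], 0)
reducer_1_input = shuffle([mapper_a, mapper_b], 1)
```

Because every mapper's partition is already sorted, the reducer only needs a merge rather than a full sort, and equal keys arrive next to each other, ready to be handed to the reduce function as one group.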
For each one of these values we are going to get that int and add it to the sum, set our output value to that sum, and output the key with that output value; that's word count. You are now all Hadoop pros. What is so hard about that? The problem is that all of these problems that you are going to have in working with MapReduce need to fit in this really tiny box called MapReduce. It's very much a limitation, and that's kind of not like a feature, I guess.
It's a really new way of thinking about this, so simple processing operations that we are used to, like joins, are not really easy when expressed in MapReduce. There are like four different types of joins, and you need to take into account how much data am I processing, which type of join do I want to use? Cross products are a whole other story. The proper implementation piece is not so easy to do. There are a lot of configuration variables and implementation details. How many reduce tasks am I going to have? Is my data skewed for these key groups that I'm creating?
Is one group going to have too much data to operate on, and what about [inaudible 01:21:45], what about garbage collection? There are a lot of different things that come into play with that. Hadoop is primarily written in Java, and all these components are extendable, configurable, customizable pieces through input and output formats. By creating custom formats you can read and write to external systems that you have, and there are a lot of high-level tools out there to enable rapid development: Pig, Hive, SQL on Hadoop. The Spring for Apache Hadoop framework also helps simplify this development effort.
Don't be totally scared away, it's not that bad. hadoop.apache.org is the website. It is an extremely supportive community. When I first got started I joined their user email list and just absorbed information like a sponge; it's what I do. Strata + Hadoop World, October 28th through 30th, will be in Manhattan this year. If you feel like going, I'll be there and you can come say hi. There are a lot of resources out there: blogs, email lists and books. If you want to know design patterns for MapReduce, somebody named Adam Shook wrote a book on it with my friend Don Miner.
In this book we kind of went through: how do I implement distinct, how do I implement joins? It is kind of taking the question of how we do this data processing and analysis and express it in MapReduce. This wasn't anything new that we spun out of our heads. A lot of this information was out there already; we just kind of collected it together. We do have a website, mapreducepatterns.com, that has nothing on it. If you want to bug me about it, maybe I'll actually put stuff on there.
Our intent is to kind of open source the material in there a little bit. There is that book. As far as getting started is concerned, at Pivotal we have a single-node VM that you can download directly from our website that has Pivotal HD installed on it. It consists of HDFS, MapReduce, Pig, Hive, ZooKeeper, HBase, HAWQ, Mahout, Flume, Sqoop; pretty much everything that I have been talking about here is in that single-node VM for you to just start playing around with. We also have a community edition you can download.
It's our distribution of Hadoop that you can download and use on actual clusters if you feel like it. For the brave and bold, you can roll your own: if you feel like downloading stuff from Apache, you can go check out the docs at hadoop.apache.org and download everything there. Obligatory acknowledgements slide, yeah. I would gladly give you my Twitter handle, it's Adam J. Shook, but I don't use it that much; I know people like Twitter, though.
We have about four minutes or so on my clock if anybody has any questions or wants me to go over stuff. I know this was a huge amount of information to throw at you; I hope you've gotten something out of it [inaudible 01:24:56] yes.
Male: [Inaudible 01:24:58]
Adam: I heard, I guess yesterday, that they have put the Pivotal HD distribution on Cloud Foundry. I don't know much of the detail behind it, but they are working on that, and they are working on putting it on AWS also, yeah. Thank you.