How long will it take to sort 100TB of data? 100TB. Think about it for a minute. How many zeroes in 100TB? 14 (100TB is 10^14 bytes).
This task was started by Dr Jim Gray (it’s now known as GraySort). Till late last year, the fastest open source framework that was able to sort this data was a 2100 node Hadoop cluster by Yahoo! and it took 72 minutes to sort the data.
In late 2014, Apache Spark took 23 minutes to sort the same amount of data, using only 206 nodes. Spark used the same MapReduce jobs that are run by Hadoop. Yet it was 3 times faster while using 10x fewer instances. A blog post by Databricks mentions a few reasons behind this performance. But what is it about Spark that makes it so much faster?
Apache Spark is a general-purpose execution engine that enables both real-time and batch job based exploration of large datasets (Simba Technologies is the official supplier of the ODBC and JDBC spark drivers).
In 2004, the famous OSDI paper by Dean and Ghemawat introduced the MapReduce programming model to the world. The paper built on the Google File System (GFS), which Google had described in a separate paper a year earlier. By 2006, Cutting and Cafarella had released an open source implementation of these ideas, christened “Hadoop”. Hadoop enabled reliable distributed storage of large datasets using the Hadoop Distributed File System (HDFS) and enabled writing of MapReduce jobs. As part of Hadoop v2.0, we also have YARN (Yet Another Resource Negotiator), which uses a global Resource Manager and per-slave Node Managers to manage cluster resources and run jobs.
MapReduce means that you specify your batch jobs in terms of map and reduce steps. A map step goes record by record and does the same task over all records. This is followed by a “shuffle” operation, which is the only time data is sent across the network. After this “shuffle”, your reduce step reads the map outputs and does as specified.
Consider two tasks, A and B. In task A, you have files that do not fit in memory and you want to count how many times you see the word “dimsum”. Several map tasks can be run, each of which reads part of the data and computes a count. At the end, those counts can be added up. This is made easier by the fact that data in HDFS is stored in blocks, is replicated across the network (helping fault tolerance), and the computation is moved to the data.
Task B is an optimization task. An optimization task specifies an objective to minimize or maximize, based on coefficients whose values determine the value of the objective. Let’s call those coefficients (alpha, beta). You start with a given alpha and beta value, and you want to learn the best pair of (alpha, beta) that maximizes a given objective. How many MapReduce jobs will you need for that?
The answer is not 1. The answer is several. Why? One MapReduce job will use one pair of (alpha, beta), and its output will be stored on HDFS. That output will be read by another MapReduce job, which will try another pair of (alpha, beta), and so on. It’s clear that MapReduce works great for batch jobs but is a poor fit for iterative jobs, that is, jobs where the output of one is carried as input to the next.
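To make the contrast concrete, here is a minimal sketch of both tasks in plain Python (no Hadoop involved). Everything in it is an assumption made for illustration: the sample lines, the toy objective, the step size, and the `storage` dict that stands in for HDFS.

```python
from collections import defaultdict

# --- Task A: one pass, map -> shuffle -> reduce -------------------------
def map_count(chunk, word):
    """Map step: emit (word, occurrences) for one chunk of lines."""
    return [(word, sum(line.split().count(word) for line in chunk))]

def shuffle(pairs):
    """Shuffle step: group all emitted values by key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reduce_sum(grouped):
    """Reduce step: add up the partial counts for each key."""
    return {key: sum(values) for key, values in grouped.items()}

# Hypothetical data, split into two chunks as HDFS would split a file into blocks.
chunks = [
    ["dimsum for lunch", "no dumplings here"],
    ["more dimsum please", "dimsum dimsum"],
]
mapped = [pair for chunk in chunks for pair in map_count(chunk, "dimsum")]
counts = reduce_sum(shuffle(mapped))

# --- Task B: every iteration is a separate job --------------------------
def run_job(storage, step=0.25):
    """One 'job': read the previous output, improve it, write it back."""
    alpha, beta = storage["params"]          # read from the stand-in for HDFS
    # Gradient ascent on a toy objective f = -(alpha - 3)^2 - (beta + 1)^2.
    alpha += step * (-2 * (alpha - 3))
    beta += step * (-2 * (beta + 1))
    storage["params"] = (alpha, beta)        # write back for the next job
    storage["jobs_run"] += 1

storage = {"params": (0.0, 0.0), "jobs_run": 0}
for _ in range(20):
    run_job(storage)
```

Task A finishes in a single pass, while Task B needs one full read-compute-write round trip per iteration, which is exactly the cost that hurts on a disk-backed system.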
What Spark is not
Spark is not a like-for-like replacement for Hadoop. As mentioned above, Spark is an execution engine, so, a substitute for the MapReduce programming model that is part of Hadoop. You can use Spark with any data source (even a bunch of text files), and it can run on top of cluster managers like YARN or Apache Mesos. Spark is an enabler that can help you run both batch and iterative jobs.
Spark on AWS
Let’s learn about Spark by example. We can set up a small Spark cluster on AWS and run a few queries to learn about some of the features. To set up a cluster on AWS, you need a few things first:
0. An AWS account: go to aws.amazon.com and enter your credit card details. That’s it.
1. A .pem file: a unique private key file that helps AWS verify your identity. You can only get this file once, so keep it safe! The AWS documentation explains how to generate one.
2. A key pair that has (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY). Add this to your favourite shell profile file (e.g. .bashrc).
After these steps are followed, you can generate your own spark cluster in a few minutes and get going! Read below after you have done the steps above.
3. Download Spark from the Spark website. You can see an ec2 directory there. Set up a small Spark cluster by running the command below (note that -i is the identity file and -t is the instance type):
    ./spark-ec2 -k <name of key pair> -i <path to pem file> -s <number of slaves> -t <instance type> -r <region> launch <name of cluster>
    ./spark-ec2 -k Simba -i Simba.pem -s 4 -t m2.xlarge -r us-east-1 launch simbaspark
This would launch a cluster named “simbaspark” with 4 slaves and 1 master, all of them m2.xlarge (by default, it chooses m2.xlarge). Note that the script expects dual core machines, so if you choose m1.small (single core), the workers wouldn’t start, as it would expect a certain number of workers in total (number of slaves * 2). The region is chosen automatically, but some eastern regions do not have the instance type that is chosen by default. To work around this, specify both the instance type and the region; leaving both unspecified can, in the worst case, lead to an error.
After a few minutes, and after several log statements, you will see that your cluster is set up. Yay! Now you can log in to your cluster and play with it.
    ./spark-ec2 -k Simba -i Simba.pem login simbaspark
What if I already have a Hadoop cluster and want to deploy spark on it?
If you are using Hadoop on top of YARN (or even Apache Mesos), you can run Spark relatively easily. Even if you don’t use either, Spark can run in standalone mode. Spark runs a driver program which in turn invokes Spark executors. This means that you need to tell Spark the nodes where you want your Spark daemons to run (in terms of master/slave). In your spark/conf directory, you can see a file “slaves”. Update it to list all the machines you want to use, one per line. You can set up Spark from source or use a binary from the website.
NOTE: Please use Fully Qualified Domain Names (FQDN) for all your nodes, and make sure that each of those machines is reachable over passwordless SSH from your master node.
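If you manage more than a handful of machines, you may want to generate the slaves file rather than type it. The following is a rough sketch, not part of Spark: the helper names and the example.com hostnames are made up, and the FQDN check is only a loose sanity test (at least two dot-separated labels made of letters, digits and hyphens).

```python
import re

# One DNS label: 1-63 letters, digits or hyphens, not starting or ending with a hyphen.
_LABEL = re.compile(r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$")

def looks_like_fqdn(host):
    """Loose check: at least two valid labels separated by dots."""
    labels = host.rstrip(".").split(".")
    return len(labels) >= 2 and all(_LABEL.match(label) for label in labels)

def slaves_file_text(hosts):
    """Return the contents for conf/slaves: one host per line."""
    bad = [h for h in hosts if not looks_like_fqdn(h)]
    if bad:
        raise ValueError("not fully qualified: " + ", ".join(bad))
    return "\n".join(hosts) + "\n"

# Hypothetical worker hostnames.
slaves_text = slaves_file_text(["worker1.example.com", "worker2.example.com"])
```

You would then write `slaves_text` to the “slaves” file in your spark/conf directory. The check does not verify that the names resolve or that SSH works, so you still need to test that by hand.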
After you have set up Spark and updated the slaves file and other configuration options (spark-env.sh, spark-defaults.conf), run
    sh sbin/start-all.sh
This should start all the daemons. To make sure everything’s started, you can check the URL <ip-of-master-node>:8080.
If you want to kill all services that are being run by Spark, use spark/sbin/stop-all.sh; to start them again, use spark/sbin/start-all.sh.
(When you start pyspark, it prints several log statements and the “>>>” prompt may not show up. Just press enter and it will. That is not a bug, it is expected behaviour.)
Let’s get some data. I like food, so how about observing what people say about food? Download the Amazon Fine Foods reviews from Stanford SNAP. The dataset has about 500K reviews of various food items. Get the data on your AWS instance and unpack it by doing
    wget http://snap.stanford.edu/data/finefoods.txt.gz
    gunzip finefoods.txt.gz
When using the EC2 script
Now, in your home directory after logging in, you will see persistent-hdfs and ephemeral-hdfs. persistent-hdfs stores data permanently. ephemeral-hdfs, like the name suggests, stores data for the duration of a cluster session. Once you stop the cluster (stop != terminate), the data in ephemeral-hdfs is lost.
For the purposes of this demo, let’s put the data into ephemeral-hdfs and run some queries on it.
    ephemeral-hdfs/bin/hadoop fs -put finefoods.txt /root/finefoods.txt
To make sure the data is there, you can do
    [root@ip-172-31-26-115 ~]# ephemeral-hdfs/bin/hadoop fs -ls /root
    Found 1 items
    -rw-r--r-- 3 root supergroup 370796484 2015-02-11 00:46 /root/finefoods.txt
    [root@ip-172-31-26-115 ~]# ephemeral-hdfs/bin/hadoop fs -cat /root/finefoods.txt | less
    product/productId: B001E4KFG0
    review/userId: A3SGXH7AUHU8GW
    review/profileName: delmartian
    review/helpfulness: 1/1
    review/score: 5.0
    review/time: 1303862400
    review/summary: Good Quality Dog Food
    review/text: I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than most.
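Each review in the file is a run of “key: value” lines like the ones above. As a small sketch of that layout, the plain-Python function below groups such lines into one dict per review. It assumes that reviews are separated by a blank line; the function name is made up, and the second sample record is invented for the example.

```python
def parse_reviews(lines):
    """Group 'key: value' lines into one dict per review.

    Assumes a blank line marks the end of a review.
    """
    reviews, current = [], {}
    for line in lines:
        line = line.strip()
        if not line:
            if current:
                reviews.append(current)
                current = {}
            continue
        # Split on the first ': ' only, so colons inside the text survive.
        key, sep, value = line.partition(": ")
        if sep:
            current[key] = value
    if current:
        reviews.append(current)
    return reviews

sample = [
    "product/productId: B001E4KFG0",
    "review/score: 5.0",
    "review/summary: Good Quality Dog Food",
    "",
    "product/productId: HYPOTHETICAL01",   # invented record for illustration
    "review/score: 2.0",
    "review/summary: Note: arrived late",
]
reviews = parse_reviews(sample)
```

This also shows why counting lines is not the same as counting reviews: each review spans several lines.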
Before you can find answers to some questions, let’s start the Python shell for Spark (bin/pyspark in your Spark directory).
You will see that the “>>>” prompt doesn’t show up sometimes. That’s okay! Just press enter and it will show up.
Let’s store the file as one RDD.
    >>> foodData = sc.textFile("hdfs://root/finefoods.txt")
You get some log statements and it ends. Note that Spark computes lazily, so when you ran the above statement, the data in foodData didn’t actually get loaded. foodData has an RDD associated with it, which knows how the data would be generated when asked for. To make sure we have the data, let’s ask for the first five lines of foodData with foodData.take(5).
: java.lang.IllegalArgumentException: java.net.UnknownHostException: data
Oops, we get an error that starts by saying this. Oh! Maybe we should add another “/”? No, that’s not true. Well, we put the file on this machine, maybe it will automatically look up hdfs? That’s not true either.
You want to find out what your default FS is. Why? Well, the namenode knows which file exists where. So, if you ask the namenode, can it tell you?
Yes! Find your namenode’s config file (core-site.xml), and then check for “fs.defaultFS”. And then do
    >>> foodData = sc.textFile("hdfs://mynamenode.domain.hadoop:8020/root/finefoods.txt")
    [u'product/productId: B001E4KFG0', u'review/userId: A3SGXH7AUHU8GW', u'review/profileName: delmartian', u'review/helpfulness: 1/1', u'review/score: 5.0']
And it works! So, that’s how you solve that hdfs problem.
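If you would rather not open the config file by hand, the lookup can be sketched in a few lines of plain Python. This is only an illustration: the helper names are made up, the XML below is a cut-down stand-in for a real core-site.xml, and the older key name fs.default.name is checked as well because some clusters still use it.

```python
import xml.etree.ElementTree as ET

def default_fs(core_site_xml):
    """Return the value of fs.defaultFS from the text of a core-site.xml, or None."""
    root = ET.fromstring(core_site_xml.strip())
    for prop in root.iter("property"):
        if prop.findtext("name") in ("fs.defaultFS", "fs.default.name"):
            return (prop.findtext("value") or "").strip()
    return None

def hdfs_uri(default_fs_value, path):
    """Join the default FS with an absolute HDFS path."""
    return default_fs_value.rstrip("/") + "/" + path.lstrip("/")

# Cut-down stand-in for core-site.xml; the hostname is the one used in this post.
core_site = """
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://mynamenode.domain.hadoop:8020</value>
  </property>
</configuration>
"""
uri = hdfs_uri(default_fs(core_site), "/root/finefoods.txt")
```

The resulting string is what you would pass to sc.textFile.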
So you go back to your :8080 URL and you try to see how long this job took and all you see is?
Zilch! You see no running or completed applications. What happened?! Did it not run at all? Is this pyspark just using the local node and not running the workers?
That’s not possible because when we started spark, it printed out the slave nodes where workers are running (and it’s also visible on the :8080 URL) and we know that the node where pyspark is running is just the master. So, what could have gone wrong?
Well, the problem is that the :8080 URL doesn’t have this information. It is part of the :4040 (or :404[1-2]) URL. Go ahead, check your <ip-of-master-node>:404[0-2] URL and you should see the jobs listed there.
Now you see that two jobs were run, along with how long each of them took.
But, you noticed that there were stages as part of jobs and what if you want to find details of that?
If you look in the “sbin” directory, there is a script for a history server. It is not started by default, so go ahead and do
    sh sbin/start-history-server.sh
And now if you go to <ip-of-master-node>:18080, you can find those details (note: when applications are finished, not just jobs. Applications -> jobs -> stages -> tasks).
Let’s find out answers to a few questions.
- How many reviews in total?
Running count() on foodData will run a bunch of jobs and finally tell you that the count is 51106593.
You can cache the RDD (foodData.cache()) so that the next time, Spark wouldn’t have to run a read job over the complete data.
- How many review texts have caviar or foie gras in them?
To get this number, we should first get all the text into one RDD.
    justText = foodData.filter(lambda x: "review/text" in x).filter(lambda x: "caviar" in x or "foie gras" in x)
This first filters the data by running a function over each line (the “lambda” keyword defines that function inline) and keeps the lines that have “review/text” in them. The result then goes through a second level of filtering, which keeps only those texts that have either “caviar” or “foie gras” in them.
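If you want to convince yourself of what the two predicates keep, you can try them on a few lines without a cluster. The sketch below uses Python’s built-in filter instead of Spark, and all sample lines except the first are made up.

```python
def is_review_text(line):
    """First filter: keep only the review body lines."""
    return "review/text" in line

def mentions_delicacy(line):
    """Second filter: keep lines mentioning either delicacy."""
    return "caviar" in line or "foie gras" in line

lines = [
    "review/summary: Good Quality Dog Food",
    "review/text: The caviar was excellent",            # invented
    "review/text: Tasty foie gras, would buy again",    # invented
    "review/text: My Labrador appreciates this product",
    "review/summary: caviar dreams",                    # invented
]

# Same order as the chained calls: first review/text, then the keywords.
just_text = list(filter(mentions_delicacy, filter(is_review_text, lines)))
```

Note that the summary line mentioning caviar is dropped by the first filter, so only the two matching review texts survive.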
This is what Spark calls a “transformation”. When we loaded foodData from a text file, Spark created an RDD. This is a distributed abstraction Spark uses. A resilient distributed dataset (RDD) can be thought of as a set of objects in memory which Spark keeps track of. To be fault tolerant, Spark also keeps track of how this RDD was formed (what computation was used on which previous RDD or file), and this is what is called “lineage”. If a node goes down, the RDD that is lost can be easily regenerated by looking at its lineage.
And when you do multiple operations like above (filtering(filtering(foodData))), how many RDDs are created, including intermediate ones?
How about this one?
    justText = sc.textFile("hdfs://namenode:8020/filename.txt").filter(lambda x: "review/text" in x).filter(lambda x: "caviar" in x or "foie gras" in x)
Spark would see that justText needs filtering of filtering of loading a textFile, so it would set up a DAG (Directed Acyclic Graph) of jobs which would be run one by one. Also, no intermediate output will be written back and this would make it really fast. Finally, the justText RDD would only be computed when you do an “action” on it (e.g count how many elements are in it).
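The idea of transformations being recorded and only run on an action can be shown with a toy class in plain Python. This is not how Spark is implemented: ToyRDD, its methods and the sample lines are all invented for the sketch, and the “lineage” here is just a tuple of filter functions.

```python
import itertools

class ToyRDD:
    """Toy stand-in for an RDD: remembers how to build its data, lazily."""

    def __init__(self, source, lineage=()):
        self._source = source            # zero-argument callable yielding base records
        self._lineage = tuple(lineage)   # filter functions to apply, in order

    def filter(self, predicate):
        # Transformation: only record the step, touch no data.
        return ToyRDD(self._source, self._lineage + (predicate,))

    def _compute(self):
        # Replay the lineage in a single pass, without intermediate lists.
        for record in self._source():
            if all(p(record) for p in self._lineage):
                yield record

    def count(self):
        # Action: this is where the data is actually read.
        return sum(1 for _ in self._compute())

    def take(self, n):
        # Action: read only as many records as needed.
        return list(itertools.islice(self._compute(), n))

reads = {"n": 0}

def load_lines():
    """Pretend file read; counts how often the 'file' is opened."""
    reads["n"] += 1
    return iter([
        "review/summary: caviar dreams",
        "review/text: The caviar was excellent",
        "review/text: Tasty foie gras, would buy again",
        "review/text: Plain crackers",
    ])

just_text = (ToyRDD(load_lines)
             .filter(lambda x: "review/text" in x)
             .filter(lambda x: "caviar" in x or "foie gras" in x))
reads_before_action = reads["n"]   # still 0: nothing has been read yet
matches = just_text.count()        # the action triggers the read
```

Because the object only holds the recipe, a lost result can be rebuilt by replaying the same steps, which is the intuition behind lineage.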
Where to go from here?
With this, you should be comfortable creating a Spark cluster, whether on top of an existing Hadoop cluster or from scratch, and using the shell. The next step would be to write Spark applications, submit them and monitor them. With the right history server URL and the master URL, you are good to go! Have fun spark-ing.