
Working with Akka Actors


Overview

I am going to explain the Akka actor model with a simple example that fetches weather data from Yahoo, using the Akka Scala API.

What is an Actor?

According to the Webster dictionary, an Actor is "one who acts; a doer" or "one that takes part in any affair".

What is Akka?

Akka is a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM.

What is an Actor Model?

Akka has the concept of Actors, which provide an ideal model for thinking about highly concurrent and scalable systems. The actor model is a design pattern for writing concurrent, scalable code that runs on distributed systems.

Why Actor Model?

Multi-threaded programming runs multiple copies of your application code in their own threads and then synchronizes access to any shared objects. Beyond its general complexity, multi-threaded programming has three major pain points:

  • Shared objects – protected by a synchronized block or method, but synchronization is blocking
  • Deadlock – the first thread tries to enter a synchronized block held by the second thread, while the second thread tries to enter a synchronized block held by the first, and both wait forever
  • Scalability – managing threads across multiple JVMs is hard

While the actor model is not new – it has long existed in Haskell and Erlang – the Akka runtime makes it fairly straightforward to write actor-based applications.

Actor Model, Why Now?

A lot has happened in the last 20 years, from the mid-1990s to today – not only is it possible to scale up (at a high price), but we can also scale out.

[Figure: past vs. present – scaling up vs. scaling out]

The actor model allows you to write scalable software applications in a distributed environment without pulling your hair out.

Inside Actor Model

 

[Figure: inside the actor model]

Let’s understand how it works

  1. ActorSystem: the glue that wires Actors, ActorRefs, Dispatchers, and Mailboxes together. Programmers use the ActorSystem to configure the desired implementation of each of these components; it also creates and initializes actors.
  2. Next, the actor-creation process is initiated by the sender (usually an actor itself), which invokes actorOf() on the ActorSystem and receives a reference to the Actor object. At this point the underlying Actor may not yet be created/initialized; the ActorRef acts as a proxy to the Actor object and provides the interface for communicating with it.
  3. The sender Actor (the caller – in this case the actor itself from step 2) uses ! (the bang operator, known as the tell message pattern – "fire-and-forget", i.e. send a message asynchronously and return immediately) to tell the receiving Actor about the event ("Hi Actor, can you please process this event?").
  4. In response, the ActorRef dispatches the event onto a MessageDispatcher (you can configure an actor to use a specific dispatcher via withDispatcher – see the sketch after this list).
  5. The MessageDispatcher enqueues the event onto a MessageQueue.
    1. A MessageQueue is one of the core components that form an Akka Mailbox.
    2. The MessageQueue is where the normal messages sent to Actors are enqueued (and subsequently dequeued).
    3. MessageQueue.enqueue() – enqueues the message to this queue, or throws an exception.
  6. The MessageDispatcher also looks up the Mailbox (by default every actor has a single mailbox – an UnboundedMailbox). The Mailbox holds the messages for the receiving Actor. Once it finds the Mailbox, the MessageDispatcher binds a set of Actors to a thread pool (backed by a BlockingQueue) and invokes the Mailbox.run() method.
  7. MessageQueue.dequeue() – dequeues the next message from this queue, returning null if there is none.
  8. Eventually the Mailbox delivers the message by invoking the receive() method on the Actor.
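
To make steps 2–4 concrete, here is a minimal Scala sketch (the names GreetActor and "my-dispatcher" are hypothetical, made up for illustration, and the dispatcher id would need a matching entry in application.conf; the full weather example follows below):

import akka.actor.{Actor, ActorSystem, Props}

class GreetActor extends Actor {
  override def receive: Receive = {
    case msg => println(s"received: $msg") // step 8: the Mailbox eventually invokes receive()
  }
}

object TellSketch extends App {
  val system = ActorSystem("SketchSystem")                   // step 1: the ActorSystem wires everything together
  // step 2: actorOf() returns an ActorRef (a proxy to the actor); to use a custom
  // dispatcher you could write Props[GreetActor].withDispatcher("my-dispatcher")
  val greeter = system.actorOf(Props[GreetActor], name = "greeter")
  greeter ! "Hi Actor, can you please process this event?"   // step 3: tell – fire-and-forget
  system.shutdown()                                          // Akka 2.3.x shutdown, as in the example below
}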

Example

Assuming you are working on Linux and have Scala + SBT configured and ready to use.

build.sbt 

name := "ScalaWorld"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.4"
libraryDependencies += "org.scala-lang.modules" % "scala-xml_2.11" % "1.0.5"

RunActors.scala

import akka.actor.{Actor, ActorSystem, Props}
import scala.io.Source
import scala.xml._

class WeatherActor extends Actor {
  override def receive: Receive = {
    case msg => println(msg);
  }
}

object RunActors{
  def main(args: Array[String]) {
    val system = ActorSystem("WeatherSystem")
    val weatherActor = system.actorOf(Props[WeatherActor], name = "weatheractor")
    val start = System.nanoTime
    for (id <- 4118 to  4128) {
      weatherActor !  getWeatherInfo(id.toString())
    }
    val end = System.nanoTime
    println("Time : " + (end - start) / 1000000000.0)
    // ActorSystem starts the ThreadPool, so we want to make sure we terminated the ActorSystem when we are done.
    system.shutdown()
  }

def getWeatherInfo(id: String) = {
    val url = "http://weather.yahooapis.com/forecastrss?w=" + id + "&u=c"
    val response = Source.fromURL(url).mkString
    val xmlResponse = XML.loadString(response)
    //Return tuple of 2 values (Tuple2).
    val returnVal = (xmlResponse \\ "location" \\ "@city",
      xmlResponse \\ "condition" \\ "@temp")
    returnVal
  }
}

SBT

cd into your project directory and type sbt

/home/hardik/Downloads/ScalaWorld> sbt
[info] Loading project definition from /home/hardik/Downloads/ScalaWorld/project
[info] Set current project to ScalaWorld (in build file:/home/hardik/Downloads/ScalaWorld/)
> run-main RunActors
[info] Updating {file:/home/hardik/Downloads/ScalaWorld}scalaworld…
[info] Resolving jline#jline;2.12.1 …
[info] Done updating.
[info] Compiling 1 Scala source to /home/hardik/Downloads/ScalaWorld/target/scala-2.11/classes…
[info] Running RunActors

Output: I am querying the Yahoo Weather web service API, passing WOEIDs (Where On Earth IDs) from 4118 to 4128 and fetching the city and temperature. As you can see, I executed getWeatherInfo() both with and without actors, and you can see the difference in execution time.
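
For reference, the "without actors" timing below comes from a plain synchronous loop; here is a minimal sketch of what that baseline might look like, assuming the same getWeatherInfo helper from RunActors.scala:

// Hypothetical baseline: call getWeatherInfo sequentially, with no ActorSystem involved.
val start = System.nanoTime
for (id <- 4118 to 4128) {
  println(getWeatherInfo(id.toString())) // blocks on each HTTP call in turn
}
println("Time : " + (System.nanoTime - start) / 1000000000.0)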

I have only scratched the surface of Akka actors – there is a lot more to cover, which I will do in the next article.

Normal execution (without the actor system, invoking getWeatherInfo(id.toString()))
============================ 
(Toronto,8)
(Halton Hills,8)
(,)
(Milton,8)
(Oakville,8)
(,)
(Burlington,8)
(Hamilton,8)
(St. Catharines,8)
(Welland,8)
(Dunnville,8)
Time : 3.712243053
Execution With Actors
=============================
(Toronto,8)
(Halton Hills,8)
(,)
(Milton,8)
(Oakville,8)
(,)
(Burlington,8)
(Hamilton,8)
(St. Catharines,8)
(Welland,8)
(Dunnville,8)
Time : 1.359151664




Writing Custom Hive UDF and UDAF


In this article, I am going to walk through easy-to-follow examples and show how to create a Hive UDF and UDAF, package them in a JAR, and test them in the Hive CLI. So let's begin.

In the sqoop import article I imported the customers table; similarly, I have imported the orders table, which I used in the Hive joins article. I am also using a dummy table for UDF verification. You can find the relevant sqoop commands on GitHub.

Hive supports a lot of built-in SQL-like functions in HiveQL. But just in case you need to write your own UDF, no one is stopping you.

UDF (User Defined Function)

Here I am going to show how to write a simple trim-like function called Strip – of course you can write something fancier, but my goal here is to give you something you can take away in a short amount of time. So, let's begin.

How to write UDF function in Hive?

  1. Create a Java class for the User Defined Function which extends org.apache.hadoop.hive.ql.exec.UDF and implements one or more evaluate() methods with your desired logic, and you are almost there.
  2. Package your Java class into JAR file (I am using maven)
  3. Go to Hive CLI – ADD your JAR, verify your JARs in Hive CLI classpath
  4. CREATE TEMPORARY FUNCTION in hive which points to your Java class
  5. Use it in Hive SQL and have fun!

There are better ways to do this by writing your own GenericUDF to deal with non-primitive types like arrays and maps, but I am not going to cover that in this article.

I will go into detail for each one.

Create Java class for User Defined Function 

As you can see below, I am calling my Java class Strip – you can call it anything – but the important point is that it extends UDF and provides two evaluate() implementations.

evaluate(Text str, String stripChars) - trims the characters listed in stripChars from both ends of the first argument str.
evaluate(Text str) - trims leading and trailing spaces.

package org.hardik.letsdobigdata;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class Strip extends UDF {

    private Text result = new Text();

    public Text evaluate(Text str, String stripChars) {
        if (str == null) {
            return null;
        }
        result.set(StringUtils.strip(str.toString(), stripChars));
        return result;
    }

    public Text evaluate(Text str) {
        if (str == null) {
            return null;
        }
        result.set(StringUtils.strip(str.toString()));
        return result;
    }
}


Package your Java class into JAR

There is a pom.xml attached on GitHub; please make sure you have Maven installed. If you are working with the GitHub clone, go to the shell:

$ cd HiveUDFs

and run mvn clean package – this will create a JAR file which contains our UDF class. Copy the JAR path.

Go to Hive CLI and ADD UDF JAR 

hive> ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
Added [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar] to class path

Added resources: [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar]

Verify JAR is in Hive CLI Classpath

You should see your jar in the list.

hive> list jars;
/usr/lib/hive/lib/hive-contrib.jar
/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar

Create Temporary Function

It does not have to be a temporary function – you can create a permanent function – but just to keep things moving, go ahead and create a temporary function.

You may want to add the ADD JAR and CREATE TEMPORARY FUNCTION statements to your .hiverc file so they execute at the beginning of each Hive session.
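
For example, a ~/.hiverc along these lines (using the JAR path from this walkthrough) would register the function at the start of every session:

ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION STRIP AS 'org.hardik.letsdobigdata.Strip';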

UDF Output

The first query strips 'ha' from the string 'hadoop', as expected (the two-argument evaluate() in the code).

The second query strips leading and trailing spaces, as expected.

hive> CREATE TEMPORARY FUNCTION STRIP AS 'org.hardik.letsdobigdata.Strip';
 hive> select strip('hadoop','ha') from dummy;
 OK
 doop
 Time taken: 0.131 seconds, Fetched: 1 row(s)
 hive> select strip('   hiveUDF ') from dummy;
 OK
 hiveUDF

If you have made it this far, congratulations – that was our UDF in action! You can follow the code on GitHub.

UDAF (User Defined Aggregated Function)

Now, equipped with our UDF knowledge, we will move to the next step. When we say aggregation, COUNT, AVG, SUM, MIN, and MAX come to mind.

I am picking a very simple aggregation function, AVG/MEAN, and working with the "orders" table imported using sqoop; once you import it into Hive it will look like below (or you can use LOAD DATA INPATH – totally up to you).

hive> select * from orders;
 OK
 orders.order_id orders.order_date orders.customer_id orders.amount
 101 2016-01-01 7 3540
 102 2016-03-01 1 240
 103 2016-03-02 6 2340
 104 2016-02-12 3 5000
 105 2016-02-12 3 5500
 106 2016-02-14 9 3005
 107 2016-02-14 1 20
 108 2016-02-29 2 2000
 109 2016-02-29 3 2500
 110 2016-02-27 1 200

Goal of the UDAF: find the average order amount for each customer in the orders table.

We are looking for Query: 
SELECT CUSTOMER_ID, AVG(AMOUNT) FROM ORDERS GROUP BY CUSTOMER_ID;

I am going to replace the AVG function with our own "MEAN" function.

But before I begin, let's stop and think, since we are living in the MapReduce world. One of the bottlenecks you want to avoid is moving too much data from the map to the reduce phase.

An aggregate function is more difficult to write than a regular UDF. Values are aggregated in chunks (across many mappers or reducers), so the implementation has to be capable of combining partial aggregations into final results.
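
To see why the partial state has to carry a (sum, count) pair rather than a partial mean, take two made-up map-side partials: partial A with sum = 10 over 4 rows and partial B with sum = 20 over 1 row. Merging the states gives sum = 30 and count = 5, so the correct mean is 6; naively averaging the two partial means (2.5 and 20) would give 11.25, which is wrong. This is exactly the state the merge() method below accumulates.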

At a high level, there are two parts to implementing a Generic UDAF:

  1. evaluator – the evaluator class actually implements the UDAF logic.
  2. resolver – handles type checking and operator overloading (if you want it), and helps Hive find the correct evaluator class for a given set of argument types.

We are not creating a GenericUDAF – we are creating a one-off aggregate function, so we do not have to worry about the resolver. I am planning to write about GenericUDF/GenericUDAF, though – maybe some other day, but soon. 🙂

How to write UDAF?

  1. Create a Java class which extends org.apache.hadoop.hive.ql.exec.UDAF
  2. Create an inner class which implements UDAFEvaluator
  3. Implement five methods:
    1. init() – initializes the evaluator and resets its internal state. We use new Column() in the code below to indicate that no values have been aggregated yet.
    2. iterate() – this method is called every time there is a new value to be aggregated. The evaluator should update its internal state with the result of performing the aggregation (we are computing a sum – see below). We return true to indicate that the input was valid.
    3. terminatePartial() – this method is called when Hive wants a result for the partial aggregation. The method must return an object that encapsulates the state of the aggregation.
    4. merge() – this method is called when Hive decides to combine one partial aggregation with another.
    5. terminate() – this method is called when the final result of the aggregation is needed.
  4. Compile and Package JAR
  5. CREATE TEMPORARY FUNCTION in hive CLI
  6. Run Aggregation Query – Verify Output!!!

MeanUDAF.java

package org.hardik.letsdobigdata;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;

@Description(name = "Mean", value = "_FUNC_(double) - computes mean", extended = "select col1, MeanFunc(value) from table group by col1;")
public class MeanUDAF extends UDAF {

    // Define logging
    static final Log LOG = LogFactory.getLog(MeanUDAF.class.getName());

    public static class MeanUDAFEvaluator implements UDAFEvaluator {

        /**
         * Use the Column class to serialize the intermediate computation:
         * the running sum and count for a group.
         */
        public static class Column {
            double sum = 0;
            int count = 0;
        }

        private Column col = null;

        public MeanUDAFEvaluator() {
            super();
            init();
        }

        // A - Initialize the evaluator, indicating that no values have been
        // aggregated yet.
        public void init() {
            LOG.debug("Initialize evaluator");
            col = new Column();
        }

        // B - Iterate every time there is a new value to be aggregated.
        public boolean iterate(double value) throws HiveException {
            LOG.debug("Iterating over each value for aggregation");
            if (col == null)
                throw new HiveException("Item is not initialized");
            col.sum = col.sum + value;
            col.count = col.count + 1;
            return true;
        }

        // C - Called when Hive wants partially aggregated results.
        public Column terminatePartial() {
            LOG.debug("Return partially aggregated results");
            return col;
        }

        // D - Called when Hive decides to combine one partial aggregation with another.
        public boolean merge(Column other) {
            LOG.debug("Merging by combining partial aggregation");
            if (other == null) {
                return true;
            }
            col.sum += other.sum;
            col.count += other.count;
            return true;
        }

        // E - Called when the final result of the aggregation is needed.
        public double terminate() {
            LOG.debug("At the end of the last record of the group - returning final result");
            return col.sum / col.count;
        }
    }
}

[Figure: UDAF aggregation phases]

Package and ADD JAR

hive> ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
Added [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar] to class path
Added resources: [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar]

CREATE FUNCTION in HIVE

hive> CREATE TEMPORARY FUNCTION MeanFunc AS 'org.hardik.letsdobigdata.MeanUDAF';
OK

Verify Output

Execute the group-by query below; our function is called MeanFunc. (Note that the first attempt fails with "Table not found" because I had not yet switched to the sqoop_workspace database.)

 hive> select customer_id, MeanFunc(amount) from orders group by customer_id;
FAILED: SemanticException [Error 10001]: Line 1:42 Table not found 'orders'
hive> use sqoop_workspace;
OK
Time taken: 0.247 seconds
hive> select customer_id, MeanFunc(amount) from orders group by customer_id;
Query ID = cloudera_20160302030202_fb24b7c1-4227-4640-afb9-4ccd29bd735f
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1456782715090_0020, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1456782715090_0020/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1456782715090_0020
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2016-03-02 03:03:16,703 Stage-1 map = 0%,  reduce = 0%
2016-03-02 03:03:53,241 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 3.31 sec
2016-03-02 03:03:55,593 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.9 sec
2016-03-02 03:04:09,201 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.18 sec
MapReduce Total cumulative CPU time: 6 seconds 180 msec
Ended Job = job_1456782715090_0020
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 2  Reduce: 1   Cumulative CPU: 6.18 sec   HDFS Read: 12524 HDFS Write: 77 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 180 msec
OK
1    153.33333333333334
2    2000.0
3    4333.333333333333
6    2340.0
7    3540.0
9    3005.0
Time taken: 72.172 seconds, Fetched: 6 row(s)

Verify individual customer_ids: as you can see, the group-by values match – you can cross-check manually. Thanks for your time and for reading my blog – hope this is helpful!

hive> select * from orders where customer_id = 1;
OK
102    2016-03-01    1    240
107    2016-02-14    1    20
110    2016-02-27    1    200
Time taken: 0.32 seconds, Fetched: 3 row(s)
hive> select * from orders where customer_id = 2;
OK
108    2016-02-29    2    2000
Time taken: 0.191 seconds, Fetched: 1 row(s)
hive> select * from orders where customer_id = 3;
OK
104    2016-02-12    3    5000
105    2016-02-12    3    5500
109    2016-02-29    3    2500
Time taken: 0.093 seconds, Fetched: 3 row(s)

Hive Joins


Covering the basics of joins in hive.

[Figure: Hive join types]

We will be working with the two tables, customers and orders, that we imported in the sqoop import article, and we are going to perform the following joins.

  1. INNER JOIN – selects records that have matching values in both tables.
  2. LEFT JOIN (LEFT OUTER JOIN) – returns all the values from the left table, plus the matched values from the right table, or NULL where there is no matching join predicate.
  3. RIGHT JOIN (RIGHT OUTER JOIN) – returns all the values from the right table, plus the matched values from the left table, or NULL where there is no matching join predicate.
  4. FULL JOIN (FULL OUTER JOIN) – selects all records that match either the left or right table records.
  5. LEFT SEMI JOIN – only returns records from the left-hand table. Hive doesn't support IN subqueries, so you can't do:

SELECT * FROM TABLE_A WHERE TABLE_A.ID IN (SELECT ID FROM TABLE_B);
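
The LEFT SEMI JOIN equivalent of that query would look roughly like this (a sketch using the same placeholder table names; the working example against our tables appears in the LEFT SEMI JOIN section below):

SELECT TABLE_A.* FROM TABLE_A LEFT SEMI JOIN TABLE_B ON (TABLE_A.ID = TABLE_B.ID);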

Customer Table

Hive Tip: to print column headers in command line

hive> set hive.cli.print.header=true;
hive> select * from customers;
 OK
 customers.id customers.name
 1 John
 2 Kevin
 19 Alex
 3 Mark
 4 Jenna
 5 Robert
 6 Zoya
 7 Sam
 8 George
 9 Peter

Orders Table:

hive> select * from orders;
 OK
 order_id orders.order_date orders.customer_id orders.amount
 101 2016-01-01 7 3540
 102 2016-03-01 1 240
 103 2016-03-02 6 2340
 104 2016-02-12 3 5000
 105 2016-02-12 3 5500
 106 2016-02-14 9 3005
 107 2016-02-14 1 20
 108 2016-02-29 2 2000
 109 2016-02-29 3 2500
 110 2016-02-27 1 200

 

INNER JOIN

Select records that have matching values in both tables.

hive> select c.id, c.name, o.order_date, o.amount from customers c inner join orders o ON (c.id = o.customer_id);
Output
c.id c.name o.order_date o.amount
 7 Sam 2016-01-01 3540
 1 John 2016-03-01 240
 6 Zoya 2016-03-02 2340
 3 Mark 2016-02-12 5000
 3 Mark 2016-02-12 5500
 9 Peter 2016-02-14 3005
 1 John 2016-02-14 20
 2 Kevin 2016-02-29 2000
 3 Mark 2016-02-29 2500
 1 John 2016-02-27 200

LEFT JOIN (LEFT OUTER JOIN)

Returns all the values from the left table, plus the matched values from the right table, or NULL in case of no matching join predicate

hive> select c.id, c.name, o.order_date, o.amount from customers c left outer join orders o ON (c.id = o.customer_id);
Output
 c.id    c.name    o.order_date    o.amount
 1    John    2016-03-01    240
 1    John    2016-02-14    20
 1    John    2016-02-27    200
 2    Kevin    2016-02-29    2000
 19    Alex    NULL    NULL
 3    Mark    2016-02-12    5000
 3    Mark    2016-02-12    5500
 3    Mark    2016-02-29    2500
 4    Jenna    NULL    NULL
 5    Robert    NULL    NULL
 6    Zoya    2016-03-02    2340
 7    Sam    2016-01-01    3540
 8    George    NULL    NULL
 9    Peter    2016-02-14    3005
 Time taken: 40.462 seconds, Fetched: 14 row(s)

RIGHT JOIN (RIGHT OUTER JOIN)

Returns all the values from the right table, plus the matched values from the left table, or NULL in case of no matching join predicate

hive> select c.id, c.name, o.order_date, o.amount from customers c right outer join orders o ON (c.id = o.customer_id);
Output
 c.id    c.name    o.order_date    o.amount
 7    Sam    2016-01-01    3540
 1    John    2016-03-01    240
 6    Zoya    2016-03-02    2340
 3    Mark    2016-02-12    5000
 3    Mark    2016-02-12    5500
 9    Peter    2016-02-14    3005
 1    John    2016-02-14    20
 2    Kevin    2016-02-29    2000
 3    Mark    2016-02-29    2500
 1    John    2016-02-27    200

FULL JOIN (FULL OUTER JOIN)

Selects all records that match either left or right table records.

hive> select c.id, c.name, o.order_date, o.amount from customers c full outer join orders o ON (c.id = o.customer_id);

Output

c.id c.name o.order_date o.amount
 1 John 2016-02-27 200
 1 John 2016-02-14 20
 1 John 2016-03-01 240
 19 Alex NULL NULL
 2 Kevin 2016-02-29 2000
 3 Mark 2016-02-29 2500
 3 Mark 2016-02-12 5500
 3 Mark 2016-02-12 5000
 4 Jenna NULL NULL
 5 Robert NULL NULL
 6 Zoya 2016-03-02 2340
 7 Sam 2016-01-01 3540
 8 George NULL NULL
 9 Peter 2016-02-14 3005

LEFT SEMI JOIN

Find all the customers for whom at least one order exists – in other words, all customers who have placed an order.

[Figure: left semi join]

hive> select *  from customers  left semi join orders  ON 
(customers.id = orders.customer_id);

OUTPUT

customers.id    customers.name
1    John
2    Kevin
3    Mark
6    Zoya
7    Sam
9    Peter
Time taken: 56.362 seconds, Fetched: 6 row(s)

That's it for this article. I hope you find this useful – thank you for reading.

Sqoop: Import Data from MySQL to Hive


[Figure: Sqoop import from MySQL to Hive]

Prerequisite: a Hadoop environment with Sqoop and Hive installed and working. To speed up the work, I am using the Cloudera QuickStart VM (requires 4GB of RAM), although you can also work with the Hortonworks Data Platform (requires 8GB of RAM). Since my laptop has only 8GB of RAM, I prefer to work with the Cloudera VM image.

If you are working with the Cloudera/HDP VM and it is all fired up in VirtualBox, things become easier because many of the Hadoop ecosystem packages come pre-installed (MySQL, Oozie, Hadoop, Hive, ZooKeeper, Storm, Kafka, Spark, etc.).

Create table in mysql

In the Cloudera VM, open a terminal and just make sure MySQL is installed. For me it's:

shell> mysql --version
mysql  Ver 14.14 Distrib 5.1.66, for redhat-linux-gnu (x86_64) using readline 5.

You should always work in your own database, so create a database in MySQL using:

mysql> create database sqoop;

then,

mysql> use sqoop;
mysql> create table customer(id varchar(3), name varchar(20), age varchar(3), salary integer(10));
Query OK, 0 rows affected (0.09 sec)

mysql> desc customer;
+--------+-------------+------+-----+---------+-------+
| Field  | Type        | Null | Key | Default | Extra |
+--------+-------------+------+-----+---------+-------+
| id     | varchar(3)  | YES  |     | NULL    |       |
| name   | varchar(20) | YES  |     | NULL    |       |
| age    | varchar(3)  | YES  |     | NULL    |       |
| salary | int(10)     | YES  |     | NULL    |       |
+--------+-------------+------+-----+---------+-------+

 

mysql> select * from customer;
+------+--------+------+--------+
| id   | name   | age  | salary |
+------+--------+------+--------+
| 1    | John   | 30   |  80000 |
| 2    | Kevin  | 33   |  84000 |
| 3    | Mark   | 28   |  90000 |
| 4    | Jenna  | 34   |  93000 |
| 5    | Robert | 32   | 100000 |
| 6    | Zoya   | 40   |  60000 |
| 7    | Sam    | 37   |  75000 |
| 8    | George | 31   |  67000 |
| 9    | Peter  | 23   |  70000 |
| 19   | Alex   | 26   |  74000 |
+------+--------+------+--------+

Let’s start Sqooping

As you can see, the customer table does not have a primary key. I have added a few records to the customer table. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range.

If the actual values for the primary key are not uniformly distributed across its range, then this can result in unbalanced tasks. You should explicitly choose a different column with the --split-by argument. For example, --split-by id.

Since I want to import this table directly into Hive, I am adding --hive-import to my sqoop command.

sqoop import --connect jdbc:mysql://localhost:3306/sqoop \
  --username root \
  -P \
  --split-by id \
  --columns id,name \
  --table customer \
  --target-dir /user/cloudera/ingest/raw/customers \
  --fields-terminated-by "," \
  --hive-import \
  --create-hive-table \
  --hive-table sqoop_workspace.customers

Here's what the individual sqoop command options mean:

  • connect – the JDBC connection string
  • username – the database username
  • -P – prompts for the password in the console. Alternatively you can use --password, but that is not good practice since the password becomes visible in your job execution logs and is asking for trouble. One way to deal with this is to store the DB password in a file in HDFS and provide it at runtime (see the sketch after this list).
  • table – the table you want to import from MySQL – here it's customer
  • split-by – specifies the splitting column – I am specifying id here
  • target-dir – the HDFS destination directory
  • fields-terminated-by – I have specified a comma (by default it would import data into HDFS as comma-separated values anyway)
  • hive-import – import the table into Hive (uses Hive's default delimiters if none are set)
  • create-hive-table – if set, the job will fail if the Hive table already exists – which works for us in this case
  • hive-table – specify <db_name>.<table_name>; here it's sqoop_workspace.customers, where sqoop_workspace is my database and customers is the table name
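
As a sketch of the password-file approach mentioned above (the paths are hypothetical; --password-file expects a file containing only the password, locked down so only you can read it):

# Hypothetical paths – adjust to your environment.
echo -n "yourpassword" > mysql.password
hdfs dfs -put mysql.password /user/cloudera/mysql.password
hdfs dfs -chmod 400 /user/cloudera/mysql.password

# Then swap -P for --password-file in the import command:
sqoop import --connect jdbc:mysql://localhost:3306/sqoop \
  --username root \
  --password-file /user/cloudera/mysql.password \
  --table customer ...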

As you can see below, the sqoop import runs as a map-reduce job. Notice that I am using -P for the password option. While this works, it can easily be parameterized by reading the password from a file, as sketched above.

sqoop import --connect jdbc:mysql://localhost:3306/sqoop --username root -P --split-by id --columns id,name --table customer  --target-dir /user/cloudera/ingest/raw/customers --fields-terminated-by "," --hive-import --create-hive-table --hive-table sqoop_workspace.customers
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
16/03/01 12:59:44 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.5.0
Enter password: 
16/03/01 12:59:54 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
16/03/01 12:59:54 INFO tool.CodeGenTool: Beginning code generation
16/03/01 12:59:55 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customer` AS t LIMIT 1
16/03/01 12:59:56 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customer` AS t LIMIT 1
16/03/01 12:59:56 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/6471c43b5c867834458d3bf5a67eade2/customer.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
16/03/01 13:00:01 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/6471c43b5c867834458d3bf5a67eade2/customer.jar
16/03/01 13:00:01 WARN manager.MySQLManager: It looks like you are importing from mysql.
16/03/01 13:00:01 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
16/03/01 13:00:01 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
16/03/01 13:00:01 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
16/03/01 13:00:01 INFO mapreduce.ImportJobBase: Beginning import of customer
16/03/01 13:00:01 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
16/03/01 13:00:02 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
16/03/01 13:00:04 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/03/01 13:00:05 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/03/01 13:00:11 INFO db.DBInputFormat: Using read commited transaction isolation
16/03/01 13:00:11 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `customer`
16/03/01 13:00:11 WARN db.TextSplitter: Generating splits for a textual index column.
16/03/01 13:00:11 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
16/03/01 13:00:11 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
16/03/01 13:00:11 INFO mapreduce.JobSubmitter: number of splits:4
16/03/01 13:00:12 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1456782715090_0004
16/03/01 13:00:13 INFO impl.YarnClientImpl: Submitted application application_1456782715090_0004
16/03/01 13:00:13 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1456782715090_0004/
16/03/01 13:00:13 INFO mapreduce.Job: Running job: job_1456782715090_0004
16/03/01 13:00:47 INFO mapreduce.Job: Job job_1456782715090_0004 running in uber mode : false
16/03/01 13:00:48 INFO mapreduce.Job:  map 0% reduce 0%
16/03/01 13:01:43 INFO mapreduce.Job:  map 25% reduce 0%
16/03/01 13:01:46 INFO mapreduce.Job:  map 50% reduce 0%
16/03/01 13:01:48 INFO mapreduce.Job:  map 100% reduce 0%
16/03/01 13:01:48 INFO mapreduce.Job: Job job_1456782715090_0004 completed successfully
16/03/01 13:01:48 INFO mapreduce.Job: Counters: 30
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=548096
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=409
        HDFS: Number of bytes written=77
        HDFS: Number of read operations=16
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=8
    Job Counters 
        Launched map tasks=4
        Other local map tasks=5
        Total time spent by all maps in occupied slots (ms)=216810
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=216810
        Total vcore-seconds taken by all map tasks=216810
        Total megabyte-seconds taken by all map tasks=222013440
    Map-Reduce Framework
        Map input records=10
        Map output records=10
        Input split bytes=409
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=2400
        CPU time spent (ms)=5200
        Physical memory (bytes) snapshot=418557952
        Virtual memory (bytes) snapshot=6027804672
        Total committed heap usage (bytes)=243007488
    File Input Format Counters 
        Bytes Read=0
    File Output Format Counters 
        Bytes Written=77
16/03/01 13:01:48 INFO mapreduce.ImportJobBase: Transferred 77 bytes in 104.1093 seconds (0.7396 bytes/sec)
16/03/01 13:01:48 INFO mapreduce.ImportJobBase: Retrieved 10 records.
16/03/01 13:01:49 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customer` AS t LIMIT 1
16/03/01 13:01:49 INFO hive.HiveImport: Loading uploaded data into Hive

Logging initialized using configuration in jar:file:/usr/jars/hive-common-1.1.0-cdh5.5.0.jar!/hive-log4j.properties
OK
Time taken: 2.163 seconds
Loading data to table sqoop_workspace.customers
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00000': User does not belong to supergroup
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00001': User does not belong to supergroup
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00002': User does not belong to supergroup
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00003': User does not belong to supergroup
Table sqoop_workspace.customers stats: [numFiles=4, totalSize=77]
OK
Time taken: 1.399 seconds

Finally, let's verify the output in Hive:

hive> show databases;
OK
default
sqoop_workspace
Time taken: 0.034 seconds, Fetched: 2 row(s)
hive> use sqoop_workspace;
OK
Time taken: 0.063 seconds
hive> show tables;
OK
customers
Time taken: 0.036 seconds, Fetched: 1 row(s)
hive> show create table customers;
OK
CREATE TABLE `customers`(
  `id` string, 
  `name` string)
COMMENT 'Imported by sqoop on 2016/03/01 13:01:49'
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
  LINES TERMINATED BY '\n' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers'
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='true', 
  'numFiles'='4', 
  'totalSize'='77', 
  'transient_lastDdlTime'='1456866115')
Time taken: 0.26 seconds, Fetched: 18 row(s)

hive> select * from customers;
OK
1    John
2    Kevin
19    Alex
3    Mark
4    Jenna
5    Robert
6    Zoya
7    Sam
8    George
9    Peter
Time taken: 1.123 seconds, Fetched: 10 row(s).

That’s it for now. Hope you find it useful, thanks for your support and reading my blog.

Setting up Hadoop multi-node cluster on Amazon EC2 Part 2


In Part 1 we successfully created, launched, and connected to Amazon Ubuntu instances. In Part 2 I will show how to install and set up a Hadoop cluster. If you are seeing this page for the first time, I would strongly advise you to go over Part 1 first.

In this article

  • HadoopNameNode will be referred to as the master,
  • HadoopSecondaryNameNode will be referred to as the SecondaryNameNode or SNN,
  • HadoopSlave1 and HadoopSlave2 will be referred to as the slaves (where the data nodes will reside).

So, let’s begin.

1. Apache Hadoop Installation and Cluster Setup

1.1 Update the packages and dependencies.

Let's update the packages. I will start with the master; repeat this for SNN and the 2 slaves.

$ sudo apt-get update

Once it's complete, let's install Java.

1.2 Install Java

Add the following PPA and install the latest Oracle Java (JDK) 7 on Ubuntu:

$ sudo add-apt-repository ppa:webupd8team/java

$ sudo apt-get update && sudo apt-get install oracle-jdk7-installer

Check if Ubuntu uses JDK 7

[Screenshot: java -version output]

Repeat this for SNN and the 2 slaves.

 1.3 Download Hadoop

I am going to use the Hadoop 1.2.1 stable version from the Apache download page, and here is the 1.2.1 mirror.

Issue the wget command from the shell:

$ wget http://apache.mirror.gtcomm.net/hadoop/common/hadoop-1.2.1/hadoop-1.2.1.tar.gz

[Screenshot: wget download output]

Unzip the files and review the package content and configuration files.

$ tar -xzvf hadoop-1.2.1.tar.gz

[Screenshot: directory listing]

For simplicity, rename the ‘hadoop-1.2.1’ directory to ‘hadoop’ for ease of operation and maintenance.

$ mv hadoop-1.2.1 hadoop

[Screenshot: directory listing after rename]

1.4 Setup Environment Variable

Setup Environment Variable for ‘ubuntu’ user

Update the .bashrc file to add important Hadoop paths and directories.

Navigate to home directory

$cd

Open the .bashrc file in the vi editor:

$ vi .bashrc

Add the following at the end of the file:

export HADOOP_CONF=/home/ubuntu/hadoop/conf

export HADOOP_PREFIX=/home/ubuntu/hadoop

#Set JAVA_HOME

export JAVA_HOME=/usr/lib/jvm/java-7-oracle

# Add Hadoop bin/ directory to path
export PATH=$PATH:$HADOOP_PREFIX/bin

Save and Exit.

To check whether it has been updated correctly, reload the bash profile using the following commands:

source ~/.bashrc
echo $HADOOP_PREFIX
echo $HADOOP_CONF

Repeat 1.3 and 1.4 for the remaining 3 machines (SNN and the 2 slaves).

1.5 Setup Password-less SSH on Servers

The master server remotely starts services on the slave nodes, which requires password-less access to the slave servers. The AWS Ubuntu server comes with a pre-installed OpenSSH server.
Quick Note: the public part of the key loaded into the agent must be put on the target system in ~/.ssh/authorized_keys. This has been taken care of by the AWS server creation process.

Now we need to add the AWS EC2 key pair identity hadoopec2cluster.pem to the SSH profile. In order to do that we will need the following SSH utilities:

  • 'ssh-agent' is a background program that handles passwords for SSH private keys.
  • 'ssh-add' prompts the user for a private key password and adds it to the list maintained by ssh-agent. Once you add a password to ssh-agent, you will not be asked to provide the key when using SSH or SCP to connect to hosts with your public key.

The Amazon EC2 instance has already taken care of 'authorized_keys' on the master server; execute the following commands to allow password-less SSH access to the slave servers.

First of all we need to protect our keypair files; if the file permissions are too open (see below) you will get an error.

[Screenshot: SSH "permissions are too open" error]

To fix this problem, we need to issue the following commands:

$ chmod 644 authorized_keys

Quick Tip: If you set the permissions to ‘chmod 644’, you get a file that can be written by you, but can only be read by the rest of the world.

$ chmod 400 hadoopec2cluster.pem

Quick Tip: chmod 400 is a very restrictive setting, giving only the file owner read-only access – no write/execute capabilities for the owner, and no permissions whatsoever for anyone else.

To use ssh-agent and ssh-add, follow the steps below:

  1. At the Unix prompt, enter: eval `ssh-agent`. Note: make sure you use the backquote ( ` ), located under the tilde ( ~ ), rather than the single quote ( ' ).
  2. Enter the command: ssh-add hadoopec2cluster.pem

If you notice, the .pem file has "read-only" permission now, and this time it works for us.

[Screenshot: ssh-add success]

Keep in mind the ssh-agent session will be lost when you exit the shell, and you will have to repeat the ssh-agent and ssh-add commands.

Remote SSH

Let's verify that we can connect to the SNN and slave nodes from the master:

[Screenshot: remote SSH connection]

$ ssh ubuntu@<your-amazon-ec2-public URL>

On successful login, the IP address shown in the shell prompt will change.

1.6 Hadoop Cluster Setup

This section will cover the Hadoop cluster configuration. We will have to modify:

  • hadoop-env.sh – this file contains some environment variable settings used by Hadoop. You can use these to affect some aspects of Hadoop daemon behavior, such as where log files are stored, the maximum amount of heap used, etc. The only variable you should need to change in this file at this point is JAVA_HOME, which specifies the path to the Java 1.7.x installation used by Hadoop.
  • core-site.xml – key property fs.default.name – the NameNode configuration, e.g. hdfs://namenode/
  • hdfs-site.xml – key property dfs.replication – 3 by default
  • mapred-site.xml – key property mapred.job.tracker – the JobTracker configuration, e.g. jobtracker:8021

We will first start with the master (NameNode) and then copy the above XML changes to the remaining 3 nodes (SNN and the slaves).

Finally, in section 1.6.2 we will have to configure conf/masters and conf/slaves.

  • masters – defines on which machines Hadoop will start secondary NameNodes in our multi-node cluster.
  • slaves –  defines the lists of hosts, one per line, where the Hadoop slave daemons (datanodes and tasktrackers) will run.

Let's go over them one by one, starting with the master (NameNode).

hadoop-env.sh

$ vi $HADOOP_CONF/hadoop-env.sh – add JAVA_HOME as shown below and save the changes.

[Screenshot: hadoop-env.sh with JAVA_HOME set]

core-site.xml

This file contains configuration settings for Hadoop Core (e.g. I/O) that are common to HDFS and MapReduce. The default file system configuration property, fs.default.name, goes here; it could be, for example, an hdfs or s3 URI, and it will be used by clients.

$ vi $HADOOP_CONF/core-site.xml

We are going to add two properties:

  • fs.default.name – points to the NameNode URL and port (usually 8020)
  • hadoop.tmp.dir – a base for other temporary directories. It is important to note that every node needs a Hadoop tmp directory. I am going to create a new directory "hdfstmp" as below on all 4 nodes (see the sketch after the mkdir commands below – ideally you would write a shell script to do this for you, but for now we go the manual way).

$ cd

$ mkdir hdfstmp
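
A rough sketch of how that could be scripted from the master instead, assuming ssh-agent still holds the key from section 1.5 (the hostnames are placeholders – substitute your own public DNS names):

# Hypothetical hostnames – replace with your SNN and slave public DNS names.
for node in <snn-public-dns> <slave1-public-dns> <slave2-public-dns>; do
  ssh ubuntu@$node "mkdir -p /home/ubuntu/hdfstmp"
done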

Quick Tip: some of the important directories are dfs.name.dir and dfs.data.dir in hdfs-site.xml. The default value of dfs.name.dir is ${hadoop.tmp.dir}/dfs/name and of dfs.data.dir is ${hadoop.tmp.dir}/dfs/data. It is critical that you choose your directory locations wisely in a production environment.

<configuration>

<property>
<name>fs.default.name</name>
<value>hdfs://ec2-54-209-221-112.compute-1.amazonaws.com:8020</value>
</property>

<property>
<name>hadoop.tmp.dir</name>
<value>/home/ubuntu/hdfstmp</value>
</property>

</configuration>

hdfs-site.xml

This file contains the configuration for HDFS daemons, the NameNode, SecondaryNameNode  and data nodes.

We are going to add 2 properties

  • dfs.permissions with the value false. This means that any user, not just the "hdfs" user, can do anything they want to HDFS, so do not do this in production unless you have a very good reason. If "true", permission checking in HDFS is enabled; if "false", permission checking is turned off, but all other behavior is unchanged. Switching from one parameter value to the other does not change the mode, owner, or group of files or directories. Be very careful before you set this.
  • dfs.replication – the default block replication is 3. The actual number of replications can be specified when the file is created; the default is used if replication is not specified at create time. Since we have 2 slave nodes, we will set this value to 2.

<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>


mapred-site.xml

This file contains the configuration settings for the MapReduce daemons: the JobTracker and the TaskTrackers.
The mapred.job.tracker parameter is a hostname (or IP address) and port pair on which the JobTracker listens for RPC communication. This parameter specifies the location of the JobTracker for the TaskTrackers and MapReduce clients.

JobTracker will be running on master (NameNode)

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>hdfs://ec2-54-209-221-112.compute-1.amazonaws.com:8021</value>
</property>
</configuration>

1.6.1 Move configuration files to Slaves

Now that we are done with the Hadoop XML configuration on the master, let's copy the files to the remaining 3 nodes using secure copy (scp).

Start with SNN. If you are starting a new session, run ssh-add as per section 1.5.

From the master's Unix shell, issue the command below:

$ scp hadoop-env.sh core-site.xml hdfs-site.xml mapred-site.xml ubuntu@ec2-54-209-221-47.compute-1.amazonaws.com:/home/ubuntu/hadoop/conf

Repeat this for the slave nodes.

[Screenshot: scp output]

1.6.2 Configure Master and Slaves

Every Hadoop distribution comes with masters and slaves files. By default each contains one entry for localhost; we have to modify these 2 files on both the "masters" (HadoopNameNode) and the "slaves" (HadoopSlave1 and HadoopSlave2) machines – we have a dedicated machine for HadoopSecondaryNameNode.

[Screenshots: default masters and slaves files]

1.6.3 Modify masters file on Master machine

The conf/masters file defines the machines on which Hadoop will start SecondaryNameNodes in our multi-node cluster. In our case, these are two machines: HadoopNameNode and HadoopSecondaryNameNode.

Hadoop HDFS user guide : “The secondary NameNode merges the fsimage and the edits log files periodically and keeps edits log size within a limit. It is usually run on a different machine than the primary NameNode since its memory requirements are on the same order as the primary NameNode. The secondary NameNode is started by “bin/start-dfs.sh“ on the nodes specified in “conf/masters“ file.

$ vi $HADOOP_CONF/masters and provide an entry for each hostname where you want to run the SecondaryNameNode daemon – in our case HadoopNameNode and HadoopSecondaryNameNode.

[Screenshot: masters file entries]

1.6.4 Modify the slaves file on master machine

The slaves file is used for starting DataNodes and TaskTrackers

$ vi $HADOOP_CONF/slaves

[Screenshot: slaves file entries]
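
As a rough sketch, assuming the public DNS names used elsewhere in this walkthrough (substitute your own), the two files on the master would each contain one hostname per line:

# $HADOOP_CONF/masters on HadoopNameNode
ec2-54-209-221-112.compute-1.amazonaws.com
ec2-54-209-221-47.compute-1.amazonaws.com

# $HADOOP_CONF/slaves on HadoopNameNode
ec2-54-209-223-7.compute-1.amazonaws.com
ec2-54-209-219-2.compute-1.amazonaws.com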

1.6.5 Copy masters and slaves to SecondaryNameNode

Since the SecondaryNameNode configuration will be the same as the NameNode's, we need to copy the masters and slaves files to HadoopSecondaryNameNode.

[Screenshot: scp of masters and slaves files]

1.6.7 Configure master and slaves on “Slaves” node

Since we are configuring slaves (HadoopSlave1 & HadoopSlave2), the masters file on a slave machine is going to be empty.

$ vi $HADOOP_CONF/masters

[Screenshot: empty masters file on a slave]

Next, update the 'slaves' file on the slave server (HadoopSlave1) with the IP address of the slave node. Notice that the 'slaves' file on a slave node contains only its own IP address and not that of any other DataNode in the cluster.

$ vi $HADOOP_CONF/slaves

[Screenshot: slaves file on a slave node]

Similarly update masters and slaves for HadoopSlave2

1.7 Hadoop Daemon Startup

The first step in starting up your Hadoop installation is formatting the Hadoop filesystem (HDFS), which is implemented on top of the local filesystems of your cluster. You need to do this the first time you set up a Hadoop installation. Do not format a running Hadoop filesystem – this will cause all your data to be erased.

To format the namenode

$ hadoop namenode -format

[Screenshot: namenode format output]

Let's start all Hadoop daemons from HadoopNameNode:

$ cd $HADOOP_CONF

$ start-all.sh

This will start

  • NameNode, JobTracker and SecondaryNameNode daemons on HadoopNameNode

[Screenshot: start-all.sh output]

  • SecondaryNameNode daemons on HadoopSecondaryNameNode

[Screenshot: SecondaryNameNode daemon on HadoopSecondaryNameNode]

  • and DataNode and TaskTracker daemons on slave nodes HadoopSlave1 and HadoopSlave2

[Screenshots: DataNode and TaskTracker daemons on HadoopSlave1 and HadoopSlave2]

We can check the namenode status from http://ec2-54-209-221-112.compute-1.amazonaws.com:50070/dfshealth.jsp

[Screenshot: NameNode status page]

Check Jobtracker status : http://ec2-54-209-221-112.compute-1.amazonaws.com:50030/jobtracker.jsp

[Screenshot: JobTracker status page]

Slave Node Status for HadoopSlave1 : http://ec2-54-209-223-7.compute-1.amazonaws.com:50060/tasktracker.jsp

[Screenshot: TaskTracker status for HadoopSlave1]

Slave Node Status for HadoopSlave2 : http://ec2-54-209-219-2.compute-1.amazonaws.com:50060/tasktracker.jsp

[Screenshot: TaskTracker status for HadoopSlave2]

To quickly verify our setup, run the hadoop pi example

ubuntu@ec2-54-209-221-112:~/hadoop$ hadoop jar hadoop-examples-1.2.1.jar pi 10 1000000

Number of Maps  = 10
Samples per Map = 1000000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
14/01/13 15:44:12 INFO mapred.FileInputFormat: Total input paths to process : 10
14/01/13 15:44:13 INFO mapred.JobClient: Running job: job_201401131425_0001
14/01/13 15:44:14 INFO mapred.JobClient:  map 0% reduce 0%
14/01/13 15:44:32 INFO mapred.JobClient:  map 20% reduce 0%
14/01/13 15:44:33 INFO mapred.JobClient:  map 40% reduce 0%
14/01/13 15:44:46 INFO mapred.JobClient:  map 60% reduce 0%
14/01/13 15:44:56 INFO mapred.JobClient:  map 80% reduce 0%
14/01/13 15:44:58 INFO mapred.JobClient:  map 100% reduce 20%
14/01/13 15:45:03 INFO mapred.JobClient:  map 100% reduce 33%
14/01/13 15:45:06 INFO mapred.JobClient:  map 100% reduce 100%
14/01/13 15:45:09 INFO mapred.JobClient: Job complete: job_201401131425_0001
14/01/13 15:45:09 INFO mapred.JobClient: Counters: 30
14/01/13 15:45:09 INFO mapred.JobClient:   Job Counters
14/01/13 15:45:09 INFO mapred.JobClient:     Launched reduce tasks=1
14/01/13 15:45:09 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=145601
14/01/13 15:45:09 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/01/13 15:45:09 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/01/13 15:45:09 INFO mapred.JobClient:     Launched map tasks=10
14/01/13 15:45:09 INFO mapred.JobClient:     Data-local map tasks=10
14/01/13 15:45:09 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=33926
14/01/13 15:45:09 INFO mapred.JobClient:   File Input Format Counters
14/01/13 15:45:09 INFO mapred.JobClient:     Bytes Read=1180
14/01/13 15:45:09 INFO mapred.JobClient:   File Output Format Counters
14/01/13 15:45:09 INFO mapred.JobClient:     Bytes Written=97
14/01/13 15:45:09 INFO mapred.JobClient:   FileSystemCounters
14/01/13 15:45:09 INFO mapred.JobClient:     FILE_BYTES_READ=226
14/01/13 15:45:09 INFO mapred.JobClient:     HDFS_BYTES_READ=2740
14/01/13 15:45:09 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=622606
14/01/13 15:45:09 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=215
14/01/13 15:45:09 INFO mapred.JobClient:   Map-Reduce Framework
14/01/13 15:45:09 INFO mapred.JobClient:     Map output materialized bytes=280
14/01/13 15:45:09 INFO mapred.JobClient:     Map input records=10
14/01/13 15:45:09 INFO mapred.JobClient:     Reduce shuffle bytes=280
14/01/13 15:45:09 INFO mapred.JobClient:     Spilled Records=40
14/01/13 15:45:09 INFO mapred.JobClient:     Map output bytes=180
14/01/13 15:45:09 INFO mapred.JobClient:     Total committed heap usage (bytes)=2039111680
14/01/13 15:45:09 INFO mapred.JobClient:     CPU time spent (ms)=9110
14/01/13 15:45:09 INFO mapred.JobClient:     Map input bytes=240
14/01/13 15:45:09 INFO mapred.JobClient:     SPLIT_RAW_BYTES=1560
14/01/13 15:45:09 INFO mapred.JobClient:     Combine input records=0
14/01/13 15:45:09 INFO mapred.JobClient:     Reduce input records=20
14/01/13 15:45:09 INFO mapred.JobClient:     Reduce input groups=20
14/01/13 15:45:09 INFO mapred.JobClient:     Combine output records=0
14/01/13 15:45:09 INFO mapred.JobClient:     Physical memory (bytes) snapshot=1788379136
14/01/13 15:45:09 INFO mapred.JobClient:     Reduce output records=0
14/01/13 15:45:09 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=10679681024
14/01/13 15:45:09 INFO mapred.JobClient:     Map output records=20
Job Finished in 57.825 seconds
Estimated value of Pi is 3.14158440000000000000
You can check the JobTracker status page to look at the complete job status.

[Screenshot: completed job on the JobTracker status page]

Drill down into the completed job and you can see more details on the map and reduce tasks.

[Screenshot: detailed job output]

Lastly, do not forget to terminate your Amazon EC2 instances, or you will continue to get charged.

[Screenshot: terminating EC2 instances]

That's it for this article, hope you find it useful.

Happy Hadoop Year!