Author: Hardik

I am a software developer with a keen interest in distributed computing, big data, machine learning, and data science.

Working with Akka Actors


Overview

I am going to explain the Akka actor model with a simple example that fetches weather data from Yahoo, using the Akka Scala API.

What is an Actor?

According to the Webster dictionary, an Actor is "one who acts – a doer" or "one that takes part in any affair".

What is Akka?

Akka is a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM.

What is an Actor Model?

Akka has the concept of Actors, which provide an ideal model for thinking about highly concurrent and scalable systems. The actor model is a design pattern for writing concurrent and scalable code that runs on distributed systems.

Why Actor Model?

Multi-threaded programming runs multiple copies of your application code in their own threads and then synchronizes access to any shared objects. While it is a complex topic, multi-threaded programming has three major pain points:

  • Shared objects – protected by synchronized blocks or methods, but blocking
  • Deadlock – the first thread tries to enter a synchronized block held by the second thread, while the second thread tries to enter a synchronized block held by the first, resulting in deadlock
  • Scalability – managing threads across multiple JVMs

The actor model is not new – it has long existed in Haskell and Erlang – and the Akka runtime makes it fairly straightforward to write actor-based applications.

Actor Model, Why Now?

A lot has happened in the last 20 years, from the mid-1990s to the current date – not only is it possible to scale up (at a high price), we can also scale out.

[Figure: scaling in the past vs. the present]

The actor model allows you to write scalable software applications for distributed environments without pulling your hair out.

Inside Actor Model

 

[Figure: inside the actor model]

Let’s understand how it works:

  1. Actor System: the glue that wires Actors, ActorRefs, Dispatchers, and Mailboxes together. Programmers use the ActorSystem to configure the desired implementation of each of these components, and it creates and initializes actors.
  2. Next, actor creation is initiated by the sender (usually the system itself), which invokes actorOf() on the ActorSystem and gets back a reference to the Actor object. At this point the Actor may not have been created/initialized yet. The ActorRef acts as a proxy to the Actor object and provides the interface to communicate with the Actor.
  3. The sender Actor (the caller – in this case the actor itself from step 2) uses ! (the bang operator, known as the tell message pattern – “fire-and-forget”, i.e. send a message asynchronously and return immediately) to tell the receiving Actor about the event (“Hi Actor, can you please process this event?”). A minimal sketch follows this list.
  4. The ActorRef in turn dispatches the event on a MessageDispatcher (you can configure the ActorSystem to use a specific dispatcher by invoking withDispatcher).
  5. The MessageDispatcher enqueues the event onto the MessageQueue.
    1. A MessageQueue is one of the core components that form an Akka Mailbox.
    2. The MessageQueue is where the normal messages sent to Actors are enqueued (and subsequently dequeued).
    3. MessageQueue.enqueue() – enqueues the message to this queue, or throws an exception.
  6. The MessageDispatcher also looks up the Mailbox (by default every actor has a single mailbox – an UnboundedMailbox). The Mailbox holds the messages for the receiving Actor. Once it finds the Mailbox, the MessageDispatcher binds a set of Actors to a thread pool (backed by a BlockingQueue) and invokes the Mailbox.run() method.
  7. MessageQueue.dequeue() – dequeues the next message from this queue, returning null if there is none.
  8. Eventually the Mailbox schedules the task on the Actor, which invokes the receive() method on the Actor.
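To make steps 1–4 concrete, here is a minimal sketch using the same Akka 2.3 Scala API as the example below. The actor class, system name, and dispatcher name are made up for illustration; a custom dispatcher such as "my-dispatcher" would also have to be defined in application.conf.

import akka.actor.{Actor, ActorSystem, Props}

class GreetingActor extends Actor {
  def receive: Receive = {
    case name: String => println(s"Hello, $name")  // step 8: receive() handles the message
  }
}

object TellExample extends App {
  val system = ActorSystem("ExampleSystem")        // step 1: the ActorSystem glues everything together
  val greeter = system.actorOf(
    Props[GreetingActor], name = "greeter")        // step 2: returns an ActorRef (a proxy), not the Actor itself
  // Props[GreetingActor].withDispatcher("my-dispatcher") would pick a specific dispatcher (step 4),
  // assuming "my-dispatcher" is configured in application.conf.
  greeter ! "Akka"                                 // step 3: tell / fire-and-forget
  system.shutdown()                                // same Akka 2.3 shutdown call used in the main example
}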

Example

Assuming you are working on Linux and have Scala + SBT configured and ready to use.

build.sbt 

name := "ScalaWorld"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.4"
libraryDependencies += "org.scala-lang.modules" % "scala-xml_2.11" % "1.0.5"

RunActors.scala

import akka.actor.{Actor, ActorSystem, Props}
import scala.io.Source
import scala.xml._

class WeatherActor extends Actor {
  override def receive: Receive = {
    case msg => println(msg)
  }
}

object RunActors {
  def main(args: Array[String]) {
    val system = ActorSystem("WeatherSystem")
    val weatherActor = system.actorOf(Props[WeatherActor], name = "weatheractor")
    val start = System.nanoTime
    for (id <- 4118 to 4128) {
      weatherActor ! getWeatherInfo(id.toString)
    }
    val end = System.nanoTime
    println("Time : " + (end - start) / 1000000000.0)
    // ActorSystem starts the thread pool, so we want to make sure we terminate the ActorSystem when we are done.
    system.shutdown()
  }

  def getWeatherInfo(id: String) = {
    val url = "http://weather.yahooapis.com/forecastrss?w=" + id + "&u=c"
    val response = Source.fromURL(url).mkString
    val xmlResponse = XML.loadString(response)
    // Return a tuple of 2 values (Tuple2): (city, temperature).
    (xmlResponse \\ "location" \\ "@city",
      xmlResponse \\ "condition" \\ "@temp")
  }
}
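For the “normal execution” timing shown in the output below, the loop simply calls getWeatherInfo directly instead of sending the result to an actor. The original post does not include that variant, so here is a minimal sketch of what it presumably looks like (it can live in the same file and reuse the helper above):

object RunWithoutActors {
  def main(args: Array[String]) {
    val start = System.nanoTime
    for (id <- 4118 to 4128) {
      // same helper as above, but no ActorSystem involved
      println(RunActors.getWeatherInfo(id.toString))
    }
    val end = System.nanoTime
    println("Time : " + (end - start) / 1000000000.0)
  }
}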

SBT

cd into your project directory and type sbt:

/home/hardik/Downloads/ScalaWorld> sbt
[info] Loading project definition from /home/hardik/Downloads/ScalaWorld/project
[info] Set current project to ScalaWorld (in build file:/home/hardik/Downloads/ScalaWorld/)
> run-main RunActors
[info] Updating {file:/home/hardik/Downloads/ScalaWorld}scalaworld…
[info] Resolving jline#jline;2.12.1 …
[info] Done updating.
[info] Compiling 1 Scala source to /home/hardik/Downloads/ScalaWorld/target/scala-2.11/classes…
[info] Running RunActors

Output: I am querying the Yahoo Weather web service API, passing WOEIDs (Where On Earth IDs) from 4118 to 4128 and fetching the city and temperature information. I have executed getWeatherInfo() both with and without Actors, and you can see the difference in execution time.

I have only touched the surface of Akka Actors – there is a lot more, which I will cover in the next article.

Normal execution (without the Actor System, invoking getWeatherInfo(id.toString()))
============================ 
(Toronto,8)
(Halton Hills,8)
(,)
(Milton,8)
(Oakville,8)
(,)
(Burlington,8)
(Hamilton,8)
(St. Catharines,8)
(Welland,8)
(Dunnville,8)
Time : 3.712243053
Execution With Actors
=============================
(Toronto,8)
(Halton Hills,8)
(,)
(Milton,8)
(Oakville,8)
(,)
(Burlington,8)
(Hamilton,8)
(St. Catharines,8)
(Welland,8)
(Dunnville,8)
Time : 1.359151664



Writing Custom Hive UDF and UDAF


In this article, I am going to walk through easy-to-follow examples showing how to create a Hive UDF and UDAF, package them in a JAR, and test them in the Hive CLI. So let’s begin.

In the sqoop import article I imported the customers table; similarly, I have imported the orders table, which I used in the Hive Joins article. I am also using a dummy table for UDF verification. You can find the relevant sqoop commands on GitHub.

Hive supports a lot of built-in SQL-like functions in HiveQL. But, just in case there is a need to write your own UDF, no one is stopping you.

UDF (User Defined Function)

Here I am going to show how to write a simple “trim-like” function called “Strip” – of course you can write something fancier, but my goal here is for you to take away something in a short amount of time. So, let’s begin.

How to write UDF function in Hive?

  1. Create a Java class for the User Defined Function which extends org.apache.hadoop.hive.ql.exec.UDF and implements one or more evaluate() methods containing your desired logic, and you are almost there.
  2. Package your Java class into JAR file (I am using maven)
  3. Go to Hive CLI – ADD your JAR, verify your JARs in Hive CLI classpath
  4. CREATE TEMPORARY FUNCTION in hive which points to your Java class
  5. Use it in Hive SQL and have fun!

There are better ways to do this by writing your own GenericUDF to deal with non-primitive types like arrays and maps, but I am not going to cover that in this article.

I will go into detail for each one.

Create Java class for User Defined Function 

As you can see below, I am calling my Java class “Strip” – you can call it anything – but the important point is that it extends UDF and provides two evaluate() implementations.

evaluate(Text str, String stripChars) – trims the characters specified in stripChars from both ends of the first argument str.
evaluate(Text str) – trims leading and trailing spaces.
package org.hardik.letsdobigdata;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class Strip extends UDF {

  private Text result = new Text();

  public Text evaluate(Text str, String stripChars) {
    if (str == null) {
      return null;
    }
    result.set(StringUtils.strip(str.toString(), stripChars));
    return result;
  }

  public Text evaluate(Text str) {
    if (str == null) {
      return null;
    }
    result.set(StringUtils.strip(str.toString()));
    return result;
  }
}


Package your Java class into JAR

There is a pom.xml attached on GitHub; please make sure you have Maven installed. If you are working with the GitHub clone, go to the shell:

$ cd HiveUDFs

and run mvn clean package – this will create a JAR file which contains our UDF class. Copy the JAR path.

Go to Hive CLI and ADD UDF JAR 

hive> ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
Added [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar] to class path

Added resources: [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar]

Verify JAR is in Hive CLI Classpath

You should see your jar in the list.

hive> list jars;
/usr/lib/hive/lib/hive-contrib.jar
/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar

Create Temporary Function

It does not have to be a temporary function – you can create a permanent function as well – but just to keep things moving, go ahead and create a temporary function.

You may want to add the ADD JAR and CREATE TEMPORARY FUNCTION statements to your .hiverc file so they are executed at the beginning of each Hive session, as sketched below.
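A minimal sketch of what that .hiverc could contain, assuming the JAR path and class name used elsewhere in this article:

-- ~/.hiverc: statements run at the start of every Hive CLI session
ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION STRIP AS 'org.hardik.letsdobigdata.Strip';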

UDF Output

The first query strips ‘ha’ from the string ‘hadoop’ as expected (the two-argument evaluate() in the code).

The second query strips leading and trailing spaces as expected.

hive> CREATE TEMPORARY FUNCTION STRIP AS 'org.hardik.letsdobigdata.Strip';
 hive> select strip('hadoop','ha') from dummy;
 OK
 doop
 Time taken: 0.131 seconds, Fetched: 1 row(s)
 hive> select strip('   hiveUDF ') from dummy;
 OK
 hiveUDF

If you have made it this far, congratulations – that was our UDF in action! You can follow the code on GitHub.

UDAF (User Defined Aggregated Function)

Now, equipped with our first UDF knowledge, we will move to the next step. When we say aggregation, COUNT, AVG, SUM, MIN and MAX come to mind.

I am picking a very simple aggregation function, AVG/MEAN, and I am going to work with the “orders” table imported using sqoop. Once you import it into Hive it will look like the output below (or you can use LOAD DATA INPATH – totally up to you).

hive> select * from orders;
 OK
 orders.order_id orders.order_date orders.customer_id orders.amount
 101 2016-01-01 7 3540
 102 2016-03-01 1 240
 103 2016-03-02 6 2340
 104 2016-02-12 3 5000
 105 2016-02-12 3 5500
 106 2016-02-14 9 3005
 107 2016-02-14 1 20
 108 2016-02-29 2 2000
 109 2016-02-29 3 2500
 110 2016-02-27 1 200

Goal of the UDAF: find the average order amount per customer in the orders table.

We are looking for the query:
SELECT CUSTOMER_ID, AVG(AMOUNT) FROM ORDERS GROUP BY CUSTOMER_ID;

I am going to replace the AVG function with a “MEAN” function.

But before I begin, let’s stop and think, as we are living in the MapReduce world. One of the bottlenecks you want to avoid is moving too much data from the Map to the Reduce phase.

An aggregate function is more difficult to write than a regular UDF. Values are aggregated in chunks (across many mappers or reducers), so the implementation has to be capable of combining partial aggregations into final results.

At a high-level, there are two parts to implementing a Generic UDAF

  1. evaluator – the evaluator class actually implements the UDAF logic.
  2. resolver – handles type checking and operator overloading (if you want it), and helps Hive find the correct evaluator class for a given set of argument types

We are not creating a GenericUDAF – we are creating our own one-time kind of aggregate function, so we do not have to worry about the resolver. I am planning to write on GenericUDF/GenericUDAF, though – maybe some other day, but soon. 🙂

How to write UDAF?

  1. Create a Java class which extends org.apache.hadoop.hive.ql.exec.UDAF
  2. Create an inner class which implements UDAFEvaluator
  3. Implement five methods:
    1. init() – initializes the evaluator and resets its internal state. We are using new Column() in the code below to indicate that no values have been aggregated yet.
    2. iterate() – called every time there is a new value to be aggregated. The evaluator should update its internal state with the result of performing the aggregation (we are doing a sum – see below). We return true to indicate that the input was valid.
    3. terminatePartial() – called when Hive wants a result for the partial aggregation. The method must return an object that encapsulates the state of the aggregation.
    4. merge() – called when Hive decides to combine one partial aggregation with another.
    5. terminate() – called when the final result of the aggregation is needed.
  4. Compile and package the JAR
  5. CREATE TEMPORARY FUNCTION in the Hive CLI
  6. Run the aggregation query and verify the output!

MeanUDAF.java

package org.hardik.letsdobigdata;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;

@Description(name = "Mean", value = "_FUNC_(double) - computes mean", extended = "select col1, MeanFunc(value) from table group by col1;")
public class MeanUDAF extends UDAF {

  // Define logging
  static final Log LOG = LogFactory.getLog(MeanUDAF.class.getName());

  public static class MeanUDAFEvaluator implements UDAFEvaluator {

    /**
     * Use the Column class to serialize the intermediate computation
     * (the running sum and count for each group).
     */
    public static class Column {
      double sum = 0;
      int count = 0;
    }

    private Column col = null;

    public MeanUDAFEvaluator() {
      super();
      init();
    }

    // A - Initialize the evaluator, indicating that no values have been
    // aggregated yet.
    public void init() {
      LOG.debug("Initialize evaluator");
      col = new Column();
    }

    // B - Iterate every time there is a new value to be aggregated.
    public boolean iterate(double value) throws HiveException {
      LOG.debug("Iterating over each value for aggregation");
      if (col == null)
        throw new HiveException("Item is not initialized");
      col.sum = col.sum + value;
      col.count = col.count + 1;
      return true;
    }

    // C - Called when Hive wants partially aggregated results.
    public Column terminatePartial() {
      LOG.debug("Return partially aggregated results");
      return col;
    }

    // D - Called when Hive decides to combine one partial aggregation with another.
    public boolean merge(Column other) {
      LOG.debug("merging by combining partial aggregation");
      if (other == null) {
        return true;
      }
      col.sum += other.sum;
      col.count += other.count;
      return true;
    }

    // E - Called when the final result of the aggregation is needed.
    public double terminate() {
      LOG.debug("At the end of last record of the group - returning final result");
      return col.sum / col.count;
    }
  }
}

[Figure: UDAF flow]

Package and ADD JAR

hive> ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
Added [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar] to class path
Added resources: [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar]

CREATE FUNCTION in HIVE

hive> CREATE TEMPORARY FUNCTION MeanFunc AS 'org.hardik.letsdobigdata.MeanUDAF';
OK

Verify Output

Execute the group-by query below; our function is called MeanFunc. (Note that the first attempt fails because the orders table lives in the sqoop_workspace database, so we switch databases first.)

 hive> select customer_id, MeanFunc(amount) from orders group by customer_id;
FAILED: SemanticException [Error 10001]: Line 1:42 Table not found 'orders'
hive> use sqoop_workspace;
OK
Time taken: 0.247 seconds
hive> select customer_id, MeanFunc(amount) from orders group by customer_id;
Query ID = cloudera_20160302030202_fb24b7c1-4227-4640-afb9-4ccd29bd735f
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1456782715090_0020, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1456782715090_0020/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1456782715090_0020
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2016-03-02 03:03:16,703 Stage-1 map = 0%,  reduce = 0%
2016-03-02 03:03:53,241 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 3.31 sec
2016-03-02 03:03:55,593 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.9 sec
2016-03-02 03:04:09,201 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.18 sec
MapReduce Total cumulative CPU time: 6 seconds 180 msec
Ended Job = job_1456782715090_0020
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 2  Reduce: 1   Cumulative CPU: 6.18 sec   HDFS Read: 12524 HDFS Write: 77 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 180 msec
OK
1    153.33333333333334
2    2000.0
3    4333.333333333333
6    2340.0
7    3540.0
9    3005.0
Time taken: 72.172 seconds, Fetched: 6 row(s)

Verify individual customer_ids: as you can see, the group-by values match – you can cross-check manually; for example, customer 1 has orders of 240, 20 and 200, and (240 + 20 + 200) / 3 = 153.33. Thanks for your time and for reading my blog – hope this is helpful!

hive> select * from orders where customer_id = 1;
OK
102    2016-03-01    1    240
107    2016-02-14    1    20
110    2016-02-27    1    200
Time taken: 0.32 seconds, Fetched: 3 row(s)
hive> select * from orders where customer_id = 2;
OK
108    2016-02-29    2    2000
Time taken: 0.191 seconds, Fetched: 1 row(s)
hive> select * from orders where customer_id = 3;
OK
104    2016-02-12    3    5000
105    2016-02-12    3    5500
109    2016-02-29    3    2500
Time taken: 0.093 seconds, Fetched: 3 row(s)
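As an extra sanity check (not part of the original run), the built-in AVG function that MeanFunc mimics should return the same per-customer values:

hive> select customer_id, avg(amount) from orders group by customer_id;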

Hive Joins


Covering the basics of joins in Hive.

[Figure: Hive join types]

We will be working with the two tables, customers and orders, that we imported in the sqoop import article, and we are going to perform the following joins.

  1. INNER JOIN – selects records that have matching values in both tables.
  2. LEFT JOIN (LEFT OUTER JOIN) – returns all the values from the left table, plus the matched values from the right table, or NULL in case of no matching join predicate.
  3. RIGHT JOIN (RIGHT OUTER JOIN) – returns all the values from the right table, plus the matched values from the left table, or NULL in case of no matching join predicate.
  4. FULL JOIN (FULL OUTER JOIN) – selects all records that match either left or right table records.
  5. LEFT SEMI JOIN – only returns records from the left-hand table. Hive doesn’t support IN subqueries, so you can’t write the query below directly; the LEFT SEMI JOIN equivalent is shown right after it.

SELECT * FROM TABLE_A WHERE TABLE_A.ID IN (SELECT ID FROM TABLE_B);
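A sketch of the LEFT SEMI JOIN rewrite of that IN subquery (TABLE_A and TABLE_B are the placeholder names from the statement above; the same pattern is applied to the real tables at the end of this article):

SELECT TABLE_A.* FROM TABLE_A LEFT SEMI JOIN TABLE_B ON (TABLE_A.ID = TABLE_B.ID);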

Customer Table

Hive tip: to print column headers on the command line:

hive> set hive.cli.print.header=true;
hive> select * from customers;
 OK
 customers.id customers.name
 1 John
 2 Kevin
 19 Alex
 3 Mark
 4 Jenna
 5 Robert
 6 Zoya
 7 Sam
 8 George
 9 Peter

Orders Table:

hive> select * from orders;
 OK
 order_id orders.order_date orders.customer_id orders.amount
 101 2016-01-01 7 3540
 102 2016-03-01 1 240
 103 2016-03-02 6 2340
 104 2016-02-12 3 5000
 105 2016-02-12 3 5500
 106 2016-02-14 9 3005
 107 2016-02-14 1 20
 108 2016-02-29 2 2000
 109 2016-02-29 3 2500
 110 2016-02-27 1 200

 

INNER JOIN

Select records that have matching values in both tables.

hive> select c.id, c.name, o.order_date, o.amount from customers c inner join orders o ON (c.id = o.customer_id);
Output
c.id c.name o.order_date o.amount
 7 Sam 2016-01-01 3540
 1 John 2016-03-01 240
 6 Zoya 2016-03-02 2340
 3 Mark 2016-02-12 5000
 3 Mark 2016-02-12 5500
 9 Peter 2016-02-14 3005
 1 John 2016-02-14 20
 2 Kevin 2016-02-29 2000
 3 Mark 2016-02-29 2500
 1 John 2016-02-27 200

LEFT JOIN (LEFT OUTER JOIN)

Returns all the values from the left table, plus the matched values from the right table, or NULL in case of no matching join predicate

hive> select c.id, c.name, o.order_date, o.amount from customers c left outer join orders o ON (c.id = o.customer_id);
Output
 c.id    c.name    o.order_date    o.amount
 1    John    2016-03-01    240
 1    John    2016-02-14    20
 1    John    2016-02-27    200
 2    Kevin    2016-02-29    2000
 19    Alex    NULL    NULL
 3    Mark    2016-02-12    5000
 3    Mark    2016-02-12    5500
 3    Mark    2016-02-29    2500
 4    Jenna    NULL    NULL
 5    Robert    NULL    NULL
 6    Zoya    2016-03-02    2340
 7    Sam    2016-01-01    3540
 8    George    NULL    NULL
 9    Peter    2016-02-14    3005
 Time taken: 40.462 seconds, Fetched: 14 row(s)

RIGHT JOIN (RIGHT OUTER JOIN)

Returns all the values from the right table, plus the matched values from the left table, or NULL in case of no matching join predicate

hive> select c.id, c.name, o.order_date, o.amount from customers c right outer join orders o ON (c.id = o.customer_id);
Output
 c.id    c.name    o.order_date    o.amount
 7    Sam    2016-01-01    3540
 1    John    2016-03-01    240
 6    Zoya    2016-03-02    2340
 3    Mark    2016-02-12    5000
 3    Mark    2016-02-12    5500
 9    Peter    2016-02-14    3005
 1    John    2016-02-14    20
 2    Kevin    2016-02-29    2000
 3    Mark    2016-02-29    2500
 1    John    2016-02-27    200

FULL JOIN (FULL OUTER JOIN)

Selects all records that match either left or right table records.

hive> select c.id, c.name, o.order_date, o.amount from customers c full outer join orders o ON (c.id = o.customer_id);

Output

c.id c.name o.order_date o.amount
 1 John 2016-02-27 200
 1 John 2016-02-14 20
 1 John 2016-03-01 240
 19 Alex NULL NULL
 2 Kevin 2016-02-29 2000
 3 Mark 2016-02-29 2500
 3 Mark 2016-02-12 5500
 3 Mark 2016-02-12 5000
 4 Jenna NULL NULL
 5 Robert NULL NULL
 6 Zoya 2016-03-02 2340
 7 Sam 2016-01-01 3540
 8 George NULL NULL
 9 Peter 2016-02-14 3005

LEFT SEMI JOIN

Find all the customers for whom at least one order exists – in other words, all customers who have placed an order.

[Figure: left semi join]

hive> select *  from customers  left semi join orders  ON 
(customers.id = orders.customer_id);

OUTPUT

customers.id    customers.name
1    John
2    Kevin
3    Mark
6    Zoya
7    Sam
9    Peter
Time taken: 56.362 seconds, Fetched: 6 row(s)

That’s it in this article. I hope you find this useful, thank you for reading.

Sqoop: Import Data from MySQL to Hive


[Figure: Sqoop import from MySQL to Hive]

Prerequisite: a Hadoop environment with Sqoop and Hive installed and working. To speed things up, I am using the Cloudera QuickStart VM (requires 4GB of RAM), although you can also work with the Hortonworks Data Platform (requires 8GB of RAM). Since my laptop has only 8GB of RAM, I prefer to work with the Cloudera VM image.

If you are working with the Cloudera/HDP VM and it’s all fired up in VirtualBox, it becomes easier to work, as many of the Hadoop ecosystem packages come pre-installed (MySQL, Oozie, Hadoop, Hive, ZooKeeper, Storm, Kafka, Spark, etc.).

Create table in mysql

In the Cloudera VM, open a command prompt and just make sure MySQL is installed. For me it’s:

shell> mysql --version
mysql  Ver 14.14 Distrib 5.1.66, for redhat-linux-gnu (x86_64) using readline 5.

You should always work in your own database, so create a database in MySQL using:

mysql> create database sqoop;

then,

mysql> use sqoop;
mysql> create table customer(id varchar(3), name varchar(20), age varchar(3), salary integer(10));
Query OK, 0 rows affected (0.09 sec)

mysql> desc customer;
+--------+-------------+------+-----+---------+-------+
| Field  | Type        | Null | Key | Default | Extra |
+--------+-------------+------+-----+---------+-------+
| id     | varchar(3)  | YES  |     | NULL    |       |
| name   | varchar(20) | YES  |     | NULL    |       |
| age    | varchar(3)  | YES  |     | NULL    |       |
| salary | int(10)     | YES  |     | NULL    |       |
+--------+-------------+------+-----+---------+-------+
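The original post does not show the insert statements, but the rows were added with something along these lines (values taken from the select output that follows):

mysql> insert into customer values
    ('1','John','30',80000), ('2','Kevin','33',84000), ('3','Mark','28',90000),
    ('4','Jenna','34',93000), ('5','Robert','32',100000), ('6','Zoya','40',60000),
    ('7','Sam','37',75000), ('8','George','31',67000), ('9','Peter','23',70000),
    ('19','Alex','26',74000);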

 

mysql> select * from customer;
+------+--------+------+--------+
| id   | name   | age  | salary |
+------+--------+------+--------+
| 1    | John   | 30   |  80000 |
| 2    | Kevin  | 33   |  84000 |
| 3    | Mark   | 28   |  90000 |
| 4    | Jenna  | 34   |  93000 |
| 5    | Robert | 32   | 100000 |
| 6    | Zoya   | 40   |  60000 |
| 7    | Sam    | 37   |  75000 |
| 8    | George | 31   |  67000 |
| 9    | Peter  | 23   |  70000 |
| 19   | Alex   | 26   |  74000 |
+------+--------+------+--------+

Let’s start Sqooping

As you can see, the customer table does not have a primary key. I have added a few records to the customer table. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range.

If the actual values of the primary key are not uniformly distributed across its range, this can result in unbalanced tasks, and you should explicitly choose a different column with the --split-by argument – for example, --split-by id. Since our customer table has no primary key at all, specifying --split-by is needed here.

Since I want to import this table directly into Hive, I am adding --hive-import to my sqoop command.

sqoop import --connect jdbc:mysql://localhost:3306/sqoop \
--username root \
-P \
--split-by id \
--columns id,name \
--table customer \
--target-dir /user/cloudera/ingest/raw/customers \
--fields-terminated-by "," \
--hive-import \
--create-hive-table \
--hive-table sqoop_workspace.customers

Here’s what each sqoop command option means:

  • connect – the JDBC connection string
  • username – the database username
  • -P – prompts for the password on the console. Alternatively you can use --password, but this is not a good practice as it is visible in your job execution logs and asking for trouble. One way to deal with this is to store the DB password in a file in HDFS and provide it at runtime (see the sketch after this list).
  • table – which table you want to import from MySQL – here it’s customer
  • split-by – specifies your splitting column – I am specifying id here
  • target-dir – the HDFS destination directory
  • fields-terminated-by – I have specified a comma (by default it will import data into HDFS with comma-separated values)
  • hive-import – import the table into Hive (uses Hive’s default delimiters if none are set)
  • create-hive-table – if set, the job will fail if the Hive table already exists – which works in this case
  • hive-table – specify <db_name>.<table_name>; here it’s sqoop_workspace.customers, where sqoop_workspace is my database and customers is the table name
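A minimal sketch of that password-file approach (the file name and path are made up for illustration; --password-file is a standard Sqoop option, but check your Sqoop version’s documentation):

# store the password in a tightly-permissioned file on HDFS
echo -n "mysecret" > .mysql.password
hadoop fs -put .mysql.password /user/cloudera/.mysql.password
hadoop fs -chmod 400 /user/cloudera/.mysql.password
rm .mysql.password

# then replace -P with --password-file in the sqoop command
sqoop import --connect jdbc:mysql://localhost:3306/sqoop --username root \
  --password-file /user/cloudera/.mysql.password --table customer ...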

As you can see below, sqoop runs a MapReduce job. Notice that I am using the -P option, so it prompts for the password. While this works, it can easily be parameterized by reading the password from a file, as sketched above.

sqoop import --connect jdbc:mysql://localhost:3306/sqoop --username root -P --split-by id --columns id,name --table customer  --target-dir /user/cloudera/ingest/raw/customers --fields-terminated-by "," --hive-import --create-hive-table --hive-table sqoop_workspace.customers
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
16/03/01 12:59:44 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.5.0
Enter password: 
16/03/01 12:59:54 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
16/03/01 12:59:54 INFO tool.CodeGenTool: Beginning code generation
16/03/01 12:59:55 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customer` AS t LIMIT 1
16/03/01 12:59:56 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customer` AS t LIMIT 1
16/03/01 12:59:56 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/6471c43b5c867834458d3bf5a67eade2/customer.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
16/03/01 13:00:01 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/6471c43b5c867834458d3bf5a67eade2/customer.jar
16/03/01 13:00:01 WARN manager.MySQLManager: It looks like you are importing from mysql.
16/03/01 13:00:01 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
16/03/01 13:00:01 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
16/03/01 13:00:01 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
16/03/01 13:00:01 INFO mapreduce.ImportJobBase: Beginning import of customer
16/03/01 13:00:01 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
16/03/01 13:00:02 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
16/03/01 13:00:04 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/03/01 13:00:05 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/03/01 13:00:11 INFO db.DBInputFormat: Using read commited transaction isolation
16/03/01 13:00:11 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `customer`
16/03/01 13:00:11 WARN db.TextSplitter: Generating splits for a textual index column.
16/03/01 13:00:11 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
16/03/01 13:00:11 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
16/03/01 13:00:11 INFO mapreduce.JobSubmitter: number of splits:4
16/03/01 13:00:12 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1456782715090_0004
16/03/01 13:00:13 INFO impl.YarnClientImpl: Submitted application application_1456782715090_0004
16/03/01 13:00:13 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1456782715090_0004/
16/03/01 13:00:13 INFO mapreduce.Job: Running job: job_1456782715090_0004
16/03/01 13:00:47 INFO mapreduce.Job: Job job_1456782715090_0004 running in uber mode : false
16/03/01 13:00:48 INFO mapreduce.Job:  map 0% reduce 0%
16/03/01 13:01:43 INFO mapreduce.Job:  map 25% reduce 0%
16/03/01 13:01:46 INFO mapreduce.Job:  map 50% reduce 0%
16/03/01 13:01:48 INFO mapreduce.Job:  map 100% reduce 0%
16/03/01 13:01:48 INFO mapreduce.Job: Job job_1456782715090_0004 completed successfully
16/03/01 13:01:48 INFO mapreduce.Job: Counters: 30
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=548096
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=409
        HDFS: Number of bytes written=77
        HDFS: Number of read operations=16
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=8
    Job Counters 
        Launched map tasks=4
        Other local map tasks=5
        Total time spent by all maps in occupied slots (ms)=216810
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=216810
        Total vcore-seconds taken by all map tasks=216810
        Total megabyte-seconds taken by all map tasks=222013440
    Map-Reduce Framework
        Map input records=10
        Map output records=10
        Input split bytes=409
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=2400
        CPU time spent (ms)=5200
        Physical memory (bytes) snapshot=418557952
        Virtual memory (bytes) snapshot=6027804672
        Total committed heap usage (bytes)=243007488
    File Input Format Counters 
        Bytes Read=0
    File Output Format Counters 
        Bytes Written=77
16/03/01 13:01:48 INFO mapreduce.ImportJobBase: Transferred 77 bytes in 104.1093 seconds (0.7396 bytes/sec)
16/03/01 13:01:48 INFO mapreduce.ImportJobBase: Retrieved 10 records.
16/03/01 13:01:49 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customer` AS t LIMIT 1
16/03/01 13:01:49 INFO hive.HiveImport: Loading uploaded data into Hive

Logging initialized using configuration in jar:file:/usr/jars/hive-common-1.1.0-cdh5.5.0.jar!/hive-log4j.properties
OK
Time taken: 2.163 seconds
Loading data to table sqoop_workspace.customers
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00000': User does not belong to supergroup
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00001': User does not belong to supergroup
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00002': User does not belong to supergroup
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00003': User does not belong to supergroup
Table sqoop_workspace.customers stats: [numFiles=4, totalSize=77]
OK
Time taken: 1.399 seconds

Finally, let’s verify the output in Hive:

hive> show databases;
OK
default
sqoop_workspace
Time taken: 0.034 seconds, Fetched: 2 row(s)
hive> use sqoop_workspace;
OK
Time taken: 0.063 seconds
hive> show tables;
OK
customers
Time taken: 0.036 seconds, Fetched: 1 row(s)
hive> show create table customers;
OK
CREATE TABLE `customers`(
  `id` string, 
  `name` string)
COMMENT 'Imported by sqoop on 2016/03/01 13:01:49'
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
  LINES TERMINATED BY '\n' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers'
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='true', 
  'numFiles'='4', 
  'totalSize'='77', 
  'transient_lastDdlTime'='1456866115')
Time taken: 0.26 seconds, Fetched: 18 row(s)

hive> select * from customers;
OK
1    John
2    Kevin
19    Alex
3    Mark
4    Jenna
5    Robert
6    Zoya
7    Sam
8    George
9    Peter
Time taken: 1.123 seconds, Fetched: 10 row(s).

That’s it for now. Hope you find it useful, thanks for your support and reading my blog.

Virtualization Matters: Part 1: From VirtualBox to Vagrant to Docker


In Part 1 of this series I am going to talk about VirtualBox and Vagrant; in Part 2 I will talk about the differences between Vagrant and Docker. As development cycles become shorter and shorter and your startup wants to be 10 years ahead today, virtualization affects us all. In Part 3 I want to build and test a Spark application on Docker – stay tuned!

Back in 2007 – Rise of Type-2 Hypervisor

Just to set some context: everyone is familiar with VirtualBox – basically running any OS as a guest OS on your host machine (Linux/Windows/Mac).

[Figure: VirtualBox]

A developer’s life in VirtualBox – well, it works great :) ... wait till it crashes again... and again and again... man, I lost my last 2 days of work – wish I had committed it to git sooner.

 


2010: Here comes Vagrant

Written in Ruby, Vagrant is a scripting engine for VirtualBox, so you can set up multiple virtual machines.

But why would someone want to do that?

Import/use Pre-configured Virtual Machine

  • For example, to fire up an existing virtual machine box called hashicorp/precise64, just execute:
  • $ vagrant init hashicorp/precise64 (creates a Vagrantfile)
  • $ vagrant up

After running the above two commands, you will have a fully running virtual machine in VirtualBox running Ubuntu 12.04 LTS 64-bit.

You can SSH into this machine with vagrant ssh, and when you are done playing around, you can terminate the virtual machine with vagrant destroy.
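The corresponding commands, for quick reference:

$ vagrant ssh       # log into the running VM
$ vagrant destroy   # terminate the VM when you are done playing around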

Configure the VM (with the Vagrantfile)

There is one Vagrantfile per project, and the Vagrantfile is supposed to be committed to version control. This allows other developers involved in the project to check out the code, run vagrant up, and be on their way. Vagrantfiles are portable to every platform.

You can configure your Vagrant VM with a bunch of properties available under the config.vm namespace in the Vagrantfile.

Provisioning

Up to this point you can fire up and configure your VM in under 2 minutes. But you still have not installed any software in your virtual machine. You say to your boss: “I can install those 50 things (Cassandra, Elasticsearch, Spark, etc.) needed to develop/run my software – give me a week”... and your boss says... GIVE ME A BREAK!!!

Vagrant gives you multiple options for provisioning (automating the installation of software on) your virtual machine, from simple shell scripts to more complex tools like Chef and Puppet. Vagrant provisioning is very powerful: everything that is repeatable is meant to be automated, and this is exactly what you get.

For example, to install Apache, create install-apache.sh and modify your Vagrantfile:

apt-get update
apt-get install -y apache2
if ! [ -L /var/www ]; then
  rm -rf /var/www
  ln -fs /vagrant /var/www
fi

Vagrant.configure("2") do |config| config.vm.box = "hashicorp/precise64" config.vm.provision :shell, path: "install-apache.sh" end

 

Vagrant in a nutshell

Vagrant allows you to create, extend, and provision new virtual machines so they are available to the masses.

[Figure: Vagrant workflow]

Next I want to extend this article with Docker (a lightweight container), the future of application delivery.

[Figure: Docker]

I have covered the very basics of Vagrant, but the point here is: automate everything. We are software developers, and we should automate as much as possible. Thanks for reading!