Writing Custom Hive UDF and UDAF


In this article, I am going to walk through easy-to-follow examples and show how to create a Hive UDF and UDAF, package them in a JAR, and test them in the Hive CLI. So let’s begin.

In the sqoop import article I imported the customers table; similarly, I have imported the orders table, which I used in the Hive Joins article. I am also using a dummy table for UDF verification. You can find the relevant sqoop commands on GitHub.

Hive supports a lot of built-in SQL-like functions in HiveQL. But just in case you need to write your own UDF, no one is stopping you.

UDF (User Defined Function)

Here I am going to show how to write a simple “trim-like” function called “Strip” – of course you can write something fancier, but my goal here is to give you something you can take away in a short amount of time. So, let’s begin.

How to write UDF function in Hive?

  1. Create a Java class for the User Defined Function which extends org.apache.hadoop.hive.ql.exec.UDF, implement one or more evaluate() methods with your desired logic, and you are almost there.
  2. Package your Java class into a JAR file (I am using Maven)
  3. Go to the Hive CLI, ADD your JAR, and verify it is in the Hive CLI classpath
  4. CREATE TEMPORARY FUNCTION in Hive which points to your Java class
  5. Use it in Hive SQL and have fun!

There are better ways to do this by writing your own GenericUDF to deal with non-primitive types like arrays and maps, but I am not going to cover that in this article.

I will go into detail for each one.

Create Java class for User Defined Function 

As you can see below, I am calling my Java class “Strip” – you can call it anything – but the important point is that it extends the UDF class and provides two evaluate() implementations.

evaluate(Text str, String stripChars) - strips any of the characters given in stripChars from both ends of the first argument str.
evaluate(Text str) - strips leading and trailing whitespace.

package org.hardik.letsdobigdata;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class Strip extends UDF {

  private Text result = new Text();

  // Strips any of the characters in stripChars from both ends of str.
  public Text evaluate(Text str, String stripChars) {
    if (str == null) {
      return null;
    }
    result.set(StringUtils.strip(str.toString(), stripChars));
    return result;
  }

  // Strips leading and trailing whitespace.
  public Text evaluate(Text str) {
    if (str == null) {
      return null;
    }
    result.set(StringUtils.strip(str.toString()));
    return result;
  }
}


Package your Java class into JAR

There is a pom.xml attached on GitHub; please make sure you have Maven installed. If you are working with the GitHub clone, go to the shell:

$ cd HiveUDFs
$ mvn clean package

This will create a JAR file which contains our UDF class. Copy the JAR path.
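The pom.xml itself is kept on GitHub rather than reproduced here; the key piece is having the Hive API on the compile classpath. A minimal sketch of the relevant dependency, assuming you use the hive-exec artifact (the version shown is illustrative – match it to your cluster):

<!-- hedged sketch: hive-exec provides org.apache.hadoop.hive.ql.exec.UDF and UDAF;
     the version is illustrative, align it with your Hive installation -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>1.1.0</version>
  <scope>provided</scope>
</dependency>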

Go to Hive CLI and ADD UDF JAR 

hive> ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
Added [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar] to class path

Added resources: [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar]

Verify JAR is in Hive CLI Classpath

You should see your jar in the list.

hive> list jars;
/usr/lib/hive/lib/hive-contrib.jar
/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar

Create Temporary Function

It does not have to be a temporary function – you can create a permanent function as well – but just to keep things moving, go ahead and create a temporary function.
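If you do want a permanent function instead, recent Hive versions (0.13+) let you register the class together with a JAR stored in HDFS. A hedged sketch, assuming you have copied the JAR to an HDFS path of your choosing:

hive> CREATE FUNCTION strip AS 'org.hardik.letsdobigdata.Strip'
      USING JAR 'hdfs:///user/cloudera/jars/HiveUDFs-0.0.1-SNAPSHOT.jar';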

You may want to add the ADD JAR and CREATE TEMPORARY FUNCTION statements to your .hiverc file so that they execute at the beginning of each Hive session.
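For example, a $HOME/.hiverc along these lines (using the JAR path and class from this walkthrough) registers the function at the start of every CLI session:

-- $HOME/.hiverc: runs before each Hive CLI session
ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION strip AS 'org.hardik.letsdobigdata.Strip';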

UDF Output

The first query strips the characters ‘h’ and ‘a’ from both ends of the string ‘hadoop’ as expected (the two-argument evaluate() in the code).

The second query strips leading and trailing spaces as expected.

hive> CREATE TEMPORARY FUNCTION STRIP AS 'org.hardik.letsdobigdata.Strip';
hive> select strip('hadoop','ha') from dummy;
OK
doop
Time taken: 0.131 seconds, Fetched: 1 row(s)
hive> select strip('   hiveUDF ') from dummy;
OK
hiveUDF

If you have made it this far, congratulations – that was our UDF in action! You can follow the code on GitHub.

UDAF (User Defined Aggregate Function)

Now, equipped with our first UDF knowledge, we will move to the next step. When we say aggregation, COUNT, AVG, SUM, MIN, and MAX come to mind.

I am picking a very simple aggregation function, AVG/MEAN, and I am going to work with the “orders” table imported using sqoop. Once you import it into Hive it will look like the output below (or you can use LOAD DATA INPATH instead – totally up to you; see the sketch after the table).

hive> select * from orders;
OK
orders.order_id    orders.order_date    orders.customer_id    orders.amount
101    2016-01-01    7    3540
102    2016-03-01    1    240
103    2016-03-02    6    2340
104    2016-02-12    3    5000
105    2016-02-12    3    5500
106    2016-02-14    9    3005
107    2016-02-14    1    20
108    2016-02-29    2    2000
109    2016-02-29    3    2500
110    2016-02-27    1    200
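For reference, if you skip sqoop and go the LOAD DATA INPATH route, it would look roughly like this – a minimal sketch, assuming the orders data sits in HDFS as a comma-delimited file (the path and DDL here are illustrative, not from the sqoop import):

hive> CREATE TABLE orders (order_id INT, order_date STRING, customer_id INT, amount DOUBLE)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
hive> LOAD DATA INPATH '/user/cloudera/orders/orders.csv' INTO TABLE orders;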

Goal of the UDAF: find the average order amount for each customer in the orders table.

We are looking for this query:
SELECT CUSTOMER_ID, AVG(AMOUNT) FROM ORDERS GROUP BY CUSTOMER_ID;

I am going to replace the AVG function with our own “MEAN” function.

But before I begin, let’s stop and think, as we are living in the MapReduce world. One of the bottlenecks you want to avoid is moving too much data from the map phase to the reduce phase.

An aggregate function is more difficult to write than a regular UDF. Values are aggregated in chunks (across many map or reduce tasks), so the implementation has to be capable of combining partial aggregations into a final result.

At a high level, there are two parts to implementing a generic UDAF:

  1. evaluator – the evaluator class actually implements the UDAF logic.
  2. resolver – handles type checking and operator overloading (if you want it), and helps Hive find the correct evaluator class for a given set of argument types.

We are not creating a GenericUDAF – we are creating a one-time kind of aggregate function, so we do not have to worry about the resolver. I am planning to write about GenericUDF/GenericUDAF, though – maybe some other day, but soon. 🙂

How to write UDAF?

  1. Create a Java class which extends org.apache.hadoop.hive.ql.exec.UDAF
  2. Create an inner class which implements UDAFEvaluator
  3. Implement five methods:
    1. init() – The init() method initializes the evaluator and resets its internal state. We are using new Column() in the code below to indicate that no values have been aggregated yet.
    2. iterate() – this method is called every time there is a new value to be aggregated. The evaluator should update its internal state with the result of performing the aggregation (we are doing sum – see below). We return true to indicate that the input was valid.
    3. terminatePartial() – this method is called when Hive wants a result for the partial aggregation. The method must return an object that encapsulates the state of the aggregation.
    4. merge() – this method is called when Hive decides to combine one partial aggregation with another.
    5. terminate() – this method is called when the final result of the aggregation is needed.
  4. Compile and Package JAR
  5. CREATE TEMPORARY FUNCTION in hive CLI
  6. Run Aggregation Query – Verify Output!!!

MeanUDAF.java

package org.hardik.letsdobigdata;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;

@Description(name = "Mean",
    value = "_FUNC_(double) - computes mean",
    extended = "select col1, MeanFunc(value) from table group by col1;")
public class MeanUDAF extends UDAF {

  // Define logging
  static final Log LOG = LogFactory.getLog(MeanUDAF.class.getName());

  public static class MeanUDAFEvaluator implements UDAFEvaluator {

    /**
     * Use the Column class to serialize the intermediate computation
     * (running sum and count) between partial and final aggregations.
     */
    public static class Column {
      double sum = 0;
      int count = 0;
    }

    private Column col = null;

    public MeanUDAFEvaluator() {
      super();
      init();
    }

    // A - Initialize the evaluator, indicating that no values have been aggregated yet.
    public void init() {
      LOG.debug("Initialize evaluator");
      col = new Column();
    }

    // B - Iterate every time there is a new value to be aggregated.
    public boolean iterate(double value) throws HiveException {
      LOG.debug("Iterating over each value for aggregation");
      if (col == null) {
        throw new HiveException("Item is not initialized");
      }
      col.sum = col.sum + value;
      col.count = col.count + 1;
      return true;
    }

    // C - Called when Hive wants partially aggregated results.
    public Column terminatePartial() {
      LOG.debug("Return partially aggregated results");
      return col;
    }

    // D - Called when Hive decides to combine one partial aggregation with another.
    public boolean merge(Column other) {
      LOG.debug("Merging by combining partial aggregation");
      if (other == null) {
        return true;
      }
      col.sum += other.sum;
      col.count += other.count;
      return true;
    }

    // E - Called when the final result of the aggregation is needed.
    public double terminate() {
      LOG.debug("At the end of the last record of the group - returning final result");
      return col.sum / col.count;
    }
  }
}


Package and ADD JAR

hive> ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
Added [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar] to class path
Added resources: [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar]

CREATE FUNCTION in HIVE

hive> CREATE TEMPORARY FUNCTION MeanFunc AS 'org.hardik.letsdobigdata.MeanUDAF';
OK

Verify Output

Execute the GROUP BY query below; our function is called MeanFunc. (Note that the first attempt fails because I had not yet switched to the sqoop_workspace database where the orders table lives.)

 hive> select customer_id, MeanFunc(amount) from orders group by customer_id;
FAILED: SemanticException [Error 10001]: Line 1:42 Table not found 'orders'
hive> use sqoop_workspace;
OK
Time taken: 0.247 seconds
hive> select customer_id, MeanFunc(amount) from orders group by customer_id;
Query ID = cloudera_20160302030202_fb24b7c1-4227-4640-afb9-4ccd29bd735f
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1456782715090_0020, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1456782715090_0020/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1456782715090_0020
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2016-03-02 03:03:16,703 Stage-1 map = 0%,  reduce = 0%
2016-03-02 03:03:53,241 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 3.31 sec
2016-03-02 03:03:55,593 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.9 sec
2016-03-02 03:04:09,201 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 6.18 sec
MapReduce Total cumulative CPU time: 6 seconds 180 msec
Ended Job = job_1456782715090_0020
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 2  Reduce: 1   Cumulative CPU: 6.18 sec   HDFS Read: 12524 HDFS Write: 77 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 180 msec
OK
1    153.33333333333334
2    2000.0
3    4333.333333333333
6    2340.0
7    3540.0
9    3005.0
Time taken: 72.172 seconds, Fetched: 6 row(s)

Verify individual customer_ids: as you can see, the group-by values match – you can cross-check manually. Thanks for your time and for reading my blog – hope this is helpful!!!

hive> select * from orders where customer_id = 1;
OK
102    2016-03-01    1    240
107    2016-02-14    1    20
110    2016-02-27    1    200
Time taken: 0.32 seconds, Fetched: 3 row(s)
hive> select * from orders where customer_id = 2;
OK
108    2016-02-29    2    2000
Time taken: 0.191 seconds, Fetched: 1 row(s)
hive> select * from orders where customer_id = 3;
OK
104    2016-02-12    3    5000
105    2016-02-12    3    5500
109    2016-02-29    3    2500
Time taken: 0.093 seconds, Fetched: 3 row(s)
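As a quick manual check, customer_id 1 has orders of 240, 20 and 200, and (240 + 20 + 200) / 3 = 153.33, which matches the MeanFunc output above; likewise (5000 + 5500 + 2500) / 3 = 4333.33 for customer_id 3.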