Sort By vs Order By vs Cluster By vs Distribute By in hive

In Hive we have different clauses like SORT BY, ORDER BY, CLUSTER BY and DISTRIBUTE BY, and it can be confusing to differentiate among them. Below is the differentiation among them, along with examples:

Sort By:-

Hive uses SORT BY to sort the data per reducer, based on the data type of the column used for sorting; the overall sorting of the output is not maintained. E.g. if the column is of a numeric type, the data within each reducer will be sorted in numeric order.

For Example:
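A minimal sketch of such a query (the table name t and the columns key and value are hypothetical):

SELECT key, value FROM t SORT BY key;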

Here, key and value are both numeric. Suppose the query runs with two reducers: the output of each reducer (Reducer 1 and Reducer 2) is sorted within itself, but when the two outputs are put together we can clearly see that the overall output is not in sorted order.

ORDER BY:-

Very similar to ORDER BY in SQL. The overall sorting is maintained in the case of ORDER BY, and the output is produced by a single reducer. Hence, we need to use the LIMIT clause so that the reducer is not overloaded.

For Example:
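A minimal sketch, again using the hypothetical table t:

SELECT key, value FROM t ORDER BY key LIMIT 100;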

Output: a single, globally sorted result set.

DISTRIBUTE BY:-

Hive uses the columns in DISTRIBUTE BY to distribute the rows among reducers. All rows with the same DISTRIBUTE BY columns will go to the same reducer. However, DISTRIBUTE BY does not guarantee clustering or sorting properties on the distributed keys.

For example, we are distributing the following 5 rows by x to 2 reducers:
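A sketch of such a query (the table t and column x are hypothetical):

SELECT x FROM t DISTRIBUTE BY x;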

Input: five rows with various values of x. Output: every row with a given value of x ends up in the same reducer (either Reducer 1 or Reducer 2), but within each reducer the rows are not in any particular order.

NOTE: DISTRIBUTE BY does not guarantee any ordering within a reducer. Each distinct value of the distributed key goes to exactly one reducer, so the reducers receive non-overlapping sets of key values.

CLUSTER BY:-

Cluster By is a short-cut for both Distribute By and Sort By.

Ordering : sorting is applied per reducer on the cluster-by columns; a global ordering across reducers is not guaranteed.

Outcome : N or more sorted files, one or more per reducer.

For Example:
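A sketch, using the same hypothetical table t and column x:

SELECT x FROM t CLUSTER BY x;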

Instead of specifying Cluster By, the user can specify Distribute By and Sort By, so the partition columns and sort columns can be different.

Example:
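For instance (hypothetical table t, columns x and y), distributing by one column while sorting by another:

SELECT x, y FROM t DISTRIBUTE BY x SORT BY y;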


Accessing a file using an HDFS URL

Recently I came across a scenario where I needed to access the HDFS file system using an HDFS URL. The following is the command to access any file path.
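A sketch of such a command (the file path /user/data/sample.txt is purely illustrative; the host and port are the ones described in the note below):

# list a directory through the full HDFS URL
hadoop fs -ls hdfs://master:54310/user/data
# read a file through the full HDFS URL
hadoop fs -cat hdfs://master:54310/user/data/sample.txt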

Here, master is the hostname of the NameNode and 54310 is the port.

NOTE: Here I was using the plain vanilla Apache Hadoop distribution. You need to use the port as per your distribution, e.g. Cloudera/Hortonworks.

 

UDF vs UDTF vs UDAF in hive

UDF vs UDTF vs UDAF

Hive has many in-built functions, but if we want to extend the functionality of Hive we can use a UDF, UDTF or UDAF.

UDF:- Please use the link given below to see how a UDF works in Hive.
UDF
UDTF:- Please use the link given below to see how a UDTF works in Hive.
UDTF
UDAF:- Please use the link given below to see how a UDAF works in Hive.
UDAF

That's it!!!

UDAF in hive

UDAF (User Defined Aggregation Function):-

Aggregate functions perform a calculation on a set of values and return a single value. Values are aggregated in chunks (potentially across many tasks), so the implementation has to be capable of combining partial aggregations into a final result.

E.g. To find the maximum number in a table.

Steps:

  1. First create a Max class which extends UDAF, and inside it create an inner static class MaxIntUDAFEvaluator which extends the UDAFEvaluator class.
  2. Create a table as given below.
  3. Insert some data using a .txt file containing numbers.
  4. Add the jar file, with its full path, to Hive from the Hive CLI/Beeline CLI, or add it via .bashrc.
  5. Create a temporary function as shown below.
  6. Use a SELECT statement to find the max (a sketch of steps 2-6 is given after this list).
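A minimal HiveQL sketch of steps 2-6. The table name numbers_tbl, the file and jar paths, the function name max_int and the Java package are all hypothetical; only the class name Max comes from step 1 above.

-- step 2: create a table to hold the numbers
CREATE TABLE numbers_tbl (num INT);
-- step 3: load data from a local text file containing numbers
LOAD DATA LOCAL INPATH '/home/anuj/HIVE/numbers.txt' INTO TABLE numbers_tbl;
-- step 4: add the UDAF jar to the session
ADD JAR /home/anuj/HIVE/HIVE-UDAF-max.jar;
-- step 5: register the temporary function backed by the Max class
CREATE TEMPORARY FUNCTION max_int AS 'com.example.hive.Max';
-- step 6: use the function to find the maximum
SELECT max_int(num) FROM numbers_tbl;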

     

That's it!!!

UDTF in hive

UDTF (User Defined Tabular Function):-

A user-defined tabular function works on one row as input and returns multiple rows as output, so here the relation is one-to-many.

e.g. Hive's built-in EXPLODE() function. Let's take an array column USER_IDS holding the array (10, 12, 5, 45); then SELECT EXPLODE(USER_IDS) AS ID FROM T_USER; will give 10, 12, 5 and 45 as four different rows in the output. A UDTF can also be used to split a column into multiple columns, which we will look at in the example below. Here the alias ("AS") clause is mandatory.

Problem Statement:- Expand the name column of the emp table into two separate columns, First_name and Last_name.

Solution:-

The following steps need to be followed in order to solve this problem.

  1. We have to extend the base class GenericUDTF to write our business logic in Java.
  2. We need to override 3 methods, namely initialize(), process() and close(), in our class ExpandNameDetails.
  3. Add the jar to the classpath:- Add the exported JAR file to the Hive classpath using the command ADD JAR /home/anuj/HIVE/HIVE-UDTF-split.jar from the Hive terminal. Alternatively, you can register exported JAR files in your .bashrc (edit it with "nano ~/.bashrc") via HIVE_AUX_JARS_PATH='/home/anuj/HIVE/HIVE-UDTF-split.jar'. This avoids adding the jar to the classpath each time you log in to a Hive session, as it will be loaded by the framework itself when the cluster services start.
  4. Create a temporary function.
  5. Try executing the function on the emp table's name field (a sketch of steps 3-5 is given after this list).
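A minimal HiveQL sketch of steps 3-5. The function name expand_name and the Java package are hypothetical; the jar path and the class name ExpandNameDetails come from the steps above.

-- step 3: add the UDTF jar to the session
ADD JAR /home/anuj/HIVE/HIVE-UDTF-split.jar;
-- step 4: register the temporary function backed by the ExpandNameDetails class
CREATE TEMPORARY FUNCTION expand_name AS 'com.example.hive.ExpandNameDetails';
-- step 5: the AS clause naming the output columns is mandatory for a UDTF
SELECT expand_name(name) AS (First_name, Last_name) FROM emp;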

That's it!!!

 

UDF in hive

Regular UDF:- Hive provides us some built-in functions, but if we want to extend the functionality of Hive then we can use a UDF (User Defined Function). These functions need to be added using a Java program, and a jar needs to be created. Let us discuss this using an example. The following steps need to be followed to create a UDF in Hive.

Steps:

  1. Create a UDF class extending the UDF class.
  2. Export the jar from the above Java code to a directory on the system.
  3. Add the jar file in the Hive CLI or Beeline terminal. Add the exported JAR file to the Hive classpath using the command ADD JAR /home/anuj/HIVE/HIVE-UDF-trim.jar from the Hive terminal. Alternatively, you can register exported JAR files in your .bashrc (edit it with "nano ~/.bashrc") via HIVE_AUX_JARS_PATH='/home/anuj/HIVE/HIVE-UDF-trim.jar'. This avoids adding the jar to the classpath each time you log in to a Hive session, as it will be loaded by the framework itself when the cluster services start.
  4. Add a temporary function: run the command shown below to add the function to Hive.
  5. Use the function in Hive queries (a sketch of steps 3-5 is given after this list).
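A minimal HiveQL sketch of steps 3-5. The function name str_trim, the Java package and the table/column names are hypothetical; the jar path is the one mentioned above.

-- step 3: add the UDF jar to the session
ADD JAR /home/anuj/HIVE/HIVE-UDF-trim.jar;
-- step 4: register the temporary function backed by the UDF class
CREATE TEMPORARY FUNCTION str_trim AS 'com.example.hive.Trim';
-- step 5: use the function in a query
SELECT str_trim(name) FROM emp;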


     

This UDF handles primitive-type arguments only, e.g. Text, IntWritable, LongWritable, etc. For complex types like struct, array, etc., a different type of UDF (a GenericUDF) needs to be written.

 

Hive performance improvements

The following are the most commonly used ways to improve Hive performance:

  1. Execution Engine
  2. Using Custom file formats
  3. Use Vectorization
  4. Bucketing & Partitioning
  5. Tweaking the number of mappers and their memory
  6. Parallel execution

 

  1. Execution Engine:- 

    USE TEZ

    Hive can use the Apache Tez execution engine instead of the venerable MapReduce engine. I won't go into details about the many benefits of using Tez, which are mentioned here; use Tez by adding the following setting at the beginning of your Hive query:
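    The relevant parameter is hive.execution.engine:

    set hive.execution.engine=tez;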

     

  2. Using Custom file formats:- Use the ORCFile format for faster query performance in Hive; it has a really fast response time. As an example, consider two large tables emp1 and emp2 (stored as text files, with some columns not all specified here), and a simple query like:
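    A sketch of such a query (the column names are hypothetical):

    SELECT e1.id, e1.name, e2.salary
    FROM emp1 e1
    JOIN emp2 e2 ON (e1.id = e2.id);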

     

    This query may take a long time to execute since tables emp1 and emp2 are both stored as TEXT. Converting these tables to ORCFile format will usually reduce query time significantly:
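    One way to do the conversion is a CTAS per table (a sketch; SNAPPY compression is the one referenced below):

    CREATE TABLE emp1_ORC STORED AS ORC TBLPROPERTIES ("orc.compress"="SNAPPY") AS SELECT * FROM emp1;
    CREATE TABLE emp2_ORC STORED AS ORC TBLPROPERTIES ("orc.compress"="SNAPPY") AS SELECT * FROM emp2;
    -- then join the ORC copies instead of the text tables
    SELECT e1.id, e1.name, e2.salary FROM emp1_ORC e1 JOIN emp2_ORC e2 ON (e1.id = e2.id);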

     

    ORC supports compressed storage (with ZLIB or as shown above with SNAPPY) but also uncompressed storage.

    Converting base tables to ORC is often the responsibility of your ingest team, and it may take them some time to change the complete ingestion process due to other priorities. The benefits of ORCFile are so tangible that I often recommend a do-it-yourself approach as demonstrated above – convert emp1 into emp1_ORC and emp2 into emp2_ORC and do the join that way, so that you benefit from faster queries immediately, with no dependencies on other teams.

  3. USE VECTORIZATION:-

    Vectorized query execution improves the performance of operations like scans, aggregations, filters and joins, by performing them in batches of 1024 rows at once instead of a single row at a time.

    It is easily enabled with two parameter settings:
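    The two settings in question are:

    set hive.vectorized.execution.enabled = true;
    set hive.vectorized.execution.reduce.enabled = true;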

  4. Bucketing & Partitioning:- Hive partitioning is an effective method to improve query performance on larger tables. Partitioning allows you to store data in separate sub-directories under the table location, and it greatly helps queries that filter on the partition key(s). Although the selection of the partition key is always a sensitive decision, it should always be a low-cardinality attribute; e.g. if your data is associated with the time dimension, then date would be a good partition key. Similarly, if the data is associated with location, like a country or state, it is a good idea to have hierarchical partitions like country/state.

    Bucketing improves join performance if the bucket key and join keys are common. Bucketing in Hive distributes the data into different buckets based on the hash of the bucket key. It also reduces the I/O scans during the join process if the join happens on the same keys. Additionally, it is important to ensure the bucketing flag is set (SET hive.enforce.bucketing=true;) every time before writing data to the bucketed table. To leverage bucketing in a join operation we should SET hive.optimize.bucketmapjoin=true. This setting hints to Hive to do a bucket-level join during the map-stage join. It also reduces the scan cycles needed to find a particular key, because bucketing ensures that the key is present in a certain bucket.
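    As an illustration, a sketch of a table partitioned by country/state and bucketed on id (all table and column names are hypothetical):

    CREATE TABLE emp_part (
      id INT,
      name STRING,
      salary DOUBLE
    )
    PARTITIONED BY (country STRING, state STRING)
    CLUSTERED BY (id) INTO 32 BUCKETS
    STORED AS ORC;

    -- enforce bucketing before inserting into the bucketed table
    SET hive.enforce.bucketing=true;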

  5. Tweaking the number of mappers and their memory:-

    The default hive.input.format is set to org.apache.hadoop.hive.ql.io.CombineHiveInputFormat. This configuration can result in fewer mappers than the number of splits (i.e., the number of blocks in HDFS) of the input table.

    Try setting org.apache.hadoop.hive.ql.io.HiveInputFormat for hive.input.format.

    Note that Apache Tez uses org.apache.hadoop.hive.ql.io.HiveInputFormat by default.

    You can then control the maximum number of mappers via setting:
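    A sketch of the settings (the split sizes are illustrative values in bytes; the split-size properties determine how many mappers are launched):

    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    -- smaller max split size => more mappers; larger => fewer mappers
    set mapreduce.input.fileinputformat.split.maxsize=268435456;
    set mapreduce.input.fileinputformat.split.minsize=134217728;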

Joins in hive

Syntax of joins in hive:-
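Roughly, the supported join forms are (a simplified sketch of the grammar from the Hive language manual):

join_table:
    table_reference JOIN table_factor [join_condition]
  | table_reference {LEFT|RIGHT|FULL} [OUTER] JOIN table_reference join_condition
  | table_reference LEFT SEMI JOIN table_reference join_condition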

Only equality joins, outer joins, and left semi joins are supported in Hive. Hive does not support join conditions that are not equality conditions as it is very difficult to express such conditions as a map/reduce job. Also, more than two tables can be joined in Hive.

For this blog we will use the following two tables to discuss joins in Hive:
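A sketch of two such tables (all column names are hypothetical; id is the join key used in the examples below). A third table, emp3, with similar columns is also assumed for the multi-table join examples.

CREATE TABLE emp1 (id INT, name STRING);
CREATE TABLE emp2 (id INT, dept_id INT, salary DOUBLE);
-- assumed third table for the multi-table examples
CREATE TABLE emp3 (id INT, dept_id INT, address STRING);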

Join Example:
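A sketch of a simple equality join on the hypothetical tables above:

SELECT e1.name, e2.salary
FROM emp1 e1 JOIN emp2 e2 ON (e1.id = e2.id);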

Multiple tables can be joined in the same query:
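For instance (a sketch on the same hypothetical tables):

SELECT e1.name, e2.salary, e3.address
FROM emp1 e1
JOIN emp2 e2 ON (e1.id = e2.id)
JOIN emp3 e3 ON (e3.dept_id = e2.dept_id);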

Join implementation with Map Reduce

Hive converts joins over multiple tables into a single map/reduce job if, for every table, the same column is used in the join clauses. The query below is converted into a single map/reduce job because only the id column of e2 is involved in the joins.
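A sketch (both join clauses use e2.id, so one map/reduce job suffices):

SELECT e1.name, e2.salary, e3.address
FROM emp1 e1
JOIN emp2 e2 ON (e1.id = e2.id)
JOIN emp3 e3 ON (e3.id = e2.id);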


It is very interesting to note that any number of tables can be joined in a single map/reduce job as long as they fit the above criterion.

However, if the join columns are not the same for all tables, the query is converted into multiple map/reduce jobs.
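A sketch (e3 is joined on a different column of e2, here the hypothetical dept_id):

SELECT e1.name, e2.salary, e3.address
FROM emp1 e1
JOIN emp2 e2 ON (e1.id = e2.id)
JOIN emp3 e3 ON (e3.dept_id = e2.dept_id);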


In this case the first map/reduce job joins e1 with e2 and the results are then joined with e3 in the second map/reduce job.

Largest Table LAST

In the MapReduce job for regular inner joins, mappers run on both tables, emitting the records that need to be joined after evaluating any UDFs in the query and filtering out records based on the WHERE clause. Then the shuffle phase is run, which "shuffles" the records based on the join key (id in the above example). Subsequently, in the reduce phase, essentially a cross-product takes place between the records from each table that have the same join key.
In every map/reduce stage of the join, the last table in the sequence is streamed through the reducers, whereas the others are buffered. Therefore, it helps to reduce the memory needed in the reducer for buffering the rows for a particular value of the join key by organizing the tables such that the largest table appears last in the sequence, e.g. in the query below:
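(A sketch reusing the hypothetical tables; e3 is assumed to be the largest and appears last.)

SELECT e1.name, e2.salary, e3.address
FROM emp1 e1
JOIN emp2 e2 ON (e1.id = e2.id)
JOIN emp3 e3 ON (e3.id = e2.id);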

all the three tables are joined in a single map/reduce job, and the rows of tables e1 and e2 for a particular value of id are buffered in memory in the reducers. Then, for each row retrieved from e3, the join is computed with the buffered rows.

For the query:
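(Again a sketch; here e3 is joined on a different column of e2, so two jobs are needed.)

SELECT e1.name, e2.salary, e3.address
FROM emp1 e1
JOIN emp2 e2 ON (e1.id = e2.id)
JOIN emp3 e3 ON (e3.dept_id = e2.dept_id);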

there are two map/reduce jobs involved in computing the join. The first of these joins e1 with e2 and buffers the values of e1 while streaming the values of e2 through the reducers. The second of these jobs buffers the results of the first join while streaming the values of e3 through the reducers.

Streamtable hint

You can also specify which table should be streamed; usually it is the larger one.
e.g.
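A sketch using the STREAMTABLE hint on the hypothetical tables (here e1 is marked as the table to stream):

SELECT /*+ STREAMTABLE(e1) */ e1.name, e2.salary
FROM emp1 e1 JOIN emp2 e2 ON (e1.id = e2.id);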

Outer Joins

  • LEFT
  • RIGHT
  • FULL
e.g. an example for LEFT OUTER JOIN is given below; RIGHT and FULL are similar.
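A sketch on the hypothetical tables:

SELECT e1.name, e2.salary
FROM emp1 e1 LEFT OUTER JOIN emp2 e2 ON (e1.id = e2.id);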

NOTE: These joins are not commutative; instead they are left-associative, regardless of whether they are LEFT or RIGHT OUTER joins.

e.g.
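For instance (a sketch):

SELECT e1.name, e2.salary, e3.address
FROM emp1 e1
LEFT OUTER JOIN emp2 e2 ON (e1.id = e2.id)
RIGHT OUTER JOIN emp3 e3 ON (e1.id = e3.id);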

This means the join evaluation starts from the left: e1 is joined with e2 first, and the result of that join is then joined with e3.

Left Semi Join

LEFT SEMI JOIN implements the correlated IN/EXISTS subquery semantics in an efficient way. Since Hive currently does not support IN/EXISTS subqueries, you can rewrite your queries using LEFT SEMI JOIN. The restriction on using LEFT SEMI JOIN is that the right-hand-side table should only be referenced in the join condition (the ON clause), and not in WHERE or SELECT clauses, etc.

This type of query
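(A sketch of an IN-style query on the hypothetical tables:)

SELECT e1.id, e1.name
FROM emp1 e1
WHERE e1.id IN (SELECT e2.id FROM emp2 e2);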


Can be written as:
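(The equivalent LEFT SEMI JOIN form, as a sketch:)

SELECT e1.id, e1.name
FROM emp1 e1 LEFT SEMI JOIN emp2 e2 ON (e1.id = e2.id);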

Map Side Join

If all but one of the tables being joined are small, the join can be performed as a map-only job; the query does not need a reducer. For every mapper of the big table (e1), the small table (e2) is read completely. A restriction is that e1 FULL/RIGHT OUTER JOIN e2 cannot be performed.
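A sketch using the MAPJOIN hint (e2 is assumed to be the small table held in memory):

SELECT /*+ MAPJOIN(e2) */ e1.name, e2.salary
FROM emp1 e1 JOIN emp2 e2 ON (e1.id = e2.id);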

Bucketed Map Join

If the tables being joined are bucketed, and the number of buckets in one table is a multiple of the number of buckets in the other, the buckets can be joined with each other.
In conf/hive-site.xml you need to set the following parameters:
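A sketch of the relevant settings (shown here as session-level SET commands; they can equally be placed as properties in hive-site.xml):

-- enable the bucketed map join optimization
set hive.optimize.bucketmapjoin = true;
-- if the bucketed tables are also sorted on the join key, a sort-merge bucket join can be used
set hive.optimize.bucketmapjoin.sortedmerge = true;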

That’s it!!!

Thanks!!!

Hiveserver1 vs Hiveserver2

Hiveserver1:-

HiveServer is an optional service that allows a remote client to submit requests to Hive, using a variety of programming languages, and retrieve results. HiveServer is built on Apache Thrift™ (http://thrift.apache.org/), therefore it is sometimes called the Thrift server, although this can lead to confusion because a newer service named HiveServer2 is also built on Thrift. Since the introduction of HiveServer2, HiveServer has also been called HiveServer1.

 

hiveserver1
 Limitations of hiveserver1:-
  • Supports remote client connections, but only one client can connect at a time.
  • No session management support.
  • No concurrency control, due to the Thrift API.
  • No authentication support.


Hiveserver2:-

HiveServer2 is an improved version which solves the problems of HiveServer1, like concurrency, authentication, authorization, etc.

Hiveserver2 Architecture:-

HiveServer2 implements a new Thrift-based RPC interface that can handle concurrent clients. The current release supports Kerberos, LDAP, and custom pluggable authentication. The new RPC interface also has better options for JDBC and ODBC clients, especially for metadata access.

hiveserver2


Like the original HiveServer1, HiveServer2 is a container for the Hive execution engine. For each client connection, it creates a new execution context that serves Hive SQL requests from the client. The new RPC interface enables the server to associate this Hive execution context with the thread serving the client’s request.

Clients for HiveServer2:-

  1. JDBC:-
  2. Beeline CLI:- Beeline is a JDBC application based on the SQLLine CLI that supports embedded and remote-client modes. The embedded mode is where the Hive runtime is part of the client process itself; there’s no server involved.
  3. ODBC

Metastore:-

The Hive metastore service runs in its own JVM process. Clients other than Hive, like Apache Pig, connect to this service via HCatalog for metadata access. HiveServer2 supports local as well as remote metastore modes – which is useful when you have more than one service (Pig, Cloudera Impala, and so on) that needs access to metadata. The remote metastore is the recommended deployment mode with HiveServer2:

hiveserver2_authentication


Authentication:-

Authentication support is another major feature of HiveServer2. In the original HiveServer, if you could access the host/port over the network, you could access the data – there was no built-in way to restrict access.

In contrast, HiveServer2 supports Kerberos, pass-through LDAP, and pass-through pluggable custom authentication. All client types – JDBC, ODBC, as well as the Beeline CLI – support these authentication modes. This enables a Hive deployment to easily integrate with existing authentication services.

Gateway to Secure Hadoop

Today, the Hadoop ecosystem only supports Kerberos for authentication. That means for accessing secure Hadoop, one needs to get a Kerberos ticket. However, enabling Kerberos on every client box can be a very challenging task and thus can restrict access to Hive and Hadoop.

To address that issue, HiveServer2 can authenticate clients over non-Kerberos connections (e.g. LDAP) and run queries against Kerberos-secured Hadoop data. This approach allows users to securely access Hive without complex security infrastructure or limitations.

hiveserver2_auth

Different file formats in hive

We are going to discuss the different file formats which are available in Hive, and we will also learn when to use which format as per our use case.

In Apache Hive, data is stored in HDFS, which is ultimately a file system with a specific block size (128 MB by default) that can be configured. Hive can handle the following file formats:

  • TEXTFILE
  • SEQUENCEFILE
  • RCFILE
  • ORCFILE

First of all, what is a file format?

A file format is the way to store or encode information in a computer system. In Hive, how records are encoded in a file defines the file format. The main characteristics which vary between file formats are:

data encoding, compression ratio, usage of space, and disk I/O.

NOTE: Hive does not verify whether the data you are loading matches the schema. It only checks whether the file format matches the table definition.

1)  TEXTFILE: TEXTFILE is a common input/output format used in Hadoop. In Hive, if we define a table as TEXTFILE it can load data in the form of CSV (Comma Separated Values), data delimited by tabs or spaces, or JSON data. This means fields in each record should be separated by a comma, space or tab, or the record may be JSON data. By default, if we use the TEXTFILE format, each line is considered a record. The HiveQL command to create a TEXTFILE table is as follows:
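A sketch (the table name, columns and delimiter are hypothetical):

CREATE TABLE emp_text (
  id INT,
  name STRING,
  salary DOUBLE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;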

The TEXTFILE input and TEXTFILE output format is present in the Hadoop package as shown below:
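These are typically the following classes (as reported by DESCRIBE FORMATTED on a TEXTFILE table):

InputFormat:  org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat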

Loading data into a TEXTFILE table:
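A sketch (the local path is hypothetical):

LOAD DATA LOCAL INPATH '/home/user/emp.csv' INTO TABLE emp_text;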

2) SEQUENCEFILE:- 

We know very well that a large number of small files, each smaller than the block size, has a huge negative impact on Hive/MapReduce performance. To overcome this problem we have the SEQUENCEFILE format in Hive. A sequence file acts as a container for small files. Sequence files are flat files consisting of binary key-value pairs. When Hive converts queries to MapReduce jobs, it decides on the appropriate key-value pairs to be used for a given record.

In Hive we can create a SEQUENCEFILE table using the following query:
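A sketch, reusing the hypothetical emp columns:

CREATE TABLE emp_seq (
  id INT,
  name STRING,
  salary DOUBLE
)
STORED AS SEQUENCEFILE;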

There are three types of sequence files:
  • Uncompressed key/value records.
  • Record-compressed key/value records – only the 'values' are compressed here.
  • Block-compressed key/value records – both keys and values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.

Hive has its own SEQUENCEFILE reader and SEQUENCEFILE writer for reading and writing sequence files.

To load data into a SEQUENCEFILE table we need to copy the data from another table; you can't load a text file directly into this table.
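A sketch of copying from the hypothetical TEXTFILE table above:

INSERT OVERWRITE TABLE emp_seq SELECT * FROM emp_text;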

3) RCFILE:- 

RCFILE stands for Record Columnar File, which is another type of binary file format offering a high compression rate on top of the rows. RCFILE is used when we want to perform operations on multiple rows at a time. RCFILEs are flat files consisting of binary key/value pairs, which share much similarity with SEQUENCEFILE. RCFILE stores the columns of a table in a record-columnar manner. Facebook uses RCFILE as its default file format.

Create table command in hive for RCFILE format:
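A sketch, with the same hypothetical columns:

CREATE TABLE emp_rc (
  id INT,
  name STRING,
  salary DOUBLE
)
STORED AS RCFILE;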

Similar to SEQUENCEFILE, we can't load a text file directly into an RCFILE table.

4) ORCFILE:-

ORC stands for Optimized Row Columnar, which means it can store data in a more optimized way than the other file formats. ORC reduces the size of the original data by up to 75%. As a result, the speed of data processing also increases. ORC shows better performance than the TEXT, SEQUENCE and RC file formats. An ORC file contains row data in groups called stripes, along with a file footer. The ORC format improves performance when Hive is processing the data.

Create table command in hive for ORCFILE format:
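A sketch, with the same hypothetical columns:

CREATE TABLE emp_orc (
  id INT,
  name STRING,
  salary DOUBLE
)
STORED AS ORC;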

Similar to SEQUENCEFILE, we can't load a text file directly into an ORCFILE table.

Conclusion: So depending upon the use case you can choose the appropriate file format.
For example,
a) If your data is delimited by some parameters then you can use TEXTFILE format.
b) If your data is in small files whose size is less than the block size then you can use SEQUENCEFILE format.
c) If you want to perform analytics on your data and you want to store your data efficiently for that then you can use RCFILE format.
d) If you want to store your data in an optimized way which lessens your storage and increases your performance then you can use ORCFILE format. Hope with this Blog you now have a clear picture as to which File Format to use in Hive depending on your data.
