LATERAL VIEW EXPLODE IN HIVE

To access the individual elements of an array in Hive, we use the built-in table-generating functions (UDTFs). Below is a simple example with a description of each step.

Built-in Table-Generating Functions (UDTF)

To see the details of any function, run the describe function command; to get the list of available functions, run show functions;
hive> describe function explode;
explode(a) – separates the elements of array a into multiple rows, or the elements of a map into multiple rows and columns

Now we create a table named Products with three columns: id of type INT, ProductName of type STRING & ProductColorOptions of type ARRAY<STRING>.
hive> CREATE TABLE Products
> (id INT, ProductName STRING, ProductColorOptions ARRAY<STRING>);

INSERT ... VALUES cannot populate array columns directly, so we create a one-row dummy table & then insert with a SELECT from that table, as shown below:
hive> create table dummy (a String);

hive> insert into table dummy values ('a');

hive> insert into table products select 1,'Watches',array('Red','Green') from dummy;
hive> insert into table products select 2,'Clothes',array('Blue','Green') from dummy;
hive> insert into table products select 3,'Books',array('Blue','Green','Red') from dummy;

Now the products table has 3 rows, as the select query below shows:
hive> select * from products;
1 Watches ["Red","Green"]
2 Clothes ["Blue","Green"]
3 Books ["Blue","Green","Red"]

The query below selects the first element (index 0) of each array:
hive> SELECT id, productname, productcoloroptions[0] FROM default.products;
1 Watches Red
2 Clothes Blue
3 Books Blue

Now the main command, LATERAL VIEW EXPLODE, which returns one row per array element:
SELECT p.id, p.productname, colors.colorselection FROM default.products p
LATERAL VIEW EXPLODE(p.productcoloroptions) colors AS colorselection;
1 Watches Red
1 Watches Green
2 Clothes Blue
2 Clothes Green
3 Books Blue
3 Books Green
3 Books Red

A lateral view first applies the UDTF to each row of the base table and then joins the resulting output rows to the input rows, forming a virtual table with the supplied table alias.
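To make the join semantics concrete, here is a minimal Python sketch (not Hive code) that mimics what LATERAL VIEW EXPLODE does to the products rows above. The function name lateral_view_explode is made up for illustration; Hive has no such Python API.

```python
# Rows mirror the products table from the example above.
products = [
    (1, "Watches", ["Red", "Green"]),
    (2, "Clothes", ["Blue", "Green"]),
    (3, "Books", ["Blue", "Green", "Red"]),
]


def lateral_view_explode(rows):
    """Yield one output row per array element, joined to its source row."""
    for row_id, name, colors in rows:
        for color in colors:  # explode(): one generated row per element
            yield (row_id, name, color)


exploded = list(lateral_view_explode(products))
for row in exploded:
    print(*row)
```

A row whose array is empty produces no output rows in this sketch, which matches how a plain LATERAL VIEW (without OUTER) behaves in Hive.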

 

Gathering Twitter Data using Flume

This post shows how to get streaming data from Twitter using Flume & then put it in Hive for analysis.

1. We are using the Cloudera VM, so it is assumed that Java, Hadoop, Hive, Flume etc. are already installed and running.

2. The steps for creating a Twitter project & getting the consumerKey, consumerSecret, accessToken & accessTokenSecret are not covered here; they are available in many blogs.

3. The configuration file used by Flume:
[cloudera@quickstart conf]$ pwd
/usr/lib/flume-ng/conf
[cloudera@quickstart conf]$ cat twitter.conf
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel

TwitterAgent.sources.Twitter.consumerKey = ***
TwitterAgent.sources.Twitter.consumerSecret = *****

TwitterAgent.sources.Twitter.accessToken = ****
TwitterAgent.sources.Twitter.accessTokenSecret = ****

TwitterAgent.sources.Twitter.maxBatchSize = 50000
TwitterAgent.sources.Twitter.maxBatchDurationMillis = 100000

TwitterAgent.sources.Twitter.keywords = @narendramodi

TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://quickstart.cloudera/user/cloudera/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 1000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel
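
One thing worth checking in a config like this is how the sink batch size relates to the channel limits. As far as I understand Flume's memory channel, transactionCapacity caps how many events a sink can take in one transaction, so a sink batch size larger than it may cause channel errors under load. The Python sketch below is only an illustrative checker and not part of Flume; the function names parse_properties and check_batch_sizes are my own.

```python
def parse_properties(text):
    """Parse simple key = value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_batch_sizes(props, agent, channel, sink):
    """Return warnings about channel limits versus the sink batch size."""
    chan = f"{agent}.channels.{channel}."
    capacity = int(props[chan + "capacity"])
    tx_capacity = int(props[chan + "transactionCapacity"])
    batch_size = int(props[f"{agent}.sinks.{sink}.hdfs.batchSize"])
    warnings = []
    if tx_capacity > capacity:
        warnings.append("transactionCapacity exceeds capacity")
    if batch_size > tx_capacity:
        warnings.append(
            f"hdfs.batchSize {batch_size} exceeds transactionCapacity {tx_capacity}"
        )
    return warnings


# The three relevant values copied from twitter.conf above.
CONF = """
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.channels.MemChannel.capacity = 1000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
"""

warnings = check_batch_sizes(
    parse_properties(CONF), "TwitterAgent", "MemChannel", "HDFS"
)
for message in warnings:
    print(message)
```

If the check reports a mismatch, the usual options are to lower hdfs.batchSize or to raise the channel's transactionCapacity (and capacity, if needed).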

4. Then run the agent with this config file, using the following command from the location /usr/lib/apache-flume-1.6.0-bin/bin:
./flume-ng agent -n TwitterAgent -c conf -f ../conf/twitter.conf

5. As per the sink output path in our configuration file, we can see files being generated at:
hdfs://quickstart.cloudera/user/cloudera/twitter_data/*
By default these files are in Avro format, so we use avro-tools-1.7.7.jar to extract the schema, which is then used to build the Hive table.
Run the command:
C:\Users\ankit.baldua.AVULZE0\Desktop\flume>java -jar avro-tools-1.7.7.jar getschema FlumeData.1476876054060.avro > FlumeData.avsc

6. Connect to the Hive shell & create the table as below, after putting the avsc file generated above at the path referenced by avro.schema.url.

CREATE EXTERNAL TABLE tweetsavro
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.url'='hdfs://quickstart.cloudera/user/cloudera/FlumeData.avsc') ;

7. To put data in the table created above:
LOAD DATA INPATH '/user/cloudera/twitter_data/*' OVERWRITE INTO TABLE tweetsavro;

hive> describe tweetsavro;
OK
id string from deserializer
user_friends_count int from deserializer
user_location string from deserializer
user_description string from deserializer
user_statuses_count int from deserializer
user_followers_count int from deserializer
user_name string from deserializer
user_screen_name string from deserializer
created_at string from deserializer
text string from deserializer
retweet_count bigint from deserializer
retweeted boolean from deserializer
in_reply_to_user_id bigint from deserializer
source string from deserializer
in_reply_to_status_id bigint from deserializer
media_url_https string from deserializer
expanded_url string from deserializer

This shows that the table schema is derived automatically from the avsc file.

8. The table now holds the loaded data & we can run select commands as usual.
hive> select * from tweetsavro limit 1;
OK
790880562726612992 97 NULL Kirsten Danielle Tan Delavin
Madeleine Yrenea Madayag #17
Hope Elizabeth Soberano 361 27 Sheina Delavin sheina_bastes 2016-10-25T04:39:54Z Balita hahahaha cute ni Dundun 0 false -1 Twitter for Android -1 NULL NULL
Time taken: 0.859 seconds, Fetched: 1 row(s)
hive>

9. In this way we can successfully load streaming twitter data into Hive.

Parsing XML in PIG

There are several ways to convert an XML file to a desired format using Pig, such as writing a regular expression (regex) or writing custom Java code. From version 0.13 onwards, however, piggybank.jar lets us do it directly. PiggyBank is a collection of user-defined functions (UDFs) contributed by Pig users. The process is as follows:

After entering the grunt shell, run the following commands:

1. REGISTER piggybank.jar

grunt> REGISTER '/usr/lib/pig/piggybank-0.13.0.jar';

Make sure the jar is piggybank-0.13.0.jar or a later version, because older versions do not contain the XPath class in org.apache.pig.piggybank.evaluation.xml.

2. DEFINE XPath org.apache.pig.piggybank.evaluation.xml.XPath();

Define XPath as a short alias so that the function can be called easily.

Input File:

Java Guide
AUS
DATACENTER
2500
2016

Cricket CookBook
Ankit Baldua
INDIA
ABC Pvt. LTd.
2000
2016

Java CookBook
Sachin Tendulkar
INDIA
Ankit Pvt. LTd.
3000
2016
3. sample_input = load '/home/cloudera/Desktop/sample.xml' using org.apache.pig.piggybank.storage.XMLLoader('BOOK') as (x:chararray);

The relation sample_input loads the sample.xml file using the PiggyBank XMLLoader function; each BOOK element becomes one record.

4. desired_output = FOREACH sample_input GENERATE XPath(x, 'BOOK/AUTHOR'), XPath(x, 'BOOK/PRICE'), XPath(x, 'BOOK/COUNTRY');

In another relation named desired_output, FOREACH iterates over the XML records & the XPath function extracts the value of each requested tag.

5. DUMP desired_output;

Finally, DUMP prints the desired output.

Output:
(,2500,AUS)  // the AUTHOR tag is missing in the first entry of the input file, so author is blank in the output, but price & country are printed
(Ankit Baldua,2000,INDIA)
(Sachin Tendulkar,3000,INDIA)
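
For comparison, the same extraction can be sketched outside Pig with Python's standard xml.etree.ElementTree module. This is only an illustration: the sample XML below is a reduced, hypothetical version of the input file that keeps just the AUTHOR, PRICE and COUNTRY tags named in the XPath calls, and the BOOKS root element is my assumption.

```python
import xml.etree.ElementTree as ET

# Hypothetical, reduced sample; the BOOKS wrapper element is an assumption.
SAMPLE_XML = """
<BOOKS>
  <BOOK><COUNTRY>AUS</COUNTRY><PRICE>2500</PRICE></BOOK>
  <BOOK><AUTHOR>Ankit Baldua</AUTHOR><COUNTRY>INDIA</COUNTRY><PRICE>2000</PRICE></BOOK>
  <BOOK><AUTHOR>Sachin Tendulkar</AUTHOR><COUNTRY>INDIA</COUNTRY><PRICE>3000</PRICE></BOOK>
</BOOKS>
"""


def extract_books(xml_text):
    """Return (author, price, country) per BOOK; missing tags become ''."""
    root = ET.fromstring(xml_text.strip())
    rows = []
    for book in root.iter("BOOK"):
        rows.append(
            tuple(
                book.findtext(tag, default="")
                for tag in ("AUTHOR", "PRICE", "COUNTRY")
            )
        )
    return rows


rows = extract_books(SAMPLE_XML)
for row in rows:
    print(row)
```

As in the Pig output, the first book has no AUTHOR tag, so its author field comes back empty while price and country are still returned.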

For more usage of XML with PIG:

http://help.mortardata.com/technologies/pig/xml

 

HBase shell commands

Great Compilation of HBASE Shell Commands

Learn HBase

As mentioned in the HBase introduction, HBase provides an extensible JRuby-based (JIRB) shell for executing commands (each command represents one piece of functionality).

HBase shell commands are mainly categorized into 6 parts

1) General  HBase shell commands

status Show cluster status. Can be 'summary', 'simple', or 'detailed'. The
default is 'summary'.

hbase> status
hbase> status 'simple'
hbase> status 'summary'
hbase> status 'detailed'

version Output this HBase version. Usage:

hbase> version

whoami Show the current hbase user. Usage:

hbase> whoami

2) Tables Management commands

alter Alter column family schema; pass table name and a dictionary
specifying new column family schema. Dictionaries are described
on the main help command output. Dictionary must include name
of column family to alter. For example, to change or add the 'f1' column family in table 't1' from
current value to keep a maximum of 5 cell VERSIONS, do:

hbase> alter 't1', NAME => 'f1', VERSIONS => 5

You…


Apache PIG

Apache Pig is used for processing huge volumes of data, both structured & unstructured, and for data summarization, querying & advanced querying.
Since Pig is a framework, many predefined actions & transformations are already provided for interacting with the Pig shell. (Actions & transformations are simply commands for interacting with Pig's grunt shell.)
Whenever we execute Pig actions, the framework internally triggers MapReduce jobs.

Challenges with MapReduce or need of PIG:
1. Programming skills are required to code MapReduce (Java, Python, Scala etc.)
2. Long deployment process
3. Too many lines of code

Pig turns all of these MapReduce challenges into advantages: not much programming skill is required unless we write UDFs, and there is no long deployment process.

Installing PIG:
To be installed on top of Hadoop (HDFS & Map Reduce)
Download the Pig tarball, install it on top of Hadoop & set up the classpath.

Modes of Operation:
1. Local Mode: Input data is read from a local file system (LFS) path & output is also written to the LFS. In local mode HDFS is not in the picture; Pig runs in a single JVM against the local file system.
2. HDFS Mode: Input & output are both in HDFS.
3. Embedded Mode: If we cannot achieve the desired functionality using the existing actions, we can choose embedded mode to develop customized functions, i.e. UDFs.

Default Transformations or operators in PIG:

Loading & Storing:

1. LOAD : To Load the data from the file system (local/HDFS) into a relation.
2. STORE : To save a relation to the file system (local/HDFS).

Filtering:

3. FOREACH : To apply a transformation to every row of a relation.
4. GENERATE : Used with FOREACH to specify the columns or expressions to produce.
5. FILTER : To remove unwanted rows from a relation.
6. DISTINCT : To remove duplicate rows from a relation.
7. STREAM: To transform a relation using an external program.

Grouping & Joining:

8. JOIN : To join two or more relations.
9. COGROUP : To group the data in two or more relations.
10. GROUP : To group the data in a single relation.
11. CROSS : To create the cross product of two or more relations.

Sorting:

12. ORDER : To arrange a relation in a sorted order based on one or more fields (ascending or descending).
13. LIMIT : To get a limited number of tuples from a relation.

Combining & Splitting:

14. UNION : To combine two or more relations into a single relation.
15. SPLIT : To split a single relation into two or more relations.

Diagnostic Operators:

16. DUMP : To print the contents of a relation on the console.
17. DESCRIBE : To describe the schema of a relation.
18. EXPLAIN : To view the logical, physical, or MapReduce execution plans to compute a relation.
19. ILLUSTRATE : To view the step-by-step execution of a series of statements.

Pig Language Data Types:

As in other languages, data types such as int, float, long, double and boolean are available. String is replaced by chararray. If no data type is specified, the default is bytearray, which is also how Pig stores data internally.

Atom – Any single value in Pig Latin, irrespective of its data type, is known as an atom. It is stored as a string and can be used as a string or a number. int, long, float, double, chararray, and bytearray are the atomic values of Pig. A piece of data or a simple atomic value is known as a field.

Tuple – A record that is formed by an ordered set of fields is known as a tuple, the fields can be of any type. A tuple is similar to a row in a table of RDBMS.
Example: (Ankit, 28)

Bag – A bag is an un-ordered set of tuples. In other words, a collection of tuples (non-unique) is known as a bag. Each tuple can have any number of fields (flexible schema). A bag is represented by {} . Example: {(Ankit, 28), (Rahul, 30)}
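
To get a feel for these types, here is a rough Python analogy (Python, not Pig Latin): a tuple maps to a Python tuple and a bag to a list of tuples that may contain duplicates. The group_by helper is a made-up name that imitates what Pig's GROUP operator returns, namely one entry per key holding a bag of the matching tuples.

```python
from collections import defaultdict

# A bag: a collection of tuples; duplicates are allowed.
people = [("Ankit", 28), ("Rahul", 30), ("Ankit", 28)]


def group_by(bag, key_index):
    """Imitate GROUP: map each key to the bag of tuples sharing that key."""
    groups = defaultdict(list)
    for tup in bag:
        groups[tup[key_index]].append(tup)
    return dict(groups)


grouped = group_by(people, 0)
for key, inner_bag in grouped.items():
    print(key, inner_bag)
```

The analogy is loose: a Python list keeps insertion order, whereas a Pig bag makes no ordering promise.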

Different flavors of PIG execution:

1. grunt shell: this is the default mode of Pig execution; whether a command succeeded or failed is checked in the shell itself.
syntax:
local mode: pig -x local
grunt>
hdfs mode: pig or pig -x mapreduce
grunt>

2. script mode: instead of typing commands one by one in the grunt shell, put all the commands in one file & save it with any extension, though .pig is preferred.
to run a script, syntax:
local mode: pig -x local <script-name>
hdfs mode: pig <script-name>

How to achieve embedded mode (User Defined Functions):
1. Develop a class that extends the base class EvalFunc<>.
2. Override the exec() method to implement the business logic.
3. Add the required jar files for compilation & build our own jar file.
4. Register the generated jar with Pig, using the syntax: REGISTER <jarname>;
5. The function can then be used.

Note: The jar must be registered every time a new shell is opened before the UDF can be called.
Also, if the class is in a package, the fully qualified name (including the package) is required when calling the function.

Happy Reading!

 

Logstash

Logstash is an open source data collection engine with real-time pipelining capabilities. Logstash can dynamically unify data from disparate sources and normalize the data into destinations of your choice. Cleanse and democratize all your data for diverse advanced downstream analytics and visualization use cases.

While Logstash originally drove innovation in log collection, its capabilities extend well beyond that use case. Any type of event can be enriched and transformed with a broad array of input, filter, and output plugins, with many native codecs further simplifying the ingestion process. Logstash accelerates your insights by harnessing a greater volume and variety of data.

The config file is the most important part & contains the following major categories:

Input plugins:
An input plugin enables a specific source of events to be read by Logstash.
https://www.elastic.co/guide/en/logstash/current/input-plugins.html#input-plugins

Output plugins:
An output plugin sends event data to a particular destination. Outputs are the final stage in the event pipeline.
https://www.elastic.co/guide/en/logstash/current/output-plugins.html

Filter plugins:
A filter plugin performs intermediary processing on an event. Filters are often applied conditionally depending on the characteristics of the event.
https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

Codec plugins:
A codec plugin changes the data representation of an event. Codecs are essentially stream filters that can operate as part of an input or output.
https://www.elastic.co/guide/en/logstash/current/codec-plugins.html#codec-plugins

How to run the config file:
For Windows (from the location where logstash/bin is installed):
java -jar logstash-1.2.2-flatjar.jar agent -f logstash_sample.conf

For Linux:
/location to logstash bin/java -jar logstash-1.2.2-flatjar.jar agent -f logstash_sample.conf

Analytics ‘On The Edge’

The idea of analytics at the edge is about the notion of doing the right amount of processing of data at the right place.

“If you’re generating tons of data in your data center, then analyze it in your data center.

“If you’re generating tons of data at the edge, and you have unlimited bandwidth, then you can still send it all back to the data center – it’s fine.

“But there are environments where that is not possible, and the two main things that come together to make it not possible are the volume of data and the perishable nature of data.”

The concept of “edge analytics” is gaining popularity.  Sometimes known as distributed analytics, it basically means designing systems where analytics is performed at the point where (or very close to where) the data is collected. Often, this is where action based on the insights provided by the data is most needed. Rather than designing centralized systems where all the data is sent back to your data warehouse in a raw state, where it has to be cleaned and analyzed before being of any value, why not do everything at the “edge” of the system?

How does distributed IoT analytics work?

The hierarchy begins with “simple” analytics on the smart device itself, more complex multi-device analytics on the IoT gateways, and finally the heavy lifting— the big data analytics — running in the cloud. This distribution of analytics offloads the network and the data centers by creating a model that scales. Distributing the analytics to the edge is the only way to progress.
Edge IoT analytics is more than just about operational efficiencies and scalability. Many business processes do not require “heavy duty” analytics and therefore the data collected, processed and analyzed on or near the edge can drive automated decisions. For example, a local valve can be turned off when a leak is detected.
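
As a rough illustration of the valve example, the Python sketch below makes the decision on the device and returns only a small summary that could be forwarded upstream. The function name process_at_edge, the threshold and the readings are all made up for this sketch.

```python
def process_at_edge(readings, leak_threshold):
    """Act locally on raw sensor readings and return a small summary."""
    if not readings:
        raise ValueError("readings must not be empty")
    leak_detected = max(readings) > leak_threshold
    return {
        "count": len(readings),
        "max": max(readings),
        "mean": sum(readings) / len(readings),
        "close_valve": leak_detected,  # the automated local decision
    }


# Made-up readings from one valve sensor; only the summary leaves the device.
summary = process_at_edge([0.2, 0.3, 0.1, 4.8], leak_threshold=1.0)
print(summary)
```

The raw readings never leave the device here; the cloud only sees four numbers per batch, which is the bandwidth saving the text describes.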

If latency is a concern for a business, actions can be taken in real time to avoid delays between the sensor-registered event and the reaction to that event. This is especially true of industrial control systems, where sometimes there is no time to transmit the data to a remote cloud. Issues such as this can be remedied with a distributed analytics model.

In some sense, cities are already dealing with these types of issues. Sensors such as CCTV units and speed cameras are already deployed on our highway infrastructure, and as these become smarter, the volume of data that can be collected increases and becomes more valuable to governments and city councils looking to make their transport systems more efficient.

Even when compressed, a typical video camera can produce a few megabits of data every second. Transporting these video streams requires bandwidth. Not only does bandwidth cost money but if in addition you want some quality of service, the whole thing becomes even more expensive. Thus, performing video analytics or even storage on the edge and transporting only the “results” is much cheaper.

For example, a system that takes in data from thousands of sensors needs to operate on a more instantaneous basis to reflect a changing highway. In this case, the machine is trained on what represents normal traffic flow and once the learning phase is over, the machine can autonomously indicate that something abnormal has happened. This is a more efficient method of detection because it is easier to spot something abnormal once you have learnt what is normal – after all, this is the way the human brain works.
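
The "learn what is normal, then flag the abnormal" idea can be sketched in a few lines of Python. This is a deliberately simple stand-in, assuming a mean and standard deviation baseline with a three-sigma threshold; real traffic systems would use richer models, and the sample numbers are invented.

```python
import statistics


def learn_baseline(samples):
    """Learning phase: summarize normal behaviour as (mean, stdev)."""
    return statistics.mean(samples), statistics.stdev(samples)


def is_abnormal(value, baseline, threshold=3.0):
    """Flag a reading that lies more than `threshold` stdevs from the mean."""
    mean, stdev = baseline
    return abs(value - mean) > threshold * stdev


# Invented vehicles-per-minute counts observed during normal traffic flow.
normal_flow = [52, 48, 50, 51, 49, 50, 47, 53]
baseline = learn_baseline(normal_flow)

for reading in (51, 12):
    print(reading, is_abnormal(reading, baseline))
```

Once the baseline is learnt, each new reading needs only one comparison, which is cheap enough to run on the sensor or gateway itself.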

Happy Reading!