Spark DF Subtraction

Below is a simple, self-contained example that demonstrates DataFrame subtraction in Spark (Scala).

Here we create two DataFrames, df1 and df2, and operate on them:

val df1 = sc.parallelize(Seq(("a" -> 1), ("b" -> 2), ("c" -> 3), ("d" -> 4))).toDF("c1", "c2")
val df2 = sc.parallelize(Seq(("a" -> 1), ("b" -> 1), ("e" -> 4))).toDF("c1", "c2")

df1.show
| c1| c2|
| a| 1|
| b| 2|
| c| 3|
| d| 4|

df2.show
| c1| c2|
| a| 1|
| b| 1|
| e| 4|

df1.except(df2).show
| c1| c2|
| d| 4|
| c| 3|
| b| 2|

val df3 = df1.drop("c2")
| c1|
| a|
| b|
| c|
| d|

val df4 = df2.drop("c2")
| c1|
| a|
| b|
| e|


df3.except(df4).show
| c1|
| c|
| d|

df3.except(df4).count
Long = 2
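For readers without a Spark shell at hand, the row-wise set-difference semantics of DataFrame subtraction can be sketched with plain Scala collections. This is an illustration only, not Spark code; the real DataFrame `except` additionally de-duplicates its result and runs distributed:

```scala
// Set-difference semantics of DataFrame subtraction, sketched on plain
// Scala collections (no SparkContext needed here).
val rows1 = Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)) // contents of df1
val rows2 = Seq(("a", 1), ("b", 1), ("e", 4))           // contents of df2

// Keep the rows of the left side that do not occur on the right side.
val diff = rows1.filterNot(rows2.toSet)
println(diff) // List((b,2), (c,3), (d,4))
```

Note that ("b", 2) survives even though a key "b" exists on both sides: subtraction compares whole rows, not keys.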


To access the individual elements of an array we use the built-in table-generating functions (UDTFs) available in Hive. Below is a simple example with a description of each step.

Built-in Table-Generating Functions (UDTF)

To see the details of any function we can run the describe command; to get the list of available functions we can run show functions;
hive> describe function explode;
explode(a) - separates the elements of array a into multiple rows, or the elements of a map into multiple rows and columns

Now we create a table named products, with id of INT type, ProductName of STRING type and ProductColorOptions of ARRAY&lt;STRING&gt; type.
hive> CREATE TABLE Products
> (id INT, ProductName STRING, ProductColorOptions ARRAY<STRING>);

We cannot directly insert data into a table containing arrays; we need to create a dummy table and then use it to insert the data, as shown below:
hive> create table dummy (a String);

hive> insert into table dummy values ('a');

hive> insert into table products select 1,'Watches',array('Red','Green') from dummy;
hive> insert into table products select 2,'Clothes',array('Blue','Green') from dummy;
hive> insert into table products select 3,'Books',array('Blue','Green','Red') from dummy;

Now we have 3 entries in the products table, as the select query below shows:
hive> select * from products;
1 Watches ["Red","Green"]
2 Clothes ["Blue","Green"]
3 Books ["Blue","Green","Red"]

Below is a select using the first index of the array:
hive> SELECT id, productname, productcoloroptions[0] FROM default.products;
1 Watches Red
2 Clothes Blue
3 Books Blue

Now the main command, LATERAL VIEW EXPLODE, which gives the result below.
SELECT p.id, p.productname, colors.colorselection FROM default.products p
LATERAL VIEW EXPLODE(p.productcoloroptions) colors AS colorselection;
1 Watches Red
1 Watches Green
2 Clothes Blue
2 Clothes Green
3 Books Blue
3 Books Green
3 Books Red

A lateral view first applies the UDTF to each row of the base table and then joins the resulting output rows to the input rows to form a virtual table with the supplied table alias.
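That join-with-each-array-element behaviour can be mimicked outside Hive. As a rough sketch in plain Scala (the Product case class is hypothetical, mirroring the table above), LATERAL VIEW EXPLODE acts like a flatMap that pairs every row with each element of its array column:

```scala
// LATERAL VIEW EXPLODE behaves like a flatMap: every base-table row is
// joined with each element of its array column.
case class Product(id: Int, name: String, colors: Seq[String])

val products = Seq(
  Product(1, "Watches", Seq("Red", "Green")),
  Product(2, "Clothes", Seq("Blue", "Green")),
  Product(3, "Books", Seq("Blue", "Green", "Red"))
)

// One output row per (row, color) pair, as in the Hive result above.
val exploded = products.flatMap(p => p.colors.map(c => (p.id, p.name, c)))
exploded.foreach(println)
```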


Gathering Twitter Data using Flume

Getting streaming data from Twitter using Flume and then putting it into Hive for analysis.

1. We are using the Cloudera quickstart VM, so it is assumed that Java, Hadoop, Hive, Flume etc. are already installed and up and running.

2. The steps to create a Twitter app and obtain the consumerKey, consumerSecret, accessToken and accessTokenSecret are also not covered; they are described in many blogs.

3. Configuration File which is used in Flume:
[cloudera@quickstart conf]$ pwd
[cloudera@quickstart conf]$ cat twitter.conf
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel

TwitterAgent.sources.Twitter.consumerKey = ***
TwitterAgent.sources.Twitter.consumerSecret = *****

TwitterAgent.sources.Twitter.accessToken = ****
TwitterAgent.sources.Twitter.accessTokenSecret = ****

TwitterAgent.sources.Twitter.maxBatchSize = 50000
TwitterAgent.sources.Twitter.maxBatchDurationMillis = 100000

TwitterAgent.sources.Twitter.keywords = @narendramodi

TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://quickstart.cloudera/user/cloudera/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 1000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

4. Then we have to run this config file using the following command, from the location /usr/lib/apache-flume-1.6.0-bin/bin:
./flume-ng agent -n TwitterAgent -c conf -f ../conf/twitter.conf

5. As per the sink output path in our configuration file, we can see files getting generated under /user/cloudera/twitter_data/.
By default these files are in the Avro format, so we will use avro-tools-1.7.7.jar to generate the schema, which will then be used to build the Hive table.
run command as:
C:\Users\ankit.baldua.AVULZE0\Desktop\flume>java -jar avro-tools-1.7.7.jar getschema FlumeData.1476876054060.avro > Flumedata.avsc

6. Connect to the hive shell and create the table as below, after putting the generated avsc file at the correct path:

hive> CREATE TABLE tweetsavro
    > STORED AS AVRO
    > TBLPROPERTIES ('avro.schema.url'='hdfs://quickstart.cloudera/user/cloudera/FlumeData.avsc');

7. To put data in above created table:
LOAD DATA INPATH '/user/cloudera/twitter_data/*' OVERWRITE INTO TABLE tweetsavro;

hive> describe tweetsavro;
id string from deserializer
user_friends_count int from deserializer
user_location string from deserializer
user_description string from deserializer
user_statuses_count int from deserializer
user_followers_count int from deserializer
user_name string from deserializer
user_screen_name string from deserializer
created_at string from deserializer
text string from deserializer
retweet_count bigint from deserializer
retweeted boolean from deserializer
in_reply_to_user_id bigint from deserializer
source string from deserializer
in_reply_to_status_id bigint from deserializer
media_url_https string from deserializer
expanded_url string from deserializer

This confirms that the table schema was created automatically from the avsc file.

8. The table is now loaded with data and we can run select commands as usual.
hive> select * from tweetsavro limit 1;
790880562726612992 97 NULL Kirsten Danielle Tan Delavin
Madeleine Yrenea Madayag #17
Hope Elizabeth Soberano 361 27 Sheina Delavin sheina_bastes 2016-10-25T04:39:54Z Balita hahahaha cute ni Dundun 0 false -1 Twitter for Android -1 NULL NULL
Time taken: 0.859 seconds, Fetched: 1 row(s)

9. In this way we can successfully load streaming Twitter data into Hive.

Parsing XML in PIG

There are several ways of converting an XML file to a desired format using PIG, such as writing a regular expression (regex) or writing custom Java code. But with piggybank.jar from version 0.13 onwards we can achieve it directly. PiggyBank is a collection of user-defined functions (UDFs) contributed by Pig users. Below is the process:

After entering in grunt shell run following commands:

1. REGISTER piggybank.jar

grunt> REGISTER '/usr/lib/pig/piggybank-0.13.0.jar';

Make sure the jar is piggybank-0.13.0.jar or a later version, because in older versions the XPath class was not present in org.apache.pig.piggybank.evaluation.xml.

2. DEFINE XPath org.apache.pig.piggybank.evaluation.xml.XPath();

This defines XPath as the method name so that it can be used easily.

Input File:

&lt;CATALOG&gt;
&lt;BOOK&gt;
&lt;TITLE&gt;Java Guide&lt;/TITLE&gt;
&lt;PRICE&gt;2500&lt;/PRICE&gt;
&lt;COUNTRY&gt;AUS&lt;/COUNTRY&gt;
&lt;/BOOK&gt;
&lt;BOOK&gt;
&lt;TITLE&gt;Cricket CookBook&lt;/TITLE&gt;
&lt;AUTHOR&gt;Ankit Baldua&lt;/AUTHOR&gt;
&lt;PUBLISHER&gt;ABC Pvt. LTd.&lt;/PUBLISHER&gt;
&lt;PRICE&gt;2000&lt;/PRICE&gt;
&lt;COUNTRY&gt;INDIA&lt;/COUNTRY&gt;
&lt;/BOOK&gt;
&lt;BOOK&gt;
&lt;TITLE&gt;Java CookBook&lt;/TITLE&gt;
&lt;AUTHOR&gt;Sachin Tendulkar&lt;/AUTHOR&gt;
&lt;PUBLISHER&gt;Ankit Pvt. LTd.&lt;/PUBLISHER&gt;
&lt;PRICE&gt;3000&lt;/PRICE&gt;
&lt;COUNTRY&gt;INDIA&lt;/COUNTRY&gt;
&lt;/BOOK&gt;
&lt;/CATALOG&gt;

3. sample_input = LOAD '/home/cloudera/Desktop/sample.xml' USING org.apache.pig.piggybank.storage.XMLLoader('BOOK') as (x:chararray);

We load sample.xml into a relation named sample_input using the PiggyBank XMLLoader function, which yields one record per BOOK element.

4. desired_output = FOREACH sample_input GENERATE XPath(x, 'BOOK/AUTHOR'), XPath(x, 'BOOK/PRICE'), XPath(x, 'BOOK/COUNTRY');

In another relation, named desired_output, we use FOREACH to iterate over the records and the XPath function to extract the relevant XML tags.

5. DUMP desired_output;

Finally, we print the desired output using DUMP.

(,2500,AUS)  // the AUTHOR tag is missing in the first entry of the input XML, so the author is blank in the output while price and country are still printed
(Ankit Baldua,2000,INDIA)
(Sachin Tendulkar,3000,INDIA)



HBase shell commands

A great compilation of HBase shell commands, reblogged from the Learn HBase blog.

As mentioned in the HBase introduction, HBase provides an extensible JRuby-based (JIRB) shell as a feature to execute commands (each command represents one piece of functionality).

HBase shell commands are mainly categorized into 6 parts

1) General  HBase shell commands

status Show cluster status. Can be 'summary', 'simple', or 'detailed'. The
default is 'summary'.

hbase> status
hbase> status 'simple'
hbase> status 'summary'
hbase> status 'detailed'

version Output this HBase version. Usage:

hbase> version

whoami Show the current hbase user. Usage:

hbase> whoami

2) Tables Management commands

alter Alter column family schema; pass table name and a dictionary
specifying the new column family schema. Dictionaries are described
in the main help command output. The dictionary must include the name
of the column family to alter. For example, to change or add the 'f1' column family
in table 't1' to keep a maximum of 5 cell VERSIONS, do:

hbase> alter 't1', NAME => 'f1', VERSIONS => 5



Apache PIG

PIG is used for processing huge volumes of data, both structured and unstructured, and for data summarization, querying and advanced querying.
Since PIG is a framework, a lot of predefined actions and transformations are already provided for interacting with the Pig shell (actions and transformations are simply the commands used to interact with Pig's grunt shell).
Whenever we execute PIG actions, a MapReduce job is triggered internally by the framework.

Challenges with MapReduce, or the need for PIG:
1. Programming skills are required to code MapReduce (Java, Python, Scala etc.)
2. Long deployment process
3. The number of lines of code required is large

All these challenges with MapReduce become advantages in Pig: not much programming skill is required unless you go for UDFs, and there is no long deployment process.

Installing PIG:
Pig is installed on top of Hadoop (HDFS & MapReduce).
Download the Pig tarball, install it on top of Hadoop and set up the classpath.

Modes of Operation:
1. Local Mode: input data is taken from a local file system (LFS) path and output is also written to the LFS. In local mode HDFS is not in the picture; the framework processes the local data with MapReduce running locally and writes the processed data back to the LFS.
2. HDFS Mode: Input & Output both in HDFS.
3. Embedded Mode: If we are not able to achieve desired functionality using existing actions, we can choose embedded mode to develop customized applications. This is nothing but UDFs.

Default Transformations or operators in PIG:

Loading & Storing:

1. LOAD : To Load the data from the file system (local/HDFS) into a relation.
2. STORE : To save a relation to the file system (local/HDFS).


Filtering:

3. FOREACH : To generate data transformations based on columns of data.
4. GENERATE : To generate data transformations based on columns of data.
5. FILTER : To remove unwanted rows from a relation.
6. DISTINCT : To remove duplicate rows from a relation.
7. STREAM: To transform a relation using an external program.

Grouping & Joining:

8. JOIN : To join two or more relations.
9. COGROUP : To group the data in two or more relations.
10. GROUP : To group the data in a single relation.
11. CROSS : To create the cross product of two or more relations.


Sorting:

12. ORDER : To arrange a relation in sorted order based on one or more fields (ascending or descending).
13. LIMIT : To get a limited number of tuples from a relation.

Combining & Splitting:

14. UNION : To combine two or more relations into a single relation.
15. SPLIT : To split a single relation into two or more relations.

Diagnostic Operators:

16. DUMP : To print the contents of a relation on the console.
17. DESCRIBE : To describe the schema of a relation.
18. EXPLAIN : To view the logical, physical, or MapReduce execution plans to compute a relation.
19. ILLUSTRATE : To view the step-by-step execution of a series of statements.
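Several of these transformations have direct analogues on ordinary Scala collections. A rough sketch (illustration only, not Pig Latin; the sample data is made up):

```scala
// Rough Scala-collection analogues of some Pig transformations.
val ages = Seq(("Ankit", 28), ("Rahul", 30), ("Ankit", 28), ("Sita", 25))

val filtered     = ages.filter { case (_, age) => age >= 28 } // FILTER
val distinctRows = ages.distinct                              // DISTINCT
val grouped      = ages.groupBy { case (name, _) => name }    // GROUP
val limited      = ages.take(2)                               // LIMIT

println(filtered)        // rows with age >= 28
println(grouped.keySet)  // one group per distinct name
```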

Pig Language Data Types:

As in normal languages, data types such as int, float, long, double and boolean are available. String is replaced by chararray. If no data type is provided, the default is bytearray; internally, data is stored as bytearray in Pig.

Atom – Any single value in Pig Latin, irrespective of its data type, is known as an Atom. It is stored as a string and can be used as a string or a number. int, long, float, double, chararray and bytearray are the atomic values of Pig. A piece of data or a simple atomic value is known as a field.

Tuple – A record formed by an ordered set of fields is known as a tuple; the fields can be of any type. A tuple is similar to a row in an RDBMS table.
Example: (Ankit, 28)

Bag – A bag is an unordered collection of tuples (non-unique). Each tuple can have any number of fields (flexible schema). A bag is represented by {}. Example: {(Ankit, 28), (Rahul, 30)}
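A loose analogy in Scala terms may help (illustration only, not Pig code): a field is a single value, a tuple is an ordered group of fields, and a bag is a collection of tuples, approximated here with a List since a Pig bag is unordered and allows duplicates:

```scala
// Rough Scala analogy for Pig's data model (illustration only, not Pig code).
val atom: String = "Ankit"               // atom / field: a single value
val tuple: (String, Int) = ("Ankit", 28) // tuple: ordered set of fields
val bag: List[(String, Int)] =           // bag: a collection of tuples
  List(("Ankit", 28), ("Rahul", 30))     // (unordered, duplicates allowed)

println(bag.contains(tuple)) // true
```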

Different flavors of PIG execution:

1. grunt shell: this is the default mode of Pig execution; whether the output succeeds or fails is checked in the shell itself.
local mode: pig -x local
hdfs mode: pig or pig -x mapreduce

2. script mode: instead of writing commands one by one in the grunt shell, put all the commands in one file and save it with any extension, though .pig is preferred.
To run a script, the syntax is:
local mode: pig -x local <script-name>
hdfs mode: pig <script-name>

How to achieve embedded mode (User Defined Functions):
1. Develop a class that extends the base class EvalFunc<>
2. Override the exec() method to implement the business logic
3. Add the required jar files for compilation and generate our own jar file
4. Register the generated jar with Pig, using the syntax: REGISTER <<jarname>>
5. The function can then be used.

Note: while calling a UDF it is mandatory to register the jar every time a new shell is opened.
Also, if the class is in a package, the fully qualified name including the package is required when using the function.

Happy Reading!



Logstash

Logstash is an open source data collection engine with real-time pipelining capabilities. Logstash can dynamically unify data from disparate sources and normalize the data into destinations of your choice. Cleanse and democratize all your data for diverse advanced downstream analytics and visualization use cases.

While Logstash originally drove innovation in log collection, its capabilities extend well beyond that use case. Any type of event can be enriched and transformed with a broad array of input, filter, and output plugins, with many native codecs further simplifying the ingestion process. Logstash accelerates your insights by harnessing a greater volume and variety of data.

The config file is the most important piece and contains the following major categories:

Input plugins:
An input plugin enables a specific source of events to be read by Logstash.

Output plugins:
An output plugin sends event data to a particular destination. Outputs are the final stage in the event pipeline.

Filter plugins:
A filter plugin performs intermediary processing on an event. Filters are often applied conditionally depending on the characteristics of the event.

Codec plugins:
A codec plugin changes the data representation of an event. Codecs are essentially stream filters that can operate as part of an input or output.
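Putting the plugin categories together, a minimal config might look like the sketch below. This is an assumption-laden example, not from the original post: stdin, grok, stdout and the rubydebug codec are standard Logstash plugins, but the pattern and fields are made up, and the hash-style option syntax follows recent Logstash documentation (older 1.x releases used an array-style syntax).

```
input {
  stdin { }                        # read events from standard input
}

filter {
  grok {                           # parse unstructured text into fields
    match => { "message" => "%{IP:client} %{WORD:method}" }
  }
}

output {
  stdout { codec => rubydebug }    # pretty-print events to the console
}
```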

How to run the config file:
On Windows (from the location where logstash/bin is installed):
java -jar logstash-1.2.2-flatjar.jar agent -f logstash_sample.conf

On Linux:
/location-to-logstash-bin/java -jar logstash-1.2.2-flatjar.jar agent -f logstash_sample.conf