Thursday, September 24, 2015

How to work with Avro data using Apache Spark (Spark SQL API)

We all know how cool Spark is when it comes to fast, general-purpose cluster computing. Apart from the core APIs, Spark also provides a rich set of higher-level APIs, which include Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for high-throughput, fault-tolerant processing of live data streams.

Through this post we'll explore the Spark SQL API and see how to use it with Avro data. As stated earlier, Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. A DataFrame is a distributed collection of data organised into named columns. We can think of a DataFrame as a table in a relational database or as a data frame in R or Python.

Apache Avro is a very popular data serialization system, especially in the big data world. We'll use the spark-avro library to work with it. spark-avro is a beautiful library provided by Databricks, which helps us in reading and writing Avro data.

We begin with creating an instance of JavaSparkContext using SparkConf.

SparkConf conf = new SparkConf().setAppName("SQLApiDemo").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

Once created, this JavaSparkContext instance is used to create an instance of SQLContext, which will be used to read, operate on and write the Avro data.

SQLContext sqlContext = new SQLContext(sc);

Next, we use the load() API provided by SQLContext to read some Avro data in from a given source. The load() API returns a DataFrame created out of the data read from a specified source. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. Another good thing about DataFrame is that the DataFrame API is available in Scala, Java, Python, and R.

Here is a complete example showing how to read and work with Avro data using DataFrame :

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class SQLApiDemo {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("SQLApiDemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        // Load the Avro data through the spark-avro data source
        DataFrame df = sqlContext.load("/Users/tariq/avro_data/browser.avro/", "com.databricks.spark.avro");
    }
}

Not just this, the DataFrame API also allows us to perform various structured data processing operations. For example :

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1

Wednesday, February 19, 2014

Fun with HBase shell

HBase shell is great, especially while getting yourself familiar with HBase. It provides lots of useful shell commands with which you can perform trivial tasks like creating tables, putting some test data into them, scanning a whole table, fetching data from a specific row etc. Executing help on HBase shell will give you the list of all the HBase shell commands. If you need help on a specific command, type help "command". For example, help "get" will give you a detailed explanation of the get command.

But this post is not about any of that. We will try to do something fun here, something which is available but less known. So, get ready, start your HBase daemons, open HBase shell and get your hands dirty.

For those of us who are unaware, HBase shell is based on JRuby, the Java Virtual Machine-based implementation of Ruby. More specifically, it uses the Interactive Ruby Shell (IRB), which is used to enter Ruby commands and get an immediate response. HBase ships with Ruby scripts that extend the IRB with specific commands, related to the Java-based APIs. It inherits the built-in support for command history and completion, as well as all Ruby commands.

We will start with something that is my favorite: shell commands that provide JRuby-style object-oriented references for tables. What does that mean? Previously, all of the HBase shell commands that act upon a table had a procedural style that always took the name of the table as an argument. But now it is possible to assign a table to a JRuby variable. So no more unnecessary typing of table names.

The table reference can then be used to perform read and write operations such as puts, scans, and gets, along with admin functionality such as disabling, dropping, and describing tables.

For example, previously we would always have to specify a table name while performing some operations, like get, scan, disable etc :

hbase(main):000:0> create 'demo', 'cf'
0 row(s) in 1.0970 seconds

hbase(main):001:0> put 'demo', 'row1', 'cf:c1', 'val1'
0 row(s) in 0.0080 seconds

hbase(main):002:0> scan 'demo' 
ROW                                COLUMN+CELL
 row1                              column=cf:c1, timestamp=1378473207660, value=val1                                                      
1 row(s) in 0.0130 seconds

But now you can assign the table to a variable and use the results in jruby shell code :

hbase(main):007 > demo = create 'demo', 'cf'
0 row(s) in 1.0970 seconds

=> Hbase::Table - demo
hbase(main):008 > demo.put 'row1', 'cf:c1', 'val1'
0 row(s) in 0.0640 seconds

hbase(main):009 > demo.scan
ROW                           COLUMN+CELL                                                                        
 row1                            column=cf:c1, timestamp=1331865816290, value=val1                                        
1 row(s) in 0.0110 seconds

You can even assign a table to a variable by using the get_table method :

hbase(main):012:0> demo = get_table 'demo'
0 row(s) in 0.0010 seconds

=> Hbase::Table - demo
hbase(main):013:0> demo.put 'row1', 'cf:c1', 'val1'
0 row(s) in 0.0100 seconds
hbase(main):014:0> demo.scan
ROW                                COLUMN+CELL                                                                                      
 row1                                column=cf:c1, timestamp=1378473876949, value=val1
1 row(s) in 0.0240 seconds

Isn't it handy?

NOTE : You need HBase 0.95 or later for this.
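
Because a table reference is an ordinary JRuby object, it can also be handed to helper methods of your own. The sketch below assumes a hypothetical helper named load_rows (it is not part of HBase) that relies only on the put call shown above, so anything that responds to put(row, column, value) will work with it :

```ruby
# Hypothetical helper (not shipped with HBase): writes several cells through
# a table reference such as the one returned by get_table.
# rows maps a row key to a hash of 'family:qualifier' => value.
def load_rows(table, rows)
  written = 0
  rows.each do |row_key, cells|
    cells.each do |column, value|
      # Same call as `demo.put 'row1', 'cf:c1', 'val1'` in the shell
      table.put(row_key, column, value)
      written += 1
    end
  end
  written # number of cells written
end
```

Inside the shell this could be called as load_rows(demo, { 'row2' => { 'cf:c1' => 'val2' } }), assuming demo was obtained through create or get_table as shown earlier.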

Moving further, have you ever wished you could clear the HBase shell? Quite often the shell fills up with the results of previously executed queries, and unlike our OS shell it has no clear command to wipe the screen so that we can concentrate on the result of the next query. To overcome this problem we can again take advantage of the fact that HBase shell is based on JRuby: all we have to do is create a file named .irbrc in our home directory and add the desired customization code to it.

For our clear screen example, we could do this :

vi ~/.irbrc

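#Clear HBase shell
def cls
  system('clear') # assumed body: delegate to the OS clear command
end

# Run the exit hooks registered with IRB when the shell exits
Kernel.at_exit do
  IRB.conf[:AT_EXIT].each do |i|
    i.call
  end
end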

Save the file and open HBase shell. Execute cls and, if everything goes fine, you will find your shell all clear.

Another trick is to enable persistent command history for HBase shell, so that you can use the up arrow key to select a previously executed command. By default HBase maintains command history only for a particular session; once you come out of the shell the history is gone. With the piece of code shown below, the history survives a restart of the HBase shell. Reopen your ~/.irbrc file and append the history settings to it, so that your ~/.irbrc looks like this :

vi ~/.irbrc

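#Clear HBase shell
def cls
  system('clear') # assumed body: delegate to the OS clear command
end

#Enable history
require "irb/ext/save-history"
#No. of commands to be saved. 100 here
IRB.conf[:SAVE_HISTORY] = 100
# The location to save the history file
IRB.conf[:HISTORY_FILE] = "#{ENV['HOME']}/.irb-save-history"

# Run the exit hooks registered with IRB when the shell exits
Kernel.at_exit do
  IRB.conf[:AT_EXIT].each do |i|
    i.call
  end
end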


Save the file and exit. To cross-check, open HBase shell and start pressing the up arrow key. You should be able to see the commands executed in previous sessions.
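
The saved history is just a text file, so it can also be inspected directly. A minimal sketch, assuming a hypothetical helper named recent_commands (not part of IRB or HBase) and assuming the history file holds one command per line :

```ruby
# Hypothetical helper: returns the last `count` commands from a saved
# history file, oldest first. Assumes one command per line.
def recent_commands(history_file, count = 10)
  # Nothing has been saved yet if the file does not exist
  return [] unless File.exist?(history_file)
  File.readlines(history_file).map(&:chomp).reject(&:empty?).last(count)
end
```

If added to ~/.irbrc, it could be used from the shell as puts recent_commands("#{ENV['HOME']}/.irb-save-history", 5), where the path is the HISTORY_FILE location configured above.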

Another good feature to have is the ability to list HDFS directories and files from HBase shell, like we can do from Pig's grunt shell or Hive shell. You will have to add these lines to your ~/.irbrc file for that :

vi ~/.irbrc

#Clear HBase shell
def cls
  system('clear') # assumed body: delegate to the OS clear command
end

#Enable history
require "irb/ext/save-history"
#No. of commands to be saved. 100 here
IRB.conf[:SAVE_HISTORY] = 100
# The location to save the history file
IRB.conf[:HISTORY_FILE] = "#{ENV['HOME']}/.irb-save-history"

#List given HDFS path
def ls(path)
  system("$HADOOP_HOME/bin/hadoop fs -ls #{path}")
end

# Run the exit hooks registered with IRB when the shell exits
Kernel.at_exit do
  IRB.conf[:AT_EXIT].each do |i|
    i.call
  end
end


Save the file and exit. Open your HBase shell and type :

hbase(main):012:0> ls ('directory_name')

This will list all the directories and files present inside the directory called directory_name.

NOTE : Please mind the quotes(' ') in the above shown command.
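
Interpolating the argument straight into a shell command works for simple paths, but a path containing spaces or shell metacharacters would be misread. A slightly more defensive sketch follows, assuming a hypothetical helper named hdfs_ls_command and falling back to whatever hadoop is on the PATH when HADOOP_HOME is not set (both are illustrative choices, not something required by HBase) :

```ruby
require 'shellwords'

# Hypothetical helper: builds the `hadoop fs -ls` command line for a path,
# escaping the path so the OS shell treats it as a single argument.
def hdfs_ls_command(path, hadoop_home = ENV['HADOOP_HOME'])
  # Use $HADOOP_HOME/bin/hadoop when available, otherwise plain `hadoop`
  hadoop = hadoop_home.to_s.empty? ? 'hadoop' : File.join(hadoop_home, 'bin', 'hadoop')
  "#{Shellwords.escape(hadoop)} fs -ls #{Shellwords.escape(path)}"
end

# Could replace the ls helper in ~/.irbrc
def ls(path)
  system(hdfs_ls_command(path))
end
```

Keeping the command construction in its own method means it can be checked without a running Hadoop installation.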

Another shell feature which I really like is the ability to use HBase Filters. For example, if I want to get all the rows from a table called users where the value of the column called name is abc, I can do this :

hbase(main):001:0> import org.apache.hadoop.hbase.util.Bytes

hbase(main):002:0> import org.apache.hadoop.hbase.filter.SingleColumnValueFilter

hbase(main):003:0> import org.apache.hadoop.hbase.filter.BinaryComparator

hbase(main):004:0> import org.apache.hadoop.hbase.filter.CompareFilter

hbase(main):005:0> scan 'users', { FILTER => SingleColumnValueFilter.new(Bytes.toBytes('cf'), Bytes.toBytes('name'), CompareFilter::CompareOp.valueOf('EQUAL'), BinaryComparator.new(Bytes.toBytes('abc')))}

This comes in pretty handy when you want to perform some quick checks on your data.
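
For simple comparisons like this one, the shell also accepts a filter written as a plain string in HBase's filter language, which avoids the imports. The sketch below assumes a hypothetical helper named column_equals that only builds such a string. The SingleColumnValueFilter('family', 'qualifier', =, 'binary:value') form follows the filter language described in the HBase reference guide, so please check it against your HBase version :

```ruby
# Hypothetical helper: builds a filter-language string matching rows whose
# family:qualifier cell equals `value` (binary comparison).
# Assumes none of the arguments contain a single quote.
def column_equals(family, qualifier, value)
  "SingleColumnValueFilter('#{family}', '#{qualifier}', =, 'binary:#{value}')"
end
```

From the shell it could then be used as scan 'users', { FILTER => column_equals('cf', 'name', 'abc') }.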

That was it for today. I will try to cover a few more things some other day. As always, your comments and suggestions are welcome. Do let me know if there is any scope to make the post better in any manner.

Friday, January 10, 2014

List of my top 10 most voted SO answers

Here is a list of my top 10 most voted answers on Stack Overflow. All these questions are related to cloud computing, including discussions on distributed storage and computing tools like Hadoop, HBase etc. I hope you find them as useful as others did.

Analyzing your data on the fly with Pig through Mortar Watchtower

Let me start by thanking Mortar for developing such an amazing tool. Isn't it really cool to be able to speed up your Pig development without having to write a complete script, run it, wait for local or remote Pig to finish the execution, and only then see the final data? Quite often, when writing a Pig script, I find it very time consuming to debug what each line of my script is doing. Moreover, the fact that Pig is a dataflow language makes it even more important to have a clear idea of what exactly your data looks like at each step of the flow. This obviously helps in writing compact and efficient scripts. Trust me, you don't want to write inefficient code while dealing with petabytes of data.

It's a bitter truth that Hadoop development iterations are slow. Traditional programmers have always had the benefit of re-compiling their app, running it, and seeing the results within seconds. They have near-instant validation that what they're building is actually working. When you're working with Hadoop and dealing with petabytes of data, your development iteration time is more like hours (even days sometimes). With Watchtower, the folks at Mortar have made an awesome effort to bring back that almost instant iteration cycle developers are used to. Not only that, Watchtower also helps surface the semantics of your Pig scripts, to give you insight into how your scripts are working, not just that they are working.

What is Watchtower?

Watchtower is basically a daemon which continuously watches your data and the script running over it in real time. It stores the state of your data at each step and shows how it changes as your script proceeds, displaying the exact flow of your data directly inline with your script. Not only this, Watchtower also helps us find errors in our script as we go, so you don't have to wait until the script has been completed and executed. Since Watchtower is constantly sending data through your entire script, errors are surfaced and displayed instantly.

This is what Watchtower provides you with (courtesy Watchtower homepage) :

  • Instant Sampling of Your Data: Watchtower samples your data in the background while writing your script. This means that when you start writing code, Watchtower is able to provide instant and accurate examples of your data flowing through your script.
  • Complete File Watching: Watchtower watches all files in your Mortar Project for changes. If watchtower detects a change in any of your scripts, UDFs, or even your data, it will recalculate the samples instantly and show you what changed.
  • Instant Schema Evaluation: Watchtower re-evaluates your schema on file save, not only verifying that you referred to the implied schema correctly, but also showing how Pig builds up the schema and generates field names. This is incredibly powerful for the novice (or experienced!) Pig developer who doesn't fully understand how Pig uses features like the disambiguate operator.
  • Instant Error Catching: Since Watchtower is running data through your entire script, errors in your script and UDFs are surfaced immediately, allowing you to debug and fix the errors before you ship your job to a Hadoop cluster.
To get started with Watchtower visit its installation page. It contains all the info you need to get started with Watchtower and use it.

This page contains a detailed description of how Watchtower works, along with a short introductory video.

Wednesday, September 11, 2013

How to run Hive queries through Hive Web Interface.

One of the good things about Hadoop and related projects, which I really like, is the WebUI provided to us. It makes our life a lot easier. Just point your web browser to the appropriate URL and quickly perform the desired action, be it browsing through HDFS files or glancing over HBase tables. Otherwise you need to go to the shell and issue the associated commands one by one for each action [I know I'm a bit lazy ;)].

Hive is no exception and provides us with a WebUI called the Hive Web Interface, or HWI for short. Somehow I feel it is less documented and talked about than the HDFS and HBase WebUIs, but that doesn't make it any less useful. In fact I personally find it quite helpful. With its help you can do various operations like browsing your DB schema, viewing your sessions, querying your tables etc. You can also see the System and User variables like Java Runtime, your OS architecture, your PATH etc.

OK, enough brand building. Let's get started and see how to use HWI. The process is quite simple. First a couple of things on configuration. Following are the properties which you might have to modify as per your requirements :

  • hive.hwi.listen.host : The host address the Hive Web Interface will listen on.
  • hive.hwi.listen.port : The port the Hive Web Interface will listen on.
  • hive.hwi.war.file : This is the WAR file with the jsp content for Hive Web Interface.

The values for these properties are totally your choice. I'll go ahead with the defaults.
You would probably want to set up HiveDerbyServerMode as well if you wish to allow multiple sessions at the same time.

Note : Make these changes in the hive-site.xml file inside your $HIVE_HOME/conf/ directory. Create it if you don't have it already. Please don't change anything in the hive-default.xml.template file. This is important.

Now start HWI using the following command :
bin/hive --service hwi 

If everything goes fine you will see something like this on your terminal :
hive-0.10.0 miqbal1$ bin/hive --service hwi
13/09/11 00:21:46 INFO hwi.HWIServer: HWI is starting up
13/09/11 00:21:46 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
13/09/11 00:21:46 INFO mortbay.log: jetty-6.1.26
13/09/11 00:21:46 INFO mortbay.log: Extract /Users/miqbal1/hadoop-eco/hive-0.10.0/lib/hive-hwi-0.10.0.war to /var/folders/n3/d0ghj1ln2zl0kpd8zkz4zf04mdm1y2/T/Jetty_0_0_0_0_9999_hive.hwi.0.10.0.war__hwi__ae9cmk/webapp

13/09/11 00:21:46 INFO mortbay.log: Started SocketConnector@

You are good to go now. So point your web browser to HWI. For example, http://localhost:9999/hwi/index.jsp in my case, since I'm working on my local machine with all default configuration parameters. Use the hostname and port as per your setup. This will take you to the HWI front page.

You can click on Home if you wish to read about HWI a bit more, or on Authorize to authorize a user. If you want to browse through your DB schema you can click on Browse Schema under the DATABASE section. You can click on Diagnostics if you want to have a look at various System and User variables on your box. All this is merely a matter of one click. So we will move on to the main part, querying Hive tables. Follow the steps below in order to do that :

  • Click on Create Session under SESSIONS section, enter some session name and hit Submit.

  • This will take you to the Manage Session screen. This is the place where all the action will take place. Come down to the Session Details section and enter a file name, say /Users/tariq/res.txt, in the Result File box. This is the file where the result of your query will get stored. If you expect your result to be very large you can just enter /dev/null over there. Remember the result file is local to the web server. Similarly, enter the error file if you wish.
  • Now come down to the Query box and write the query you want to execute.
  • Choose Yes or No for Silent Mode as per your wish. Select Yes for Start Query and hit Submit.

You should now be able to see the file /Users/tariq/res.txt containing the result of your query. You can also view the result by clicking on the View File option, which will appear next to the Result File box upon the successful completion of your query.

That is it. Hope it helps. Do let me know in case of any issue.

Thursday, June 27, 2013

Visualizing Pig Queries Through Lipstick

Quite often while working with Pig you will have reached a point where your Pig scripts have become so complex that the flow of execution, and its relation to the MapReduce jobs being executed, is difficult to visualize. This eventually means additional effort to develop, maintain, debug, and monitor the execution of scripts.

But not anymore. Thankfully Netflix has developed a tool that enables developers to visualize and monitor the execution of their data flows at a logical level, and they call it Lipstick. As an implementation of PigProgressNotificationListener, Lipstick piggybacks on top of all Pig scripts executed in our environment, notifying a Lipstick server of job executions and periodically reporting progress as the script executes.

Lipstick has got some really cool features. For instance once you are at the Lipstick main page you can see all the Pig jobs that are currently running or have run. The following things are displayed for each job:
  • User
  • Job
  • Start Time
  • Heartbeat Time (last time a heartbeat was sent)
  • Progress
      – Blue (running)
      – Green (complete)
      – Red (failed)
      – Orange (terminated)

The job list can also be worked with in the following ways:
  • Clicking on the header (User, Job, Start Time, etc.) for a column will sort by that column (asc/desc).
  • Search by username or job name.
  • Filter jobs by progress.
  • Pagination controls (next page, show X jobs per page, etc).

Along with this there is a whole bunch of other cool stuff that Lipstick offers. You can find more in the Lipstick user guide.

For a detailed overview you can visit their official blog section. And if you can't wait anymore and want to give it a try straight away, you can directly go to their repository.

Wednesday, May 1, 2013

How to install MapR M3 on Ubuntu through Ubuntu Partner Archive.

In a recent post of mine I mentioned the partnership between MapR and Canonical, an initiative to make Hadoop natively available on Ubuntu through the Ubuntu Partner Archive. Since the package has now been released, I thought of showing how to get it done. Trust me, it's really cool to install Hadoop with just one apt-get install :)

First things first. Open your sources.list file and add the MapR repositories into it.

deb mapr optional
deb binary/

Now, update your repository.
sudo apt-get update

Note : If it throws any error regarding MapR repositories, just uncomment the lines which allow us to add software from Canonical's partner repository.

## Uncomment the following two lines to add software from Canonical's
## 'partner' repository.
## This software is not part of Ubuntu, but is offered by Canonical and the
## respective vendors as a service to Ubuntu users.
deb precise partner 
deb-src precise partner

Install Hadoop.
sudo apt-get install mapr-single-node

1, 2, 3... and you are done. Isn't that cool? Just three easy steps and you have your brand new single node Hadoop cluster in your lap. But there are some prerequisites and it's very important to satisfy them.

CPU : 64-bit

OS : Red Hat, CentOS, SUSE, or Ubuntu

Memory : 4 GB minimum, more in production

Disk : Raw, unformatted drives and partitions

DNS : Hostname, reaches all other nodes

Users : Common users across all nodes; Keyless ssh

Java : Must run Java

Other : NTP, Syslog, PAM

The above procedure will install the following services on your machine :

CLDB : mapr-cldb

JobTracker : mapr-jobtracker

MapR Control Server : mapr-webserver

MapR Data Platform : mapr-fileserver

Metrics : mapr-metrics

NFS : mapr-nfs

TaskTracker : mapr-tasktracker

ZooKeeper : mapr-zookeeper

To install other Hadoop projects, and for further details, you can visit the official documentation here.

I hope you found this post helpful, and as always comments and suggestions are welcome.