

Mapreduce using Hadoop + pig/hive on AWS EC2 hadoop cluster

This article discusses running MapReduce jobs using the Apache tools Pig and Hive. Before we can process the data, we need to upload the files to be processed to HDFS or S3. We recommend uploading to HDFS and keeping the important files in S3 as a backup. S3 is easily accessible from the command line using tools like s3cmd. HDFS is a fault-tolerant cluster filesystem that replicates data across nodes, which protects your data against instance failures.

Mapreduce

MapReduce is a programming model and an associated implementation for processing and generating large data sets. We can specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.
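To make the model concrete, here is a minimal single-process sketch of the map and reduce functions for a word count. It only illustrates the programming model in plain Python; it is not Hadoop's API, and the names `map_fn`, `reduce_fn` and `run_job` are our own.

```python
from collections import defaultdict


def map_fn(_key, line):
    """Map: emit an intermediate (word, 1) pair for every word in the line."""
    for word in line.split():
        yield word.lower(), 1


def reduce_fn(word, counts):
    """Reduce: merge all intermediate values that share the same key."""
    return word, sum(counts)


def run_job(records):
    """Run map, group the pairs by intermediate key, then reduce each group."""
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in map_fn(key, value):
            groups[out_key].append(out_value)
    return dict(reduce_fn(k, v) for k, v in sorted(groups.items()))


if __name__ == "__main__":
    # Input records are (line number, line text) pairs.
    print(run_job(enumerate(["pig runs on hadoop", "hive runs on hadoop"])))
```

On a real cluster the map calls run in parallel on many machines and the grouping step happens over the network; the functions themselves stay this small.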

The main entities involved when Hadoop runs a job are:

  1. The client, which submits the MapReduce job.
  2. The jobtracker, which coordinates the job run. The jobtracker is a Java application whose main class is JobTracker.
  3. The tasktrackers, which run the tasks that the job has been split into. Tasktrackers are Java applications whose main class is TaskTracker.
  4. The distributed filesystem (normally HDFS), which is used for sharing job files between the other entities.

Hadoop MapReduce is very powerful, but writing jobs for it directly:

o   Requires a Java programmer.

o   Is harder and more time-consuming.

o   Makes frequent updates difficult.

A solution is to run jobs using Pig (Pig Latin) or Hive (HiveQL).

Pig

• An engine for executing programs on top of Hadoop

• It provides a language, Pig Latin, to specify these programs

Pig has two main parts:

– A high level language to express data analysis

– Compiler to generate mapreduce programs (which can run on top of Hadoop)

Pig Latin is the high-level language in which Pig scripts are written. Pig also provides an interactive shell, called Grunt, for executing simple commands. Pig runs on top of Hadoop: it reads the data to be processed from the Hadoop HDFS filesystem and submits the jobs to the Hadoop MapReduce system.

A sample MapReduce job (like a Hello World program) using Pig is given below.

It is assumed that you are on a machine that is part of a Hadoop cluster with NameNode/DataNode and JobTracker/TaskTracker set up.

We will execute Pig Latin commands using the Grunt shell. Switch to the hadoop user first.

Suppose we have a file 'users' (and, for the join below, a file 'pages') on our local filesystem containing the data to be processed. First upload the files to HDFS, then run:

# pig -x mapreduce

This command takes you to the Grunt shell. Pig Latin statements are generally organized in the following manner: a LOAD statement reads data from the file system, a series of statements then processes the data, and a STORE statement writes the output to the file system. A DUMP statement displays the output on the screen instead.

grunt> Users = load 'users' as (name, age);

grunt> Fltrd = filter Users by age >= 18 and age <= 25;

grunt> Pages = load 'pages' as (user, url);

grunt> Jnd = join Fltrd by name, Pages by user;

grunt> Grpd = group Jnd by url;

grunt> Smmd = foreach Grpd generate group, COUNT(Jnd) as clicks;

grunt> Srtd = order Smmd by clicks desc;

grunt> Top5 = limit Srtd 5;

grunt> store Top5 into 'top5sites';
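To see what this script computes, here is a rough plain-Python equivalent of the same pipeline, run in memory on small lists. It is only a sketch for checking the logic: the function name `top_sites` is ours, and it assumes user names are unique, so the join never duplicates rows.

```python
from collections import Counter


def top_sites(users, pages, limit=5):
    """users: (name, age) tuples; pages: (user, url) tuples."""
    # Fltrd: keep users aged 18 to 25 inclusive.
    young = {name for name, age in users if 18 <= age <= 25}
    # Jnd, Grpd, Smmd: join pages to those users and count clicks per url.
    clicks = Counter(url for user, url in pages if user in young)
    # Srtd, Top5: order by clicks descending and keep the first `limit` rows.
    return clicks.most_common(limit)


if __name__ == "__main__":
    users = [("amy", 20), ("bob", 30), ("cat", 25)]
    pages = [("amy", "a.com"), ("bob", "a.com"), ("cat", "b.com"),
             ("amy", "b.com"), ("cat", "b.com")]
    print(top_sites(users, pages))
```

Pig does the same work, but compiles each step into MapReduce jobs so that it scales to files far larger than one machine's memory.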

We can also view the progress of the job through the web interface http://<ipaddress of jobtracker machine>:50030.

Tools like PigPen (an Eclipse plugin) are available to help us create Pig scripts, test them using the example generator, and then submit them to a Hadoop cluster.

Another tool is Oozie, a server-based workflow engine specialized in running workflow jobs with actions that run Hadoop MapReduce and Pig jobs.

Pig tasks can be modeled as a workflow in Oozie. Workflows are deployed to the Oozie server using a command-line utility. Once deployed, they can be started and manipulated as necessary using the same utility. Once a workflow is started, Oozie runs through each step of the flow. The web console for the Oozie server can be used to monitor the progress of the workflow jobs managed by the server.

Hive

 

At Facebook, Pig was causing some slowdowns because business intelligence users needed training to get up to speed with it. So the development team decided to write Hive, which has an SQL-like syntax.

Apache Hive is a data warehouse infrastructure built on top of Apache Hadoop. It provides tools for querying and analysis of large data sets stored in Hadoop files. Hive defines a simple SQL-like query language, called HiveQL, that enables users familiar with SQL to query the data. It also allows custom mappers and reducers to perform more sophisticated analysis that the built-in capabilities of the language may not support.

Some HiveQL queries are given below; they are very similar to SQL.

# show tables;

# describe <tablename>;

# SELECT * FROM <tablename> LIMIT 10;

#  CREATE TABLE table_name

#  ALTER TABLE table_name RENAME TO new_table_name

#  DROP TABLE table_name

NoSQL databases like Cassandra provide support for hadoop. Cassandra supports running Hadoop MapReduce jobs against the Cassandra cluster. With proper cluster configuration, MapReduce jobs can retrieve data from Cassandra and then output results either back into Cassandra, or into a file system.

HADOOP Cluster on AWS EC2 with hadoop-0.20 and ubuntu-10.04

Let's start with a short introduction: what is Hadoop? Hadoop is an open-source project administered by the Apache Software Foundation. Apache Hadoop is a Java software framework that supports data-intensive distributed applications under a free license. It enables applications to work with thousands of nodes and petabytes of data. Hadoop was inspired by Google's MapReduce and Google File System (GFS) papers.

Technically, Hadoop consists of two key services: reliable data storage using the Hadoop Distributed File System (HDFS) and high-performance parallel data processing using a technique called MapReduce.

Dealing with big data requires two things:

  • Inexpensive, reliable storage; and
  • New tools for analyzing unstructured and structured data.

Hadoop creates clusters of machines and coordinates work among them. Clusters can be built with inexpensive computers. If one fails, Hadoop continues to operate the cluster without losing data or interrupting work, by shifting work to the remaining machines in the cluster.

HDFS manages storage on the cluster by breaking incoming files into pieces, called “blocks,” and storing each of the blocks redundantly across the pool of servers.
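The splitting and replication can be sketched in a few lines of Python. This is a simplified illustration, not HDFS's real placement policy: the round-robin choice of DataNodes, the function name `place_blocks` and the node names are assumptions, the 64 MB block size is assumed as the hadoop-0.20 default, and the replication factor of 2 matches the dfs.replication value used later in this article.

```python
def place_blocks(file_size, datanodes, block_size=64 * 1024 * 1024, replication=2):
    """Split a file into blocks and pick `replication` DataNodes for each block."""
    if replication > len(datanodes):
        raise ValueError("need at least as many DataNodes as replicas")
    placement = []
    offset = 0
    while offset < file_size:
        # The last block may be shorter than block_size.
        length = min(block_size, file_size - offset)
        index = len(placement)
        # Round-robin placement (an assumption made to keep the sketch short).
        replicas = [datanodes[(index + r) % len(datanodes)] for r in range(replication)]
        placement.append((offset, length, replicas))
        offset += length
    return placement


if __name__ == "__main__":
    mb = 1024 * 1024
    for block in place_blocks(150 * mb, ["dn1", "dn2", "dn3"]):
        print(block)
```

Because every block lives on more than one machine, losing a single DataNode leaves at least one copy of each block available.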

The main services running in a Hadoop cluster are:

1) namenode

2) jobtracker

3) secondarynamenode

These three run on a single node (machine), the central machine that controls the cluster.

4) datanode

5) tasktracker

These two services run on all the other nodes in the cluster.

HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients. In addition, there are a number of DataNodes, usually one per node in the cluster, which manage storage attached to the nodes that they run on.

Above the filesystem sits the MapReduce engine, which consists of one JobTracker to which client applications submit MapReduce jobs. The JobTracker pushes work out to available TaskTracker nodes in the cluster, striving to keep the work as close to the data as possible.
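The idea of keeping work close to the data can be sketched as a small scheduling helper. This is a deliberately simplified illustration, not the JobTracker's actual scheduler; the function name `pick_tasktracker` and the host names are hypothetical.

```python
def pick_tasktracker(block_hosts, free_trackers):
    """Return (tracker, is_local) for a task that reads one HDFS block.

    block_hosts: hosts holding a replica of the block.
    free_trackers: hosts whose TaskTracker currently has a free slot.
    """
    # Prefer a tracker that already stores the block, so no data is moved.
    for tracker in free_trackers:
        if tracker in block_hosts:
            return tracker, True
    # Otherwise fall back to any free tracker, which reads the block remotely.
    if free_trackers:
        return free_trackers[0], False
    return None, False


if __name__ == "__main__":
    print(pick_tasktracker({"node2", "node3"}, ["node1", "node3"]))
```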

The only purpose of the secondary NameNode is to perform periodic checkpoints. It periodically downloads the current NameNode image and edits log files, merges them into a new image, and uploads the new image back to the (primary and only) NameNode.
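A checkpoint can be pictured as replaying the edits log on top of the last image. The sketch below uses a made-up in-memory format (a dict for the image, tuples for the edits), which is an assumption for illustration only; the real fsimage and edits files are binary and far richer.

```python
def checkpoint(image, edits):
    """Apply logged edits to a copy of the namespace image and return the new image."""
    new_image = dict(image)
    for op, path, value in edits:
        if op == "create":
            new_image[path] = value
        elif op == "delete":
            new_image.pop(path, None)
        else:
            raise ValueError("unknown edit: %r" % (op,))
    return new_image


if __name__ == "__main__":
    image = {"/users": "file"}
    edits = [("create", "/pages", "file"), ("delete", "/users", None)]
    print(checkpoint(image, edits))
```

After the merged image is uploaded, the edits log can start again from empty, which keeps NameNode restarts fast.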

Now let us look at how to build a Hadoop cluster using Cloudera hadoop-0.20 on Ubuntu 10.04.

Install the Sun JDK first. Then add the following repositories to the apt sources list.

vim /etc/apt/sources.list.d/cloudera.list

[bash]

deb http://archive.cloudera.com/debian lucid-cdh3u0 contrib

deb-src http://archive.cloudera.com/debian lucid-cdh3u0 contrib

[/bash]

Import key

[bash]curl -s http://archive.cloudera.com/debian/archive.key | apt-key add -[/bash]

Then run

[bash]apt-get update[/bash]

For the NameNode/JobTracker (these services should run only on a single central machine in the cluster):

[bash]

apt-get install hadoop --yes

apt-get install hadoop-0.20-namenode

apt-get install hadoop-0.20-jobtracker

apt-get install hadoop-0.20-secondarynamenode

[/bash]

Configuration

vim /etc/hadoop/conf/hadoop-env.sh

Append these

[bash]

export JAVA_HOME=/usr/lib/jvm/java-6-sun-1.6.0.24/   # replace with your Java home

export HADOOP_CONF_DIR=/etc/hadoop/conf

export HADOOP_HOME=/usr/lib/hadoop-0.20

export HADOOP_NAMENODE_USER=hdfs

export HADOOP_SECONDARYNAMENODE_USER=hdfs

export HADOOP_DATANODE_USER=hdfs

export HADOOP_JOBTRACKER_USER=mapred

export HADOOP_TASKTRACKER_USER=mapred

export HADOOP_IDENT_STRING=hadoop

[/bash]

vim /etc/hadoop/conf/core-site.xml

[bash]

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>

<name>fs.default.name</name>

<value>hdfs://< ip address of this machine >:8020</value>

</property>

</configuration>

[/bash]

vim /etc/hadoop/conf/hdfs-site.xml

 

[bash]

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>

<name>dfs.name.dir</name>

<value>/var/lib/hadoop-0.20/name</value>

</property>

<property>

<name>dfs.data.dir</name>

<value>/var/lib/hadoop-0.20/data</value>

</property>

<property>

<name>dfs.replication</name>

<value>2</value>

</property>

</configuration>

[/bash]

vim /etc/hadoop/conf/mapred-site.xml

[bash]

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>

<name>mapred.job.tracker</name>

<value>< ip address of this machine >:8021</value>

</property>

<property>

<name>mapred.system.dir</name>

<value>/var/lib/hadoop-0.20/system</value>

</property>

<property>

<name>mapred.local.dir</name>

<value>/var/lib/hadoop-0.20/mapred</value>

</property>

</configuration>

[/bash]
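Since the three site files share the same property layout, they can also be generated rather than typed by hand. The following is a minimal sketch using Python's standard xml.etree module; the helper name `render_site_xml` and the address 10.0.0.1 are placeholders of ours, not part of Hadoop.

```python
import xml.etree.ElementTree as ET


def render_site_xml(properties):
    """Render a dict of Hadoop properties as a *-site.xml document string."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return '<?xml version="1.0"?>\n' + ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    # Placeholder address; use the IP of your own JobTracker machine.
    print(render_site_xml({
        "mapred.job.tracker": "10.0.0.1:8021",
        "mapred.system.dir": "/var/lib/hadoop-0.20/system",
        "mapred.local.dir": "/var/lib/hadoop-0.20/mapred",
    }))
```

The output is written on one line without indentation, which Hadoop reads the same way as the hand-formatted files above.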

——————————————————————————————————————————————

[bash]

mkdir /var/lib/hadoop-0.20/name

mkdir /var/lib/hadoop-0.20/data

mkdir /var/lib/hadoop-0.20/system

mkdir /var/lib/hadoop-0.20/mapred

chown -R hdfs /var/lib/hadoop-0.20/name

chown -R hdfs /var/lib/hadoop-0.20/data

chown -R mapred /var/lib/hadoop-0.20/mapred

[/bash]

Now format NameNode

[bash]yes Y | /usr/bin/hadoop namenode -format[/bash]

Start namenode

[bash]/etc/init.d/hadoop-0.20-namenode start[/bash]

Check the log files for errors:

less /usr/lib/hadoop-0.20/logs/hadoop-hadoop-namenode-<ip>.log

You can also check whether the NameNode process is up using the command

[bash]# jps[/bash]

Start the SecondaryNamenode

[bash]/etc/init.d/hadoop-0.20-secondarynamenode start[/bash]

Log: less /usr/lib/hadoop-0.20/logs/hadoop-hadoop-secondarynamenode-<ip>.log

[bash]

sudo -u hdfs hadoop fs -mkdir /var/lib/hadoop-0.20/system

sudo -u hdfs hadoop fs -chown mapred /var/lib/hadoop-0.20/system

[/bash]

Now Start the JobTracker

[bash]/etc/init.d/hadoop-0.20-jobtracker start[/bash]

Log : less /usr/lib/hadoop-0.20/logs/hadoop-hadoop-jobtracker-ip-10-108-39-34.log

The jps command should now show all three processes running:

# jps

19233 JobTracker

18994 SecondaryNameNode

18871 NameNode
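If you want to script this check, the jps output shown above is easy to parse. The helper below is a small sketch of ours (the name `missing_daemons` is hypothetical); it takes the text printed by jps and reports which expected daemons are not running.

```python
def missing_daemons(jps_output, expected):
    """Return the expected daemon names that do not appear in the jps output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        # Each jps line has the form "<pid> <main class name>".
        if len(parts) == 2:
            running.add(parts[1])
    return sorted(set(expected) - running)


if __name__ == "__main__":
    sample = "19233 JobTracker\n18994 SecondaryNameNode\n18871 NameNode\n"
    print(missing_daemons(sample, ["NameNode", "SecondaryNameNode", "JobTracker"]))
```

On a live machine the text could come from `subprocess.run(["jps"], capture_output=True, text=True).stdout`, assuming jps is on the PATH of the user running the script.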

For the DataNode/TaskTracker (these two services should run on all the other machines in the cluster):

[bash]

apt-get install hadoop-0.20-datanode

apt-get install hadoop-0.20-tasktracker

[/bash]

Configuration

vim /etc/hadoop/conf/core-site.xml

 

[bash]

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>

<name>fs.default.name</name>

<value>hdfs://< ip address of the namenode >:8020</value>

</property>

</configuration>

[/bash]

vim /etc/hadoop/conf/hdfs-site.xml

[bash]

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>

<name>dfs.name.dir</name>

<value>/var/lib/hadoop-0.20/name</value>

</property>

<property>

<name>dfs.data.dir</name>

<value>/var/lib/hadoop-0.20/data</value>

</property>

<property>

<name>dfs.replication</name>

<value>2</value>

</property>

</configuration>

[/bash]

vim /etc/hadoop/conf/mapred-site.xml

[bash]

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>

<name>mapred.job.tracker</name>

<value>< ip address of jobtracker  >:8021</value>

</property>

<property>

<name>mapred.system.dir</name>

<value>/var/lib/hadoop-0.20/system</value>

</property>

<property>

<name>mapred.local.dir</name>

<value>/var/lib/hadoop-0.20/mapred</value>

</property>

</configuration>

[/bash]

———————————————————————————————————————————————

[bash]

mkdir  /var/lib/hadoop-0.20/data/

chown -R hdfs /var/lib/hadoop-0.20/data

mkdir /var/lib/hadoop-0.20/mapred

chown -R mapred /var/lib/hadoop-0.20/mapred

[/bash]

Start the DataNode

[bash]/etc/init.d/hadoop-0.20-datanode start[/bash]

Log : less /usr/lib/hadoop-0.20/logs/hadoop-hadoop-datanode-<ip>.log

Start the TaskTracker

[bash]/etc/init.d/hadoop-0.20-tasktracker start[/bash]

Log: less /usr/lib/hadoop-0.20/logs/hadoop-hadoop-tasktracker-<ip>.log

You can now check the web interfaces:

http://<namenode-ip>:50070 for the HDFS overview

and

http://<jobtracker-ip>:50030 for the MapReduce overview

Cassandra Cluster on AWS EC2 with Cassandra 0.7.x and Ubuntu 10.04

Cassandra is a highly scalable, eventually consistent, distributed, structured key-value store. Cassandra brings together  Dynamo’s fully distributed design  and Bigtable’s ColumnFamily-based data model.

In a cluster, Cassandra nodes exchange information about one another using a mechanism called Gossip. The nodes in a cluster need to know about one another, and the nodes designated as "seeds" are the centre of this communication mechanism. It is customary to pick a small number of relatively stable nodes to serve as your seeds. Make sure that each seed also knows of at least one other seed. Two seed nodes are preferred.

Let's look at how to bring up a Cassandra cluster with Cassandra 0.7.x on Ubuntu 10.04.

First of all, you have to install the Java JDK. That is out of scope for this discussion, so please do it on your own, and let's start with Cassandra.

Add the following repositories to your apt sources list

vim /etc/apt/sources.list.d/cassandra.list

[bash]deb http://www.apache.org/dist/cassandra/debian 07x main
deb-src http://www.apache.org/dist/cassandra/debian 07x main[/bash]

Import the following keys and add them to apt-key

[bash]

gpg --keyserver keyserver.ubuntu.com --recv-keys 4BD736A82B5C1B00

gpg --export --armor 4BD736A82B5C1B00 | sudo apt-key add -

gpg --keyserver keyserver.ubuntu.com --recv-keys F758CE318D77295D

gpg --export --armor F758CE318D77295D | sudo apt-key add -

[/bash]

Execute

[bash]apt-get update[/bash]

and make sure there are no errors accessing the packages.

Install Cassandra on all the nodes (machines) with which we intend to build the cluster.

[bash]apt-get install cassandra --yes[/bash]

Now edit the configuration file for Cassandra

vim /etc/cassandra/cassandra.yaml

Here I will discuss the important directives that have to be edited for the cluster to work.

initial_token:

eg:  initial_token:  136112946768375385385349842972707284582

This parameter determines the position of each node in the Cassandra ring. The initial token for the first seed node should be '0'. Here is a simple Python script that helps to calculate the token values.

[bash]

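#!/usr/bin/env python3

import sys

# Number of nodes: taken from the first argument, or asked for interactively.
if len(sys.argv) > 1:
    num = int(sys.argv[1])
else:
    num = int(input("How many nodes are in your cluster? "))

# Space the tokens evenly around the ring of size 2**127.
for i in range(num):
    print('node %d: %d' % (i, i * (2**127) // num))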

[/bash]

Executing this script without arguments prompts you for the number of nodes in your cluster, then prints the initial token for each node.

For example, for a 2-node cluster the tokens will be

node 0: 0

node 1: 85070591730234615865843651857942052864
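To see why evenly spaced tokens matter, here is a rough sketch of how a row key could be mapped to a node on the ring. The hashing in `key_token` (absolute value of the MD5 digest read as a signed integer) reflects our understanding of the RandomPartitioner and should be treated as an assumption; both function names are ours.

```python
import bisect
import hashlib


def key_token(key):
    """Hash a row key to a ring position between 0 and 2**127 (assumed scheme)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return abs(int.from_bytes(digest, "big", signed=True))


def owner(token, node_tokens):
    """Return the index of the node owning `token`.

    node_tokens must be the sorted list of initial_token values. A node owns
    the range that ends at its own token; positions past the last token wrap
    around to the first node.
    """
    index = bisect.bisect_left(node_tokens, token)
    return index % len(node_tokens)


if __name__ == "__main__":
    tokens = [0, 85070591730234615865843651857942052864]
    for key in ["alice", "bob", "carol"]:
        print(key, "-> node", owner(key_token(key), tokens))
```

With the two tokens from the example, each node is responsible for half of the ring, so keys spread roughly evenly between them.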

auto_bootstrap: false

You can set this to false as we are just going to start the cluster for the first time.

seeds:

    - <ip address>

As mentioned earlier, the seeds listed here are the centre of the communication between the nodes.

You can give the IPs of the two nodes to which you assigned the first two initial tokens generated by the script above.

Example:

seeds:

    - 192.168.1.10

    - 192.168.1.13

These seed entries should be the same on all nodes of the cluster.

listen_address:

&

rpc_address:

You can leave both empty.

Starting Cassandra

To start Cassandra you can use either the init script or the command "cassandra". Here I will use the second option.

As the Cassandra service was started during installation, some values will already be stored in the /var/lib/cassandra/data directory. So before starting Cassandra, follow these steps.

[bash]

/etc/init.d/cassandra stop

rm -rf /var/lib/cassandra/data

mkdir /var/lib/cassandra/data

[/bash]

After completing these steps on all the nodes, run the following command to start Cassandra on each node, beginning with the first seed node.

[bash]# cassandra &[/bash]

After starting Cassandra on all the nodes you can check the cluster status using the following command

[bash]nodetool -h <ip of the node >  -p 8080 ring[/bash]

or

[bash]nodetool -h localhost -p 8080 ring[/bash]