Apache Oozie Overview
Oozie is a workflow scheduler system to manage primarily Apache Hadoop jobs. Oozie Workflow jobs are Directed Acyclical Graphs (DAGs) of actions. Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability. Oozie is integrated with the rest of the Hadoop stack supporting several types of Hadoop jobs out of the box (such as Java map-reduce, Streaming map-reduce, Pig, Hive, Spark, Sqoop and Distcp) as well as system specific jobs (such as Java programs and shell scripts). Oozie is a scalable, reliable and extensible system.
In this blog, we will learn how to transfer data between your on-prem Hadoop cluster and Amazon S3, how to run advanced analytics on that data in Databricks, and how to tie these steps together into an end-to-end pipeline with Apache Oozie.
Config Changes Required for the data transfer from HDFS to S3 and vice versa:
To protect your AWS access and secret keys, first add them to the site files of your Hadoop cluster by making the changes below. These files are usually found in the /etc/hadoop/conf and /etc/hive/conf directories.
- hdfs-site.xml and hive-site.xml files
<property>
<name>fs.s3a.access.key</name>
<value>YOUR_ACCESS_KEY</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>YOUR_SECRET_KEY</value>
</property>
Restart all affected services (NameNode, YARN, MapReduce, Hive, Oozie, etc.).
Assign the Administration Access Policy to the AWS User.
After this, verify that the S3 bucket is accessible from your on-prem Hadoop cluster as below.
hdfs dfs -ls s3a://<bucket-name>/*
hdfs dfs -mkdir s3a://<bucket-name>/newdirectory
hdfs dfs -put /opt/mnt/<local_file.csv> s3a://<bucket-name>/newdirectory
Apache Distcp
DistCp (distributed copy) is a tool used for large inter/intra-cluster copying, and it can be extended to Amazon S3. It uses map jobs to effect its distribution, error handling and recovery, and reporting. It expands a list of files and directories into input to map tasks, each of which copies a partition of the files specified in the source list. Using DistCp, a cluster of many members can copy lots of data quickly. DistCp is best suited to speeds of up to 2 GB/s for a transfer of 5 TB. It is also a more secure data transfer tool, because it works with your on-prem security features such as Kerberos and SSL.
hadoop distcp -m 20 -mapredSslConf <ssl_conf_file> /user/apps/databricks/<your_data_directory> s3a://<bucket-name>/distcp_files/
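As a rough sanity check on the figures quoted above, the sketch below estimates how long a copy takes at a sustained rate. It assumes decimal units (1 TB = 1000 GB) and a constant throughput, neither of which is stated in the text, and `estimate_transfer_seconds` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Hypothetical helper: estimate copy duration in whole seconds.
# Assumes decimal units (1 TB = 1000 GB) and a constant sustained rate.
estimate_transfer_seconds() {
  local size_tb=$1     # data volume in TB
  local rate_gb_s=$2   # sustained throughput in GB/s
  echo $(( size_tb * 1000 / rate_gb_s ))
}

# 5 TB at 2 GB/s
estimate_transfer_seconds 5 2   # prints 2500, i.e. roughly 42 minutes
```

Real transfers will usually take longer, since throughput depends on the number of map tasks and on the network link to S3.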
Oozie/Databricks Data Pipeline Workflow:
TL;DR: Suppose your on-prem cluster receives an audit file in CSV format every day through an internal or external process, and that this file contains IT environment and resource usage details. You would like to run advanced analytics on it in Spark, but your on-prem cluster is limited to an older Apache Spark version (say 1.5 or 1.6). You therefore decide to use the latest Apache Spark version (2.2+) in Databricks for your ETL and other analytics needs, such as finding the total hours for which each computer consumed power per day, with a visualization, and then converting the aggregated output to Parquet and storing it in S3. As an optional step, you also want to transfer the Parquet output back to your HDFS cluster. For every success or failure, you would like to receive an email notification. This is the use case.
In short, we will trigger an Oozie workflow when a file arrives in HDFS, transfer that file to Amazon S3 using DistCp, perform ETL in Databricks through a REST API and store the output in S3, transfer the aggregated data back to HDFS using DistCp (if required), and send success/failure email notifications.
The end-to-end approach below solves this problem using Apache Oozie and Databricks job clusters through the REST API supported in Databricks.
How to use Apache Oozie with Databricks
We will create an Oozie Coordinator with an HDFS file dependency that triggers the actual Oozie Workflow, which performs all the actions shown in the diagram above. Both are controlled by a job.properties file, which is used to configure the Oozie job on your on-premise Oozie server.
Setup Oozie WorkFlow with Email Notification
This XML-based workflow is triggered by an event, such as the arrival of your business data in HDFS. In this example, the workflow is set up with four actions.
- Action 1 (ACTION01-HDFS-to-S3-File-Transfer) transfers your HDFS data to the Amazon S3 file system through DistCp. The data transfer is fast because it is distributed across your on-prem cluster.
<action name="ACTION01-HDFS-to-S3-File-Transfer">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<arg>${nameNode}/user/apps/databricks/curl/shell/june_audit_dbc.csv</arg>
<arg>${s3bucket}/oozieInput</arg>
</distcp>
<ok to="ACTION02-Run-ETL-in-Databricks" />
<error to="sendEmailKill" />
</action>
- Action 2 (ACTION02-Run-ETL-in-Databricks) is triggered upon successful completion of Action 1. This action executes a shell script that calls a Databricks job through the Databricks REST API 2.0, so that the job runs on the cloud platform.
<action name="ACTION02-Run-ETL-in-Databricks">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>script.sh</exec>
<file>${exec_path}/script.sh</file>
</shell>
<ok to="ACTION03-Transfer-S3-to-OnPrem-HDFS" />
<error to="sendEmailKill" />
</action>
- Action 3 (ACTION03-Transfer-S3-to-OnPrem-HDFS) is triggered upon successful completion of Action 2. This action copies the output of your Databricks job to your on-premise HDFS cluster. This step is optional; include it only if you wish to bring the data back to your on-prem cluster.
<action name="ACTION03-Transfer-S3-to-OnPrem-HDFS">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<arg>${s3bucket}/oozieOutput/juneAudit</arg>
<arg>${nameNode}/user/apps/databricks/curl/shell</arg>
</distcp>
<ok to="ACTION04-Create-WF-Success-Trigger" />
<error to="sendEmailKill" />
</action>
- Action 4 (ACTION04-Create-WF-Success-Trigger) is triggered upon successful completion of Action 3. This action creates an event notification, in this case a SUCCESS trigger file in your S3 bucket, to show that Actions 1 through 3 were successful.
<action name="ACTION04-Create-WF-Success-Trigger">
<fs>
<delete path="${s3bucket}/oozieOutput/dbc-run-api-success.txt" />
<touchz path="${s3bucket}/oozieOutput/dbc-run-api-success.txt" />
</fs>
<ok to="sendEmailSuccess" />
<error to="sendEmailKill" />
</action>
- Finally, once Actions 1 through 4 have executed successfully, the sendEmailSuccess action emails the user to say that the job succeeded, quoting the workflow id.
- If any action fails, Oozie treats the entire workflow as failed, and the sendEmailKill action emails the user to say that the job failed, quoting the workflow id and the error message.
<action name="sendEmailSuccess">
<email xmlns="uri:oozie:email-action:0.1">
<to>${emailToAddress}</to>
<cc>${emailCcAddress}</cc>
<subject>Status of workflow ${wf:id()}</subject>
<body>API Job in Databricks == > The workflow ${wf:id()} completed successfully</body>
</email>
<ok to="end" />
<error to="end" />
</action>
<action name="sendEmailKill">
<email xmlns="uri:oozie:email-action:0.1">
<to>${emailToAddress}</to>
<cc>${emailCcAddress}</cc>
<subject>Status of workflow ${wf:id()}</subject>
<body>API Job in Databricks == > The workflow ${wf:id()} had issues and was killed == > The error message is: ${wf:errorMessage(wf:lastErrorNode())}</body>
</email>
<ok to="end" />
<error to="fail" />
</action>
Setup Oozie Co-Ordinator app
Workflow jobs are commonly run at regular time intervals and/or when data becomes available. In some cases, they can also be triggered by an external event.
- We have set up this XML-based Coordinator to fire the workflow we created above, which performs the four actions described. The dependency dataset in HDFS is specified inside the dataset node of the XML.
<coordinator-app name="co-for-databricks-job" frequency="${coord:days(1)}" start="${startTime}" end="${endTime}" timezone="GMT-0700" xmlns="uri:oozie:coordinator:0.2">
<datasets>
<dataset name="dateFormat" frequency="${coord:days(1)}" initial-instance="${startTime}" timezone="GMT-0700">
<uri-template>${YEAR}-${MONTH}-${DAY}</uri-template>
</dataset>
<dataset name="check_for_this_inside_s3" frequency="${coord:days(1)}" initial-instance="${startTime}" timezone="GMT-0700">
<uri-template>${nameNode}/user/apps/databricks/curl/shell</uri-template>
<done-flag>june_audit_dbc.csv</done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="this-job-is-dependent-on" dataset="check_for_this_inside_s3">
<instance>${coord:current(0)}</instance>
</data-in>
</input-events>
<output-events>
<data-out name="currentDate" dataset="dateFormat">
<instance>${coord:current(0)}</instance>
</data-out>
<data-out name="previousDate" dataset="dateFormat">
<instance>${coord:current(-1)}</instance>
</data-out>
</output-events>
<action>
<workflow>
<app-path>${workFlowPath}</app-path>
<configuration>
<property>
<name>WaitForThisInputData</name>
<value>${coord:dataIn('this-job-is-dependent-on')}</value>
</property>
<property>
<name>rundate</name>
<value>${coord:dataOut('currentDate')}</value>
</property>
<property>
<name>yesterdayDate</name>
<value>${coord:dataOut('previousDate')}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
Setup Databricks Jobs API Rest Endpoint
- The Databricks REST API enables programmatic access to Databricks (instead of going through the web UI). It can automatically create and run jobs, productionalize a data flow, and much more. It also allows us to integrate Apache Oozie with Databricks through Workflow and Coordinator apps. We will place this script in a .sh file in HDFS, so that Oozie Action 02 executes the file and makes the necessary REST API calls to run your job on Databricks job clusters.
curl -X POST -u userName:passWord \
  https://<shardName>.cloud.databricks.com/api/2.0/jobs/run-now \
  -d '
{
  "job_id":1234,
  "notebook_params":{
    "inputPath":"s3a://<bucketName>/oozieInput/june_audit_dbc.csv",
    "outputPath":"s3a://<bucketName>/oozieOutput/juneAudit"
  }
}'
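A slightly more defensive version of script.sh might look like the sketch below. It is only an illustration under several assumptions that are not in the original post: settings arrive as environment variables (`DATABRICKS_HOST`, `DATABRICKS_USER`, `DATABRICKS_PASSWORD`, `DATABRICKS_JOB_ID`, `INPUT_PATH` and `OUTPUT_PATH` are all hypothetical names), and any HTTP status other than 200 should fail the action so that Oozie follows the error transition.

```shell
#!/usr/bin/env bash
# Sketch of script.sh for the Oozie shell action.
# Assumption: all settings are passed in as environment variables
# (the variable names below are hypothetical).

# Build the run-now request body from a job id and two S3 paths.
build_run_now_payload() {
  local job_id=$1 input_path=$2 output_path=$3
  printf '{"job_id":%s,"notebook_params":{"inputPath":"%s","outputPath":"%s"}}' \
    "$job_id" "$input_path" "$output_path"
}

# POST the payload; return non-zero unless the API answers with HTTP 200.
run_databricks_job() {
  local payload status response_file
  payload=$(build_run_now_payload "$DATABRICKS_JOB_ID" "$INPUT_PATH" "$OUTPUT_PATH")
  response_file=$(mktemp)
  status=$(curl -s -o "$response_file" -w '%{http_code}' \
    -X POST -u "$DATABRICKS_USER:$DATABRICKS_PASSWORD" \
    "https://$DATABRICKS_HOST/api/2.0/jobs/run-now" \
    -d "$payload")
  cat "$response_file"      # keep the API response in the action log
  rm -f "$response_file"
  [ "$status" = "200" ]
}

# Only call the API when a host is configured, so the file can be sourced safely.
if [ -n "${DATABRICKS_HOST:-}" ]; then
  run_databricks_job || exit 1
fi
```

In an Oozie shell action, such variables could be supplied through `<env-var>` elements. Note that `run-now` returns as soon as the run has been submitted, so this sketch only confirms that Databricks accepted the request; it does not wait for the job to finish.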
Setup Oozie Job Properties
- This properties file contains all the necessary on-prem and AWS S3 information, such as the NameNode and ResourceManager endpoints, the HDFS path for shell script execution, the Oozie HDFS work directory, the workflow start and end times in UTC, the email addresses for event notifications, and the S3 bucket details.
#Cluster endpoints, Oozie work directory and S3 bucket
jobTracker=localhost:8050
nameNode=hdfs://nn1:8020
ooziedir=dbc-oozie
user.name=root
s3bucket=s3a://<bucket-name>
exec=script.sh
exec_path=${nameNode}/user/apps/databricks/curl/shell
#Time Frequency based on UTC Time
startTime=2017-02-19T04:00Z
endTime=2017-02-19T12:00Z
# Email address can be comma (,) separated
emailToAddress=<to_address_1>,<to_address_2>
emailCcAddress=<cc_address>
#The Path where the workflow is placed in HDFS
oozie.coord.application.path=${nameNode}/user/apps/databricks/curl/shell/co/co-dbc.xml
workFlowPath=${nameNode}/user/apps/databricks/curl/shell/wf/wf-dbc.xml
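Because the workflow and coordinator XML resolve their `${...}` variables from this file, it can help to check that every referenced key is defined before submitting. For instance, the workflow refers to `${queueName}`, which the listing above does not define. The sketch below is one possible pre-flight check; `missing_keys` is a hypothetical helper, not part of Oozie.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check: print every required key that has no
# "key=value" line in the given properties file.
missing_keys() {
  local file=$1; shift
  local key
  for key in "$@"; do
    # Compare the text before the first "=" with the key, exactly.
    awk -F= -v k="$key" '$1 == k { found = 1 } END { exit !found }' "$file" \
      || echo "$key"
  done
}

# Example call (prints nothing when all keys are present):
# missing_keys job.properties jobTracker nameNode queueName s3bucket exec_path \
#   startTime endTime emailToAddress emailCcAddress workFlowPath \
#   oozie.coord.application.path
```

Any key that the function prints should be added to job.properties, or passed on the command line, before the job is submitted.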
CSV to Parquet File Conversion Example:
Below is an example of setting up a pipeline that processes CSV files (here, an audit file that arrives in the on-prem Hadoop cluster daily) and converts them to Parquet after an ETL process in Databricks. Oozie orchestrates this pipeline by detecting when the daily files are ready for processing and sending a final email notification.
Once all the XML files are set up, run the command below on your Hadoop cluster. It submits the Coordinator job to the Oozie server.
oozie job -oozie http://localhost:11000/oozie \
  -config /opt/giri/dbc-oozie/job.properties -run
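On submission, the Oozie client prints a line of the form `job: <id>`. A small sketch like the one below can capture that id so the job can be inspected later with `oozie job -info`. The helper name `parse_job_id` is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read the client output on stdin and print only the
# id from the "job: <id>" line.
parse_job_id() {
  sed -n 's/^job: *//p'
}

# Example use on the cluster (commented out because it needs an Oozie server):
# job_id=$(oozie job -oozie http://localhost:11000/oozie \
#   -config /opt/giri/dbc-oozie/job.properties -run | parse_job_id)
# oozie job -oozie http://localhost:11000/oozie -info "$job_id"
```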
The Co-ordinator App kicks off the Oozie Workflow upon successful arrival of a file in HDFS.
hdfs://localhost:8020/user/apps/databricks/curl/shell/june_audit_dbc.csv
You will see the stage below once Oozie triggers Action 02.
This is the view after successful completion of the job.
The successful execution of Oozie Actions 1 through 4 is shown below in the Oozie server GUI.
The Databricks action reads an input CSV file and converts it to Parquet. The values “inputPath” and “outputPath” are passed from the Oozie shell action.
Verify that the Parquet file was created in Amazon S3.
Verify that the Parquet file was transferred to HDFS as part of Action 03:
[root@sandbox ~]# hdfs dfs -ls /user/apps/databricks/curl/shell/juneAudit
Found 2 items
-rw-r--r-- 3 root hdfs 0 2017-02-19 17:27 /user/apps/databricks/curl/shell/juneAudit/_SUCCESS
-rw-r--r-- 3 root hdfs 29786 2017-02-19 17:27 /user/apps/databricks/curl/shell/juneAudit/part-00000-5ab5e6b8-25a7-4e83-9802-85a41e1b4312-c000.gz.parquet
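The same check can be scripted. The sketch below reads an `hdfs dfs -ls` listing and succeeds only when it contains both a `_SUCCESS` marker and at least one `.parquet` part file, as in the listing above. `output_looks_complete` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read `hdfs dfs -ls` output on stdin and succeed only
# if it lists a _SUCCESS marker and at least one .parquet part file.
output_looks_complete() {
  local listing
  listing=$(cat)
  grep -q '/_SUCCESS$' <<<"$listing" && grep -q '\.parquet$' <<<"$listing"
}

# Example use on the cluster (commented out because it needs HDFS access):
# hdfs dfs -ls /user/apps/databricks/curl/shell/juneAudit | output_looks_complete \
#   && echo "Parquet output is in place"
```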