Deployment modes and job submission in Apache Spark

There are several ways to submit an application in Spark. In addition to the client and cluster modes of execution, there is also a local mode for submitting a Spark job. We must understand these modes before we start running our jobs. Before we jump in, let us recall a few important things we learnt in the previous lesson; see Introduction to Apache Spark to know more.


Spark is a scheduling, monitoring and distribution engine, i.e. Spark is not only a processing engine; it can also act as the resource manager for the jobs submitted to it. Spark can run by itself (Standalone) using its own cluster manager, and it can also run on top of other cluster/resource managers.

How and why does Spark support different cluster managers?

This is made possible by the SparkContext object, which lives in the main driver program of a Spark application. The SparkContext can connect to several types of cluster managers, which lets Spark run on top of cluster manager frameworks such as YARN or Mesos. It is also this object that coordinates the independent sets of processes that execute the application in parallel on the cluster.
[Figure: Spark cluster components (driver and workers). Source: https://spark.apache.org/docs/latest/cluster-overview.html]
A Spark installation can be launched in three different ways:
        1.   Local (pseudo-cluster mode)
        2.   Standalone (cluster with Spark's default cluster manager)
        3.   On top of another cluster manager (cluster with YARN, Mesos or Kubernetes as the cluster manager)


Local:-

Local mode is a pseudo-cluster mode, generally used for testing and demonstration. In local mode, all the execution components run on a single node.


Standalone:-

In Standalone mode, the default cluster manager shipped with the official Apache Spark distribution handles resource and cluster management for Spark jobs. It has a Standalone Master for resource management and Standalone Workers that run the tasks.

Please do not get confused here: Standalone mode doesn't mean a single-node Spark deployment. It is also a cluster deployment of Spark; the only difference is that in Standalone mode the cluster is managed by Spark itself.


On top of another cluster manager:-

Apache Spark can also run on other cluster managers such as YARN, Mesos or Kubernetes. In industry, YARN is the most commonly used cluster manager for Spark, largely because it integrates well with HDFS and brings benefits such as data locality.


The command used to submit a Spark job is the same for Standalone and for the other cluster managers.

Scala Spark
  spark-submit \
    --class <main-class> \
    --master <master-url> \
    --deploy-mode <deploy-mode> \
    --conf <key>=<value> \
    ... # other spark properties options
    <application-jar> \
    [application-arguments]

PySpark
  spark-submit \
    --master <master-url> \
    --deploy-mode <deploy-mode> \
    --conf <key>=<value> \
    ... # other Spark properties options
    --py-files <python-modules-jars> \
    my_application.py \
    [application-arguments]

Table 1: spark-submit command for Scala and Python

For Python applications, simply pass the .py file in place of <application-jar>, and add Python dependencies such as modules, .zip, .egg or .py files through --py-files.

How do you submit a Spark job on a Standalone cluster versus a cluster managed by another cluster manager?

The answer is very simple. Use the "--master" option shown in the spark-submit command above and pass the master URL of the cluster, e.g.
Mode        | Value of "--master"
------------|--------------------------------
Standalone  | --master spark://HOST:PORT
Mesos       | --master mesos://HOST:PORT
YARN        | --master yarn
Local       | --master local[K]  (K = number of worker threads; local[*] uses one thread per logical core)
Table 2: spark-submit "--master" values for different Spark deployment modes
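
The mapping in Table 2 can be written down as a small helper. This is only a sketch for illustration: master_url and build_submit_command are hypothetical names (they are not part of Spark), and the host, port and file names are placeholders. The code only assembles the argument list for spark-submit; it does not launch anything.

```python
# Hypothetical helpers (not part of Spark): they only build the command line.

def master_url(mode, host=None, port=None, threads="*"):
    """Return the --master value for a deployment mode listed in Table 2."""
    mode = mode.lower()
    if mode == "local":
        return "local[{}]".format(threads)
    if mode == "yarn":
        return "yarn"
    if mode in ("standalone", "mesos"):
        if host is None or port is None:
            raise ValueError("host and port are required for " + mode)
        scheme = "spark" if mode == "standalone" else "mesos"
        return "{}://{}:{}".format(scheme, host, port)
    raise ValueError("unknown mode: " + mode)


def build_submit_command(app, mode, deploy_mode=None, **master_args):
    """Assemble a spark-submit argument list for the given application file."""
    cmd = ["spark-submit", "--master", master_url(mode, **master_args)]
    if deploy_mode is not None:
        cmd += ["--deploy-mode", deploy_mode]
    cmd.append(app)
    return cmd


# Placeholder application name, for illustration only.
print(" ".join(build_submit_command("my_job.py", "yarn", deploy_mode="cluster")))
```

Keeping the master URL in one function means the same job script can be pointed at a local run or a cluster by changing a single argument.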

When you submit a job in Spark, the application JAR (the code you have written for the job) is distributed to all worker nodes, along with any additional JAR files you mention (for example through --jars).

We have talked enough about cluster deployment modes; now we need to understand the application "--deploy-mode". The deployment modes discussed so far describe how the cluster itself is deployed, and they are different from the "--deploy-mode" option in the spark-submit command (Table 1). --deploy-mode is the application (or driver) deploy mode, which tells Spark how to run the job on the cluster (which, as already mentioned, can be Standalone, YARN or Mesos). For an application (Spark job) running on a cluster there are two --deploy-modes: client and cluster.

Spark Application deploy modes: -

Cluster:- When the driver runs inside the cluster, it is cluster deploy mode. In this case the Resource Manager or Master decides which node the driver will run on.

Client:- In client mode, the driver runs on the machine from which the job is submitted.

Now the question arises:

"How do we submit a job in cluster or client mode, and which is better?"

How to submit:-

In the spark-submit command above, pass "--deploy-mode client" for client mode or "--deploy-mode cluster" for cluster mode.

 

Which one is better, client or cluster mode:

Unlike cluster mode, in client mode the job fails if the client machine is disconnected. Client mode is a good choice if you want to work with Spark interactively, or if you do not want the driver to consume any resources from your cluster. In client mode, when you deal with a huge dataset and call actions on RDDs or DataFrames, you need to make sure the client machine has sufficient resources. So neither mode is better than the other in general; choose the deploy mode that suits your requirements.

Client | Cluster
-------|--------
The driver runs on the machine from which the job is submitted. | The driver runs inside the cluster. The Resource Manager or Master decides which node the driver runs on.
The job fails if the client machine running the driver is disconnected. | After submitting the job, the client can disconnect.
Can be used to work with Spark interactively. Performing actions on an RDD or DataFrame (like count) and capturing the results in logs is easy. | Cannot be used to work with Spark interactively.
JARs can be accessed from the client machine. | Since the driver runs on a different machine than the client, JARs present only on the local machine won't work. They must be made available to all nodes, either by placing them on each node or by passing them through --jars or --py-files during spark-submit.
YARN: the Spark driver does not run on the YARN cluster; only the executors run inside it. | YARN: both the Spark driver and the executors run on the YARN cluster.
The local directory used by the driver is spark.local.dir; for the executors it is the YARN config yarn.nodemanager.local-dirs. | The local directories used by the Spark executors and the Spark driver are the ones configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs).
Table 3: Spark Client Vs Cluster Mode

Here are some examples of submitting a Spark job in different modes:

Local

Scala
  ./bin/spark-submit \
    --class main_class \
    --master local[8] \
    /path/to/examples.jar

PySpark
  ./bin/spark-submit \
    --master local[8] \
    my_job.py

Spark Standalone

Scala
  ./bin/spark-submit \
    --class main_class \
    --master spark://<ip-address>:7077 \
    --deploy-mode cluster \
    --supervise \
    --executor-memory 10G \
    --total-executor-cores 100 \
    /path/to/examples.jar

PySpark
  # Standalone does not currently support cluster deploy mode for Python
  # applications, so this runs in the default client mode
  # (--supervise applies only to cluster mode).
  ./bin/spark-submit \
    --master spark://<ip-address>:7077 \
    --executor-memory 10G \
    --total-executor-cores 100 \
    --py-files <python-modules-jars> \
    my_job.py

YARN cluster mode

Scala
  # --deploy-mode can be client for client mode
  ./bin/spark-submit \
    --class main_class \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 10G \
    --num-executors 50 \
    /path/to/examples.jar

PySpark
  ./bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 10G \
    --num-executors 50 \
    --py-files <python-modules-jars> \
    my_job.py

Table 4: spark-submit examples for different modes


To learn more on Spark click here. Let us know your views or feedback on Facebook or Twitter @BigDataDiscuss.

What is Spark RDD and Why Spark needs it?

RDD (Resilient Distributed Dataset) is the fundamental data structure of Apache Spark. This post covers what an RDD is, why Spark needs it and how to create one.
Before diving into RDDs, you need to understand what Apache Spark is and how it works. See the post below to understand Apache Spark:


·        Introduction to Apache Spark


By now we already know that Spark is a parallel processing engine that performs lightning-fast, fault-tolerant, in-memory parallel processing of data. Not just Spark: any processing engine that does in-memory, fault-tolerant processing on a cluster needs the following key features:

1.   Partitioned data: the dataset is distributed so that computation can be done in parallel.

2.   Resilience or fault tolerance: since the data is distributed among nodes, a mechanism is needed to recompute the portion of a job that is affected by a node failure.

3.   Data locality: processing distributed data can cause a lot of data movement over the network, so data movement should be kept to a minimum; instead, the computation itself should be moved closer to where the data resides.

4.   In-memory data sharing: sharing data in memory is 10 to 100 times faster than going through disk or the network. Writing to and reading from disk is a very costly operation, so RAM or SSDs should be used instead of disks.

All the above-mentioned features, such as partitioning, fault tolerance and data locality, are combined into a single class called RDD. An object instantiated from the RDD class has all of these features.

Now the question arises: how can we create an RDD?



There are two ways to create an RDD:-
·        Read the file you need through the APIs Spark provides. It's very simple: you just call a method, pass your file path as a parameter, and you get an RDD in return.

Scala
  val myRdd1 = sc.textFile("file_path")
  val myRdd2 = spark.read.csv("file_path").rdd

Python
  myRdd1 = sc.textFile("file_path")
  myRdd2 = spark.read.csv("file_path").rdd


·       Create a collection (e.g. a list in python) and apply the parallelize method on it.
Scala
  val myRdd1 = sc.parallelize(Array("Bangalore", "New York", "London"))

Python
  myRdd1 = sc.parallelize(["Bangalore", "New York", "London"])


Once an RDD is created, it can be operated on in parallel. You can test your RDD by calling the rdd.collect() method in the pyspark (Python) shell or spark-shell (Scala).

In addition to the above-mentioned features, RDDs have one more critical feature: lazy evaluation.

5.    Lazy evaluation: an RDD is not materialized with data the moment it is created. Merely creating an RDD does not move data from the file into RAM. Even if you create multiple RDDs from the base RDD, no computation takes place. Spark only builds a DAG (Directed Acyclic Graph) of all the transformations applied to the RDD.

The actual computation happens when an action is called on the RDD; only then is the RDD materialized with actual data. The data is held only for as long as the computation needs it, and after that the memory is freed again. This is called lazy evaluation.
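
To make the idea concrete, here is a plain-Python analogy of lazy evaluation. It is not Spark code and not how Spark is implemented: LazyDataset is a hypothetical toy class that merely records transformations and runs them only when an action (collect) is called, which is the behaviour described above.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, source, steps=()):
        self._source = source        # callable that produces the raw records
        self._steps = tuple(steps)   # recorded transformations (the "lineage")

    def map(self, fn):
        # Transformation: returns a new dataset; nothing is computed yet.
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, fn):
        # Transformation: also only recorded.
        return LazyDataset(self._source, self._steps + (("filter", fn),))

    def collect(self):
        # Action: only now is the source read and the recorded steps applied.
        records = self._source()
        for kind, fn in self._steps:
            records = map(fn, records) if kind == "map" else filter(fn, records)
        return list(records)


reads = []  # one entry is appended each time the source is actually read


def read_source():
    reads.append(1)
    return iter([1, 2, 3, 4, 5])


base = LazyDataset(read_source)
evens_squared = base.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
print(len(reads))               # the source has not been read yet
print(evens_squared.collect())  # the action triggers the read and the computation
```

Note that every call to collect() reads the source again, because nothing is kept once the computation is done.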

Lazy evaluation will be discussed in more detail when we talk about RDD transformations and actions.

Now let us try to understand RDDs with a simple example. Suppose there are 10 nodes in your cluster and you have a 100 GB file to process. An RDD can be created by simply calling the read method on the file (as shown in the file-reading examples above). Spark creates logical partitions of the file and maps them to the partitions of the RDD.

So different nodes of the cluster will have different sets of partitions to work on. In the above scenario, a single 100 GB file may be distributed across 400+ partitions, with each node working on 40+ partitions, i.e. each node may work on just 10 GB of data.
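
The numbers above can be checked with a little arithmetic. The sketch below assumes a split size of 256 MB per partition and a perfectly even spread across nodes; both are assumptions made for illustration, since the real partition size depends on the file system and Spark configuration. partition_plan is a hypothetical helper, not a Spark function.

```python
import math


def partition_plan(file_size_gb, split_size_mb, num_nodes):
    """Estimate partition count and per-node share for an evenly spread file."""
    file_size_mb = file_size_gb * 1024
    partitions = math.ceil(file_size_mb / split_size_mb)
    partitions_per_node = partitions / num_nodes
    data_per_node_gb = file_size_gb / num_nodes
    return partitions, partitions_per_node, data_per_node_gb


# Assumption: 256 MB per partition (not a value taken from Spark itself).
print(partition_plan(file_size_gb=100, split_size_mb=256, num_nodes=10))
# A smaller split size, e.g. 128 MB, doubles the number of partitions.
print(partition_plan(file_size_gb=100, split_size_mb=128, num_nodes=10))
```

Whatever the split size, the amount of data per node stays at 10 GB here; only the number of partitions each node has to work through changes.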

How Spark partitions the file and how each node gets its own set of partitions is a separate topic, which will be covered in the next post.



Since the data in an RDD is processed in a distributed fashion, the nodes need to be coordinated. In Spark Standalone, this distribution, coordination and monitoring is taken care of by Spark itself; in other modes it is handled by the scheduling/monitoring tool in use, e.g. YARN or Mesos.

To know more about Spark deployment modes and job submission, please see the post Deployment modes and job submission in Apache Spark.




There are two kinds of operations that can be performed on RDDs: transformations and actions. We will discuss RDD operations in the next post.





Files with special character or Encoding in Spark

An encoding maps the numeric byte values stored in a file to readable characters; it provides the information your computer needs to display the text on the screen.

Spark's read.csv() method accepts an encoding parameter, which is used to decode the file while reading. If the parameter is not given, it defaults to utf-8.

csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None)
So what basically happens is that, while reading the file, Spark decodes it according to the given encoding, then applies the schema on top of it and loads the data into a data frame.

If the data in the file complies with the schema, all the records are loaded into the data frame, but that can only happen if Spark is able to read the file with the encoding you provided. What if Spark cannot translate a value into a proper readable form? How will Spark apply the schema to records it cannot read? In that case the value won't conform to the schema, and the record is moved to the bad record column.
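
The effect of a wrong encoding is easy to reproduce in plain Python, without Spark. In this small sketch the sample value is made up for illustration: the same bytes are decoded once with the encoding they were written in and once with utf-8.

```python
# Made-up sample record, written the way an ISO-8859-1 source system might write it.
raw = "caf\u00e9|1000".encode("iso-8859-1")

# Decoding with the matching encoding gives back the original text.
as_latin1 = raw.decode("iso-8859-1")

# Decoding the same bytes as utf-8 fails on the accented character;
# with errors="replace" the undecodable byte becomes U+FFFD instead.
as_utf8 = raw.decode("utf-8", errors="replace")

print(ascii(as_latin1))
print(ascii(as_utf8))
```

A value damaged like this no longer matches what the source system sent, which is how a record that loaded cleanly elsewhere can end up as a bad record.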

Every now and then we receive a feed that contains special characters, and we also encounter scenarios where we do not know the actual encoding of the file. We spend time trying out different encodings such as utf-8, utf-16 or iso-8859-1 (latin-1), but none of them reads the file properly through Spark. Many database applications such as Netezza and Teradata can deal with files in different encodings, but when these files are loaded through Spark you get bad records (click here to know how to capture bad records while loading a file in Hive through Spark).


I still need to do more research on Spark encoding issues, so here I am writing up only the issues I have encountered. In this post I will share an encoding-related issue I faced some time back while loading files through Spark.

The source system for my feed file was Netezza. The file had loaded successfully in Netezza with zero bad records, but when I tried loading it through spark.read.csv there were some bad records. I tried different encodings, but every time I got bad records.


Finally I decided to read the file as bytes and then apply the encoding to it myself.

This fixed my problem and all records got loaded.
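
The original code for this step is not shown here, so the following is only a sketch of the idea in plain Python, not the exact code that was used. The function names, the candidate encodings (utf-8, then cp1252) and the sample bytes are all assumptions for illustration. In PySpark the raw bytes could come from something like sc.binaryFiles; that part is left out so the sketch runs without a cluster.

```python
def decode_line(raw_line, encodings=("utf-8", "cp1252")):
    """Decode one line of bytes, trying each candidate encoding in order."""
    for enc in encodings:
        try:
            return raw_line.decode(enc)
        except UnicodeDecodeError:
            continue
    # Last resort: keep the record and mark undecodable bytes with U+FFFD.
    return raw_line.decode("utf-8", errors="replace")


def decode_file(raw_bytes, delimiter="|"):
    """Split raw file content into lines, decode each one, split into fields."""
    rows = []
    for raw_line in raw_bytes.splitlines():
        if raw_line:
            rows.append(decode_line(raw_line).split(delimiter))
    return rows


# Made-up sample: a utf-8 line, a cp1252 line, and a line neither can decode.
sample = b"1|caf\xc3\xa9\n2|caf\xe9\n3|bad\x81\n"
for row in decode_file(sample):
    print([ascii(field) for field in row])
```

Because the last fallback never raises an error, every record is kept; the price is a replacement character wherever a byte could not be decoded.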

Note:- You may see some non-readable characters in the output if there are special characters in the file.



Capture bad records while loading csv in data frame

Loading a CSV file and capturing all the bad records is a very common task in ETL projects. The bad records are analyzed so that corrective or preventive measures can be taken for loading the file. In some cases, the client may ask you to send the bad record file for their knowledge or action, so capturing the bad records becomes very important in these scenarios.
Most relational database loaders, such as SQL*Loader or nzload, provide this feature, but when it comes to Hadoop and Spark (2.2.0) there is no direct solution for it.
However, a solution to this problem is available in Databricks Runtime 3.0, where you just need to provide a bad record path and the bad record file will be saved there.

df = spark.read \
  .option("badRecordsPath", "/data/badRecPath") \
  .parquet("/input/parquetFile")

However, this method won't work in earlier Spark releases. There, we can achieve the same result in two ways:-
  1. Read the file as an RDD and then use RDD transformation methods to filter out the bad records
  2. Use spark.read.csv()
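
The first approach boils down to a per-line check against the schema. The sketch below shows only that logic in plain Python, using the two-column order layout (an integer order_number and a string total) and the sample rows that appear later in this article. parse_order and split_good_bad are hypothetical helper names; with an RDD, a similar predicate could be passed to rdd.filter.

```python
def parse_order(line, delimiter="|"):
    """Return (order_number, total) if the line fits the schema, else None."""
    fields = line.split(delimiter)
    if len(fields) != 2:
        return None                      # wrong number of columns
    try:
        return int(fields[0]), fields[1]
    except ValueError:
        return None                      # order_number is not an integer


def split_good_bad(lines):
    """Separate parsed good records from the raw text of bad records."""
    good, bad = [], []
    for line in lines:
        parsed = parse_order(line)
        if parsed is None:
            bad.append(line)
        else:
            good.append(parsed)
    return good, bad


lines = ["1|1000", "2|4000", "A|30|3000"]
good, bad = split_good_bad(lines)
print(good)
print(bad)
```

Keeping the raw line for bad records, rather than a partially parsed version, makes it possible to send the rejected rows back to the source system unchanged.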


In this article we will see how to capture bad records through spark.read.csv(). To load a file and capture the bad records, we need to perform the following steps:-

  1. Create a schema (StructType) for the feed file, with an extra column of string type (say bad_record) for corrupt records.
  2. Call spark.read.csv() with all the required parameters. Pass the bad record column name (the extra column created in step 1) as the parameter columnNameOfCorruptRecord.
  3. Filter the records where "bad_record" is not null and save them as a temporary file.
  4. Read the temporary file as CSV (spark.read.csv) and pass the same schema as above (step 1).
  5. From the resulting bad-record data frame, select "bad_record".


Step 5 will give you a data frame containing all the bad records.

Code:-

#####################Create Schema#####################################
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> customSchema = StructType([
...     StructField("order_number", IntegerType(), True),
...     StructField("total", StringType(), True),
...     StructField("bad_record", StringType(), True)
... ])
"bad_record" is the bad record column.

#################Call spark.read.csv()####################
>>> orders_df = spark.read \
...     .format('com.databricks.spark.csv') \
...     .option("badRecordsPath", "/test/data/bad/") \
...     .option("mode", "PERMISSIVE") \
...     .option("columnNameOfCorruptRecord", "bad_record") \
...     .options(header='false', delimiter='|') \
...     .load('/test/data/test.csv', schema=customSchema)

After calling spark.read.csv, if any record doesn't satisfy the schema, null is assigned to all of its columns and the complete original record (all column values concatenated) is assigned to the bad record column.
>>> orders_df.show()
+------------+-----+----------+
|order_number|total|bad_record|
+------------+-----+----------+
|           1| 1000|      null|
|           2| 4000|      null|
|        null| null| A|30|3000|
+------------+-----+----------+

Here, every record where bad_record is not null is a record that violated the schema.

NOTE:-
The corrupt record column is generated at run time, when the DataFrame is instantiated and the data is actually fetched (by calling an action).
The content of the corrupt record column depends on which other columns are part of that particular action call.
If the column that causes the error is not part of the action call, bad_record won't show any bad record.
If you want to overcome this issue and make bad_record persist, follow steps 3, 4 and 5, or use caching.