Loading a CSV file and capturing all the bad records is a very common task in ETL projects. The bad records are analyzed to take corrective or preventive measures before reloading the file. In some cases the client may ask you to send the bad record file for their information or action, so capturing the bad records becomes very important in these scenarios.
Most relational database loaders, like SQL*Loader or nzload, provide this feature, but when it comes to Hadoop and Spark (2.2.0) there is no direct solution for this.
However, a solution to this problem is available in Databricks Runtime 3.0, where you just need to provide the bad record path and the bad record file will be saved there.
df = spark.read \
    .option("badRecordsPath", "/data/badRecPath") \
    .parquet("/input/parquetFile")
However, in earlier Spark releases this method won't work. We can achieve the same result in two ways:
- Read the file as an RDD and use RDD transformations to filter out the bad records (a quick sketch follows this list)
- Use spark.read.csv()
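A minimal sketch of the RDD approach, assuming the same pipe-delimited two-column feed used later in this article (the validation logic is illustrative and should be adapted to your feed):
def is_bad(line):
    # Expect exactly two fields: an integer order_number and a total
    fields = line.split('|')
    return len(fields) != 2 or not fields[0].isdigit()

raw_rdd = sc.textFile('/test/data/test.csv')
bad_rdd = raw_rdd.filter(is_bad)                          # malformed lines only
good_rdd = raw_rdd.filter(lambda line: not is_bad(line))  # lines that fit the layout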
In this article we will see how to capture bad records through spark.read.csv(). In order to load a file and capture its bad records, we need to perform the following steps:
- Create a schema (StructType) for the feed file with an extra column of string type (say bad_record) for corrupt records.
- Call spark.read.csv() with all the required parameters. Pass the bad record column name (the extra column created in step 1) as the parameter columnNameOfCorruptRecord.
- Filter the records where bad_record is not null and save them as a temporary file.
- Read the temporary file as CSV (spark.read.csv), passing the same schema as above (step 1).
- From this bad-records dataframe, select bad_record.
Step 5 will give you a dataframe having all the bad records.
Code:-
#####################Create Schema#####################################
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> customSchema = StructType([
...     StructField("order_number", IntegerType(), True),
...     StructField("total", StringType(), True),
...     StructField("bad_record", StringType(), True)
... ])
“bad_record” is the bad record column.
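For illustration, assume /test/data/test.csv is pipe-delimited with no header and contains the lines below; the last line violates the schema (a non-integer order_number and an extra field):
1|1000
2|4000
A|30|3000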
#################Call spark.read.csv()####################
>>> orders_df = spark.read \
...     .format('com.databricks.spark.csv') \
...     .option("badRecordsPath", "/test/data/bad/") \
...     .option("mode", "PERMISSIVE") \
...     .option("columnNameOfCorruptRecord", "bad_record") \
...     .options(header='false', delimiter='|') \
...     .load('/test/data/test.csv', schema=customSchema)
After calling spark.read.csv, if any record doesn't satisfy the schema, then null is assigned to all the data columns and the raw value of the record (all columns concatenated) is assigned to the bad record column.
>>> orders_df.show()
+------------+-----+----------+
|order_number|total|bad_record|
+------------+-----+----------+
|           1| 1000|      null|
|           2| 4000|      null|
|        null| null| A|30|3000|
+------------+-----+----------+
Here, every record where bad_record is not null violated the schema.
NOTE:-
The corrupt record column is generated at run time, when the DataFrame is instantiated and the data is actually fetched (by calling an action).
The output of the corrupt column depends on the other columns that are part of the RDD in that particular action call.
If the error-causing column is not part of the action call, then bad_record won't show the bad record.
If you want to overcome this and want the bad_record values to persist, then follow steps 3, 4 and 5 (see the sketch below) or use caching.
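A minimal sketch of steps 3, 4 and 5, assuming the orders_df and customSchema created above (the temporary path /test/data/tmp_bad is illustrative); writing the filtered rows to disk forces the full materialization described in the note:
# Step 3: filter the rows where bad_record is not null and save them as a temporary file
bad_df = orders_df.filter(orders_df.bad_record.isNotNull())
bad_df.write \
    .format('com.databricks.spark.csv') \
    .options(header='false', delimiter='|') \
    .save('/test/data/tmp_bad')

# Step 4: read the temporary file back with the same schema (step 1)
tmp_bad_df = spark.read \
    .format('com.databricks.spark.csv') \
    .options(header='false', delimiter='|') \
    .load('/test/data/tmp_bad', schema=customSchema)

# Step 5: select only the bad record column
bad_records_df = tmp_bad_df.select('bad_record')
bad_records_df.show(truncate=False)

Alternatively, calling orders_df.cache() before the first action keeps the computed bad_record values around without the temporary file.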
To learn more on Spark click here. Let us know your views or feedback on Facebook or Twitter @BigDataDiscuss.
How about when the csv file has 100+ columns?
Yes, it will work there as well; for ease, in that case write a Python function to generate your custom schema (a sample helper is sketched below).
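A hypothetical helper for wide files (all data columns as strings for simplicity), building the schema from a list of column names:
from pyspark.sql.types import StructType, StructField, StringType

def build_schema(column_names, corrupt_col='bad_record'):
    # One StringType field per data column, plus the corrupt record column at the end
    fields = [StructField(name, StringType(), True) for name in column_names]
    fields.append(StructField(corrupt_col, StringType(), True))
    return StructType(fields)

customSchema = build_schema(['order_number', 'total'])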
I also want to capture the error reason for each bad row. How can I achieve that?
In my case I want to identify bad .gz files. How can I achieve that?