Capture bad records while loading a CSV into a DataFrame

Loading a CSV file and capturing all the bad records is a very common task in ETL projects. The bad records are analyzed to take corrective or preventive measures for loading the file. In some cases, the client may ask you to send the bad-record file for their information or action, so it becomes very important to capture the bad records in these scenarios.
Most relational database loaders, like SQL*Loader or nzload, provide this feature, but when it comes to Hadoop and Spark (2.2.0) there is no direct solution for this.
However, a solution to this problem is available in Databricks Runtime 3.0, where you just need to provide a bad-records path and the bad records will be saved there.

df = spark.read \
    .option("badRecordsPath", "/data/badRecPath") \
    .parquet("/input/parquetFile")

However, in earlier Spark releases this method won't work. We can achieve the same result in two ways:
  1. Read the file as an RDD and use RDD transformation methods to filter out the bad records (a brief sketch follows this list)
  2. Use spark.read.csv()
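
For illustration, here is a minimal sketch of option 1. It assumes the pipe-delimited, two-column file used later in this article; the path, the expected field count, and the helper is_bad are all placeholders for your own layout and rules.

# Sketch of option 1: filter bad records with plain RDD transformations.
# Assumes a pipe-delimited file where every good record has exactly two fields
# and the first field is an integer order number.
raw_rdd = spark.sparkContext.textFile('/test/data/test.csv')

EXPECTED_FIELDS = 2  # order_number|total

def is_bad(line):
    parts = line.split('|')
    if len(parts) != EXPECTED_FIELDS:
        return True
    try:
        int(parts[0])
        return False
    except ValueError:
        return True

bad_rdd = raw_rdd.filter(is_bad)                          # the bad records
good_rdd = raw_rdd.filter(lambda line: not is_bad(line))  # the clean records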


In this article we will see how to capture bad records through spark.read.csv(). In order to load a file and capture the bad records, we need to perform the following steps:

  1. Create a schema (StructType) for the feed file with an extra column of string type (say, bad_record) for corrupt records.
  2. Call spark.read.csv() with all the required parameters. Pass the bad-record column name (the extra column created in step 1) as the parameter columnNameOfCorruptRecord.
  3. Filter the records where bad_record is not null and save them as a temporary file.
  4. Read the temporary file as CSV (spark.read.csv) and pass the same schema as above (step 1).
  5. From the resulting DataFrame, select bad_record.


Step 5 will give you a DataFrame containing all the bad records.

Code:-

#####################Create Schema#####################################
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> customSchema = StructType([
...                               StructField("order_number", IntegerType(), True),
...                               StructField("total", StringType(), True),
...                               StructField("bad_record", StringType(), True)
...                           ])
“bad_record” is the bad record column.

#################Call spark.read.csv()####################
>>> orders_df = spark.read \
...                 .format('com.databricks.spark.csv') \
...                 .option("badRecordsPath", "/test/data/bad/") \
...                 .option("mode", "PERMISSIVE") \
...                 .option("columnNameOfCorruptRecord", "bad_record") \
...                 .options(header='false', delimiter='|') \
...                 .load('/test/data/test.csv', schema=customSchema)

After calling spark.read.csv(), if any record does not satisfy the schema, then null is assigned to all of that record's columns and the raw record (all of its fields concatenated) is placed in the bad-record column.
>>> orders_df.show()
+------------+-----+----------+
|order_number|total|bad_record|
+------------+-----+----------+
|           1| 1000|      null|
|           2| 4000|      null|
|        null| null| A|30|3000|
+------------+-----+----------+

Here, all the records where bad_record is not null are the records that violated the schema.

NOTE:-
The corrupt-record column is populated at run time, when the DataFrame is instantiated and the data is actually fetched (by calling an action).
The output of the corrupt-record column depends on which other columns are part of that particular action call.
If the column causing the error is not part of the action call, then bad_record won't show any bad record.
If you want to overcome this issue and want bad_record to persist, then follow steps 3, 4 and 5 (sketched below) or use caching.
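
For reference, here is a minimal sketch of steps 3, 4 and 5, continuing the example above. The temporary path /test/data/bad_tmp is a placeholder, and the write options simply mirror the options used when the original file was loaded.

# Step 3: filter the rows flagged as bad and save them to a temporary location.
bad_rows_df = orders_df.filter(orders_df.bad_record.isNotNull())
bad_rows_df.write.mode('overwrite') \
    .options(header='false', delimiter='|') \
    .csv('/test/data/bad_tmp')

# Step 4: read the temporary file back as CSV with the same schema as in step 1.
bad_df = spark.read \
    .format('com.databricks.spark.csv') \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "bad_record") \
    .options(header='false', delimiter='|') \
    .load('/test/data/bad_tmp', schema=customSchema)

# Step 5: select only the bad-record column; this DataFrame now holds all the bad records.
bad_records_df = bad_df.select("bad_record")
bad_records_df.show(truncate=False)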




4 comments:

  1. How about when the csv file has 100+ columns?

    Replies
    1. Yes, it will work there as well. For ease, in that case write a Python function to generate your custom schema (a hypothetical sketch follows below).

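      For illustration, a hypothetical helper along those lines; the column names and the assumption that every data column is a string are placeholders:

      from pyspark.sql.types import StructType, StructField, StringType

      def build_schema(column_names, corrupt_col="bad_record"):
          """Build a StructType with all columns as strings plus a corrupt-record column."""
          fields = [StructField(name, StringType(), True) for name in column_names]
          fields.append(StructField(corrupt_col, StringType(), True))
          return StructType(fields)

      # Usage: generate a schema for a wide file from its list of column names.
      customSchema = build_schema(["col_%d" % i for i in range(1, 101)])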
  2. I also want to capture error reason for each bad row , how can I achieve that?

  3. In my case I want to identify bad .gz files. How can I achieve that?
