Friday, 7 January 2022

Dataframe and Dataset

Dataframe: we can consider dataframe as a table from a relational database. In the dataframe, we store data into named columns. Dataframe can be created by reading data from various sources like CSV, JSON, HDFS, etc.

In spark, dataframe is defined as: 

type DataFrame = Dataset[Row]

Dataset: Dataset is an extension of the dataframe. Dataset is a type-safe so it validates the type of column value at compile-time and throws an error for any datatype mismatch. 


How to create a dataframe from a CSV file ( with no header)?

Sample CSV file

11,1234,37
22,8675,65
33,7465,40

Solution 1: Create dataframe by providing schema.

//create spark session
  val spark = SparkSession
    .builder()
    .appName("dataframeexample")
    .master("local[*]")
    .getOrCreate()
  
  //create schema
  val schema= new StructType()
    .add("id", IntegerType, nullable = true)
    .add("orderId", IntegerType, nullable = true)
    .add("amount", FloatType, nullable = true)

  //create dataframe
  val df: DataFrame = spark.read
    .schema(schema)
    .csv("csv file path")


Solution 2: 

  //create spark session
  val spark = SparkSession
    .builder
    .appName("dfwithoutschema")
    .master("local[*]")
    .getOrCreate()

 //create dataframe
  val df: DataFrame = spark.read.format("csv")
    .option("header","false")
    .load("path/tocsvfile")
    .toDF("id", "orderId", "amount")


How to create dataset?

Dataset can be created by providing the case class to dataframe using as. Dataset can be created from RDD using toDS() method.

Ex: Dataset created by providing case class

 case class Customer(id:Int, orderId:Int, amount: Float) 
 val ds: Dataset[Customer] = spark.read
    .schema(PreviouslyCreatedSchema)
    .csv("path/to-csv-file")
    .as[Customer]

For a file with a header, we don't have to provide the schema.

Ex:

  case class Customer(id:Int, orderId:Int, amount: Int)
  val spark = SparkSession
    .builder()
    .appName("datasetforcsvfilewithheader")
    .master("local[*]")
    .getOrCreate()

  import spark.implicits._

  val ds: Dataset[Customer] = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("path/to-csvfile.csv")
    .as[Customer]

How to select particular column from dataframe or dataset?

val df = ds.select("id","amount")

dataset support various operations like groupBy, sum, aggreegate, round, etc.

We can also write sql queries on dataframe and dataset.


val ds: Dataset[Customer] = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("path/to-csvfile.csv")
    .as[Customer]

    // Write sql query on dataset.
    ds.createGlobalTempView("people")
    spark.sql("select id, sum(amount) from  global_temp.people group by id").show() 

Why Dataset?

Dataset use catalyst query and tungsten optimizer, dataset is more memory efficient than RDD.

No comments:

Post a Comment