Belajar PySpark - Definisi Skema pada DataFrame

belajar-pyspark-skema-dataframe

Artikel ini merupakan seri Belajar PySpark yang merupakan kelanjutan dari artikel-artikel sebelumnya. Tutorial PySpark kali ini akan membahas mengenai Skema pada Spark DataFrame. Skema merupakan definisi dari struktur dari DataFrame, yang memastikan konsistensi dan kejelasan dalam interpretasi data. 

Yuk langsung saja kita belajar PySpark untuk mendefinisikan skema DataFrame.

Dalam Apache Spark, skema DataFrame merupakan hal yang penting, karena merupakan acuan struktur, nama, dan tipe data dari kolom-kolom dalam dataframe. Beberapa manfaat penggunaan skema:

  • Integritas Data dan type-safety : dengan skema, kita memastikan bahwa data dalam DataFrame sesuai dengan struktur dan tipe data tertentu. Hal ini membantu mencegah ketidakcocokan tipe data dan meningkatkan integritas data.
  • Optimasi kinerja: dengan skema, Spark dapat menyimpan data secara lebih efisien dalam memori karena mengetahui tipe dan struktur data yang tepat. Hal ini dapat menghasilkan peningkatan kinerja yang signifikan. Penggunaan skema pre-defined juga mengurangi kerja Spark pada saat melakukan loading data.
  • Optimasi kueri: Catalyst Optimizer, komponen Spark yang melakukan optimasi proses, dapat menggunakan informasi skema untuk melakukan predikat pushdown, me-resolve referensi kolom pada waktu kompilasi, dan menerapkan berbagai optimasi kueri.

Ada beberapa cara untuk mendefinisikan skema dalam Spark DataFrame, diantaranya yaitu:

  • Inferensi skema dari data. Spark dapat menyimpulkan skema DataFrame secara otomatis ketika membaca data dari sumber eksternal (misalnya file CSV, JSON, Parquet).
  • Mendefinisikan skema secara eksplisit menggunakan StructType dan StructField dari modul pyspark.sql.types
  • Menggunakan DDL (Data Definition Language) seperti CREATE TABLE 
  • Mengubah struktur kolom DataFrame yang sudah ada dengan menggunakan type casting dan withColumn

Sebelumnya kita import dulu package yang diperlukan, dan kita inisialisasi spark session.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Belajar PySpark - Skema DataFrame").getOrCreate()

 

Inferensi Skema Dari Data

Spark dapat membaca skema dari sumber data yang dimuat ke dataframe, misalnya file CSV, JSON, Parquet, dll. Cara ini biasanya digunakan pada tahap eksplorasi data, dan jika data yang dibaca ukurannya tidak terlalu besar. 

Untuk melakukan inferensi skema dari eksternal data, set parameter inferSchema = True pada saat loading

df_infer = spark.read.csv("mhs_header.csv", header=True, inferSchema=True)
df_infer.printSchema()
df_infer.show()
root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: integer (nullable = true)
 |-- nilai2: integer (nullable = true)
 |-- nilai3: integer (nullable = true)

+-----+------------+------+------+------+
| nama|kode_jurusan|nilai1|nilai2|nilai3|
+-----+------------+------+------+------+
| Agus|           F|   100|   150|   150|
|Windy|           F|   200|   150|   180|
| Budi|           B|   200|   100|   150|
| Dina|           F|   150|   150|   130|
| Bayu|           F|    50|   150|   100|
| Dedi|           B|    50|   100|   100|
+-----+------------+------+------+------+

 

Ketika kita membentuk DataFrame dari object python, misalnya list, dan tidak men-supply skema, maka Spark akan secara otomatis menyimpulkan (melakukan inferensi) skema sesuai dengan data

data = [['Agus','F',100,150,150],
        ['Budi','B',200,100,150],
        ['Dina','F',150,150,130],
        ['Dedi','B', 50,100,100]]


df = spark.createDataFrame(data, kolom)
df.show()
df.printSchema()
+----+---+---+---+---+
|  _1| _2| _3| _4| _5|
+----+---+---+---+---+
|Agus|  F|100|150|150|
|Budi|  B|200|100|150|
|Dina|  F|150|150|130|
|Dedi|  B| 50|100|100|
+----+---+---+---+---+

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: long (nullable = true)
 |-- _5: long (nullable = true)

 

Karena kita tidak mensupply informasi nama kolom, maka Spark men-generate nama kolom dengan format _1, _2, …, _N

 

Definisi Skema Dengan Structtype Dan Structfield

Berbeda dengan tahap eksplorasi data, lingkungan produksi biasanya memerlukan definisi skema yang lebih strict. Definisi skema juga bisa digunakan untuk membaca file csv yang tidak mengandung header.

Untuk mendefinisikan skema, kita perlu melakukan import kelas-kelas yang diperlukan, seperti StructType, StructField, StringType, dll. dari package pyspark.sql.types

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

 

Untuk mendefinisikan skema, kita membuat object bertipe StructType, yang merupakan kumpulan dari object bertipe StructField. 

StructField merupakan object yang mendefinisikan nama kolom, tipe data, dan nullability (apakah kolom boleh bernilai NULL). 

Untuk mendefinisikan skema, sintaks yang digunakan adalah

StructType([ StructField(namafield_1, Type, nullability), 
             StructField(namafield_2, Type, nullability), 
             … 
             StructField(namafield_N, Type, nullability)
           ])

 

Contoh penggunaannya seperti berikut

data = [['Agus','F',100,150,150],
        ['Budi','B',200,100,150],
        ['Dina','F',150,150,130],
        ['Dedi','B', 50,100,100]]


mySchema = StructType([ \
                       StructField('nama', StringType(), True), \
                        StructField('kode_jurusan', StringType(), True), \
                        StructField('nilai1', IntegerType(), True), \
                        StructField('nilai2', IntegerType(), True), \
                        StructField('nilai3', IntegerType(), True) \
                        ])


df = spark.createDataFrame(data,mySchema)
df.show()
df.printSchema()
+----+------------+------+------+------+
|nama|kode_jurusan|nilai1|nilai2|nilai3|
+----+------------+------+------+------+
|Agus|           F|    10|    15|   150|
|Budi|           B|    20|    10|   150|
|Dina|           F|    15|    15|   130|
|Dedi|           B|     5|    10|   100|
+----+------------+------+------+------+

root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: integer (nullable = true)
 |-- nilai2: integer (nullable = true)
 |-- nilai3: integer (nullable = true)

 

Salah satu kegunaan skema adalah untuk membaca file csv, terutama jika file tersebut tidak mengandung informasi header

df = spark.read.format("csv").load("mhs.csv", schema=mySchema)
df.show()
df.printSchema()
+-----+------------+------+------+------+
| nama|kode_jurusan|nilai1|nilai2|nilai3|
+-----+------------+------+------+------+
| Agus|           F|   100|   150|   150|
|Windy|           F|   200|   150|   180|
| Budi|           B|   200|   100|   150|
| Dina|           F|   150|   150|   130|
| Bayu|           F|    50|   150|   100|
| Dedi|           B|    50|   100|   100|
+-----+------------+------+------+------+

root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: integer (nullable = true)
 |-- nilai2: integer (nullable = true)
 |-- nilai3: integer (nullable = true)

 

Fungsi printSchema() pada DataFrame sebenarnya menampilkan StructType dalam bentuk tree. Kita dapat menampilkan definisi skema sebuah DataFrame dengan mengakses property schema

df_infer.schema

 

Definisi Skema Dengan DDL (Data Definition Language)

Selain menggunakan StructType, kita dapat mendefinisikan skema menggunakan DDL string, yaitu seperti yang biasa digunakan pada SQL statement CREATE TABLE atau CREATE VIEW. 

String DDL yang dapat digunakan untuk membuat Skema hanya perlu mencantumkan nama kolom dan tipe : "namakolom_1 tipekolom_1, namakolom_2  tipekolom_2, … namakolom_N tipekolom_N"

DDL string dapat langsung digunakan untuk mendefinisikan skema dataframe.
 

ddl_string = "nama STRING, kode_jurusan STRING, \
              nilai1 INT, nilai2 INT, nilai3 INT"


df = spark.read.format("csv") \
               .schema(ddl_string) \
               .load("mhs.csv")
df.show()
df.printSchema()
ddl_string = "nama STRING, kode_jurusan STRING, \
              nilai1 INT, nilai2 INT, nilai3 INT"


df = spark.read.format("csv") \
               .schema(ddl_string) \
               .load("mhs.csv")
df.show()
df.printSchema()

 

Update Skema Dataframe Dengan withColumn

Seringkali kita perlu melakukan perubahan terhadap skema dataframe yang sudah kita buat, misalnya setelah proses data cleansing, atau ketika mempersiapkan data training untuk proses machine learning. 

Untuk mengubah struktur atau skema sebuah DataFrame yang sudah dibuat, gunakan fungsi DataFrame.withColumn(). Untuk mengubah atau mengkonversi tipe data, gunakan fungsi Column.cast(ObjectType)

Misalnya kita ingin mengubah tipe kolom nilai1 dari integer menjadi float, maka kita lakukan type casting menggunakan cast(FloatType)

df_dec = df.withColumn("nilai1", df["nilai1"].cast(DecimalType()))
df_dec.printSchema()
root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: decimal(10,0) (nullable = true)
 |-- nilai2: integer (nullable = true)
 |-- nilai3: integer (nullable = true)

 

Jika akan mengubah lebih dari satu kolom, kita bisa gunakan fungsi withColumns()

df_dec3 = df.withColumns({"nilai1": df["nilai1"].cast(DecimalType()),
                "nilai2": df["nilai2"].cast(DecimalType()),
                "nilai3": df["nilai3"].cast(DecimalType())})
df_dec3.printSchema()
df_dec3 = df.withColumns({"nilai1": df["nilai1"].cast(DecimalType()),
                "nilai2": df["nilai2"].cast(DecimalType()),
                "nilai3": df["nilai3"].cast(DecimalType())})
df_dec3.printSchema()

 

Menyimpan Skema Ke File JSON

Seringkali skema yang kita gunakan cukup kompleks dan sering berubah. Untuk memudahkan kita mengelola perubahan ini, kita bisa menyimpan skema ke dalam file berformat JSON.
Jika terjadi perubahan skema, kita tidak perlu mengubah kode program, akan tetapi cukup mengubah file JSON saja.

Cara menampilkan skema DataFrame dalam bentuk string JSON adalah dengan menggunakan fungsi DataFrame.schema.json() seperti berikut ini

json_string = df.schema.json()
print(json_string)
{"fields":[{"metadata":{},"name":"nama","nullable":true,"type":"string"},{"metadata":{},"name":"kode_jurusan","nullable":true,"type":"string"},{"metadata":{},"name":"nilai1","nullable":true,"type":"integer"},{"metadata":{},"name":"nilai2","nullable":true,"type":"integer"},{"metadata":{},"name":"nilai3","nullable":true,"type":"integer"}],"type":"struct"}

 

Kemudian kita dapat menuliskan string JSON tersebut ke dalam file seperti berikut ini

text_file = open("schema.json", "w")
text_file.write(json_string)
text_file.close()

 

Membaca Skema Dari File JSON 

Untuk men-generate skema dari file JSON, pertama-tama kita baca file dengan menggunakan fungsi load() dari package json. Hasil loading file adalah object bertipe dictionary

import json

f = open("schema.json")
json_dict = json.load(f)
f.close()

json_dict
{'fields': [{'metadata': {},
   'name': 'nama',
   'nullable': True,
   'type': 'string'},
  {'metadata': {}, 'name': 'kode_jurusan', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'nilai1', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'nilai2', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'nilai3', 'nullable': True, 'type': 'integer'}],
 'type': 'struct'}

 

Selanjutnya kita dapat menggunakan dictionary tersebut untuk membentuk StructType dengan menggunakan fungsi StructType.fromJson(json_dictionary)

data = [['Agus','F',100,150,150],
        ['Budi','B',200,100,150],
        ['Dina','F',150,150,130],
        ['Dedi','B',200,100,100]]


schemaFromJson = StructType.fromJson(json_dict)


df3 = spark.createDataFrame(data, schemaFromJson)
df3.show()
df3.printSchema()
+----+------------+------+------+------+
|nama|kode_jurusan|nilai1|nilai2|nilai3|
+----+------------+------+------+------+
|Agus|           F|   100|   150|   150|
|Budi|           B|   200|   100|   150|
|Dina|           F|   150|   150|   130|
|Dedi|           B|   200|   100|   100|
+----+------------+------+------+------+

root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: integer (nullable = true)
 |-- nilai2: integer (nullable = true)
 |-- nilai3: integer (nullable = true)

 

Wrapping Up

Kita telah belajar PySpark cara untuk mendefinisikan skema pada DataFrame. Tutorial PySpark diatas telah dipelajar mengenai penggunaan

  • paramater inferSchema
  • fungsi StructType()
  • penggunaan DDL string
  • fungsi withColumn()

 

Notebook untuk tutorial ini dapat diakses di sini

 

Artikel seri Belajar PySpark sebelumnya