Artikel ini merupakan seri Belajar PySpark yang merupakan kelanjutan dari artikel-artikel sebelumnya. Tutorial PySpark kali ini akan membahas mengenai Skema pada Spark DataFrame. Skema merupakan definisi dari struktur dari DataFrame, yang memastikan konsistensi dan kejelasan dalam interpretasi data.
Yuk langsung saja kita belajar PySpark untuk mendefinisikan skema DataFrame.
Dalam Apache Spark, skema DataFrame merupakan hal yang penting, karena merupakan acuan struktur, nama, dan tipe data dari kolom-kolom dalam dataframe. Beberapa manfaat penggunaan skema:
- Integritas Data dan type-safety : dengan skema, kita memastikan bahwa data dalam DataFrame sesuai dengan struktur dan tipe data tertentu. Hal ini membantu mencegah ketidakcocokan tipe data dan meningkatkan integritas data.
- Optimasi kinerja: dengan skema, Spark dapat menyimpan data secara lebih efisien dalam memori karena mengetahui tipe dan struktur data yang tepat. Hal ini dapat menghasilkan peningkatan kinerja yang signifikan. Penggunaan skema pre-defined juga mengurangi kerja Spark pada saat melakukan loading data.
- Optimasi kueri: Catalyst Optimizer, komponen Spark yang melakukan optimasi proses, dapat menggunakan informasi skema untuk melakukan predikat pushdown, me-resolve referensi kolom pada waktu kompilasi, dan menerapkan berbagai optimasi kueri.
Ada beberapa cara untuk mendefinisikan skema dalam Spark DataFrame, diantaranya yaitu:
- Inferensi skema dari data. Spark dapat menyimpulkan skema DataFrame secara otomatis ketika membaca data dari sumber eksternal (misalnya file CSV, JSON, Parquet).
- Mendefinisikan skema secara eksplisit menggunakan StructType dan StructField dari modul pyspark.sql.types
- Menggunakan DDL (Data Definition Language) seperti CREATE TABLE
- Mengubah struktur kolom DataFrame yang sudah ada dengan menggunakan type casting dan withColumn
Sebelumnya kita import dulu package yang diperlukan, dan kita inisialisasi spark session.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Belajar PySpark - Skema DataFrame").getOrCreate()
Inferensi Skema Dari Data
Spark dapat membaca skema dari sumber data yang dimuat ke dataframe, misalnya file CSV, JSON, Parquet, dll. Cara ini biasanya digunakan pada tahap eksplorasi data, dan jika data yang dibaca ukurannya tidak terlalu besar.
Untuk melakukan inferensi skema dari eksternal data, set parameter inferSchema = True pada saat loading
df_infer = spark.read.csv("mhs_header.csv", header=True, inferSchema=True)
df_infer.printSchema()
df_infer.show()
root
|-- nama: string (nullable = true)
|-- kode_jurusan: string (nullable = true)
|-- nilai1: integer (nullable = true)
|-- nilai2: integer (nullable = true)
|-- nilai3: integer (nullable = true)
+-----+------------+------+------+------+
| nama|kode_jurusan|nilai1|nilai2|nilai3|
+-----+------------+------+------+------+
| Agus| F| 100| 150| 150|
|Windy| F| 200| 150| 180|
| Budi| B| 200| 100| 150|
| Dina| F| 150| 150| 130|
| Bayu| F| 50| 150| 100|
| Dedi| B| 50| 100| 100|
+-----+------------+------+------+------+
Ketika kita membentuk DataFrame dari object python, misalnya list, dan tidak men-supply skema, maka Spark akan secara otomatis menyimpulkan (melakukan inferensi) skema sesuai dengan data
data = [['Agus','F',100,150,150],
['Budi','B',200,100,150],
['Dina','F',150,150,130],
['Dedi','B', 50,100,100]]
df = spark.createDataFrame(data, kolom)
df.show()
df.printSchema()
+----+---+---+---+---+
| _1| _2| _3| _4| _5|
+----+---+---+---+---+
|Agus| F|100|150|150|
|Budi| B|200|100|150|
|Dina| F|150|150|130|
|Dedi| B| 50|100|100|
+----+---+---+---+---+
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: long (nullable = true)
|-- _4: long (nullable = true)
|-- _5: long (nullable = true)
Karena kita tidak mensupply informasi nama kolom, maka Spark men-generate nama kolom dengan format _1, _2, …, _N
Definisi Skema Dengan Structtype Dan Structfield
Berbeda dengan tahap eksplorasi data, lingkungan produksi biasanya memerlukan definisi skema yang lebih strict. Definisi skema juga bisa digunakan untuk membaca file csv yang tidak mengandung header.
Untuk mendefinisikan skema, kita perlu melakukan import kelas-kelas yang diperlukan, seperti StructType, StructField, StringType, dll. dari package pyspark.sql.types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
Untuk mendefinisikan skema, kita membuat object bertipe StructType, yang merupakan kumpulan dari object bertipe StructField.
StructField merupakan object yang mendefinisikan nama kolom, tipe data, dan nullability (apakah kolom boleh bernilai NULL).
Untuk mendefinisikan skema, sintaks yang digunakan adalah
StructType([ StructField(namafield_1, Type, nullability),
StructField(namafield_2, Type, nullability),
…
StructField(namafield_N, Type, nullability)
])
Contoh penggunaannya seperti berikut
data = [['Agus','F',100,150,150],
['Budi','B',200,100,150],
['Dina','F',150,150,130],
['Dedi','B', 50,100,100]]
mySchema = StructType([ \
StructField('nama', StringType(), True), \
StructField('kode_jurusan', StringType(), True), \
StructField('nilai1', IntegerType(), True), \
StructField('nilai2', IntegerType(), True), \
StructField('nilai3', IntegerType(), True) \
])
df = spark.createDataFrame(data,mySchema)
df.show()
df.printSchema()
+----+------------+------+------+------+
|nama|kode_jurusan|nilai1|nilai2|nilai3|
+----+------------+------+------+------+
|Agus| F| 10| 15| 150|
|Budi| B| 20| 10| 150|
|Dina| F| 15| 15| 130|
|Dedi| B| 5| 10| 100|
+----+------------+------+------+------+
root
|-- nama: string (nullable = true)
|-- kode_jurusan: string (nullable = true)
|-- nilai1: integer (nullable = true)
|-- nilai2: integer (nullable = true)
|-- nilai3: integer (nullable = true)
Salah satu kegunaan skema adalah untuk membaca file csv, terutama jika file tersebut tidak mengandung informasi header
df = spark.read.format("csv").load("mhs.csv", schema=mySchema)
df.show()
df.printSchema()
+-----+------------+------+------+------+
| nama|kode_jurusan|nilai1|nilai2|nilai3|
+-----+------------+------+------+------+
| Agus| F| 100| 150| 150|
|Windy| F| 200| 150| 180|
| Budi| B| 200| 100| 150|
| Dina| F| 150| 150| 130|
| Bayu| F| 50| 150| 100|
| Dedi| B| 50| 100| 100|
+-----+------------+------+------+------+
root
|-- nama: string (nullable = true)
|-- kode_jurusan: string (nullable = true)
|-- nilai1: integer (nullable = true)
|-- nilai2: integer (nullable = true)
|-- nilai3: integer (nullable = true)
Fungsi printSchema() pada DataFrame sebenarnya menampilkan StructType dalam bentuk tree. Kita dapat menampilkan definisi skema sebuah DataFrame dengan mengakses property schema
df_infer.schema
Definisi Skema Dengan DDL (Data Definition Language)
Selain menggunakan StructType, kita dapat mendefinisikan skema menggunakan DDL string, yaitu seperti yang biasa digunakan pada SQL statement CREATE TABLE atau CREATE VIEW.
String DDL yang dapat digunakan untuk membuat Skema hanya perlu mencantumkan nama kolom dan tipe : "namakolom_1 tipekolom_1, namakolom_2 tipekolom_2, … namakolom_N tipekolom_N"
DDL string dapat langsung digunakan untuk mendefinisikan skema dataframe.
ddl_string = "nama STRING, kode_jurusan STRING, \
nilai1 INT, nilai2 INT, nilai3 INT"
df = spark.read.format("csv") \
.schema(ddl_string) \
.load("mhs.csv")
df.show()
df.printSchema()
ddl_string = "nama STRING, kode_jurusan STRING, \
nilai1 INT, nilai2 INT, nilai3 INT"
df = spark.read.format("csv") \
.schema(ddl_string) \
.load("mhs.csv")
df.show()
df.printSchema()
Update Skema Dataframe Dengan withColumn
Seringkali kita perlu melakukan perubahan terhadap skema dataframe yang sudah kita buat, misalnya setelah proses data cleansing, atau ketika mempersiapkan data training untuk proses machine learning.
Untuk mengubah struktur atau skema sebuah DataFrame yang sudah dibuat, gunakan fungsi DataFrame.withColumn(). Untuk mengubah atau mengkonversi tipe data, gunakan fungsi Column.cast(ObjectType)
Misalnya kita ingin mengubah tipe kolom nilai1 dari integer menjadi float, maka kita lakukan type casting menggunakan cast(FloatType)
df_dec = df.withColumn("nilai1", df["nilai1"].cast(DecimalType()))
df_dec.printSchema()
root
|-- nama: string (nullable = true)
|-- kode_jurusan: string (nullable = true)
|-- nilai1: decimal(10,0) (nullable = true)
|-- nilai2: integer (nullable = true)
|-- nilai3: integer (nullable = true)
Jika akan mengubah lebih dari satu kolom, kita bisa gunakan fungsi withColumns()
df_dec3 = df.withColumns({"nilai1": df["nilai1"].cast(DecimalType()),
"nilai2": df["nilai2"].cast(DecimalType()),
"nilai3": df["nilai3"].cast(DecimalType())})
df_dec3.printSchema()
df_dec3 = df.withColumns({"nilai1": df["nilai1"].cast(DecimalType()),
"nilai2": df["nilai2"].cast(DecimalType()),
"nilai3": df["nilai3"].cast(DecimalType())})
df_dec3.printSchema()
Menyimpan Skema Ke File JSON
Seringkali skema yang kita gunakan cukup kompleks dan sering berubah. Untuk memudahkan kita mengelola perubahan ini, kita bisa menyimpan skema ke dalam file berformat JSON.
Jika terjadi perubahan skema, kita tidak perlu mengubah kode program, akan tetapi cukup mengubah file JSON saja.
Cara menampilkan skema DataFrame dalam bentuk string JSON adalah dengan menggunakan fungsi DataFrame.schema.json() seperti berikut ini
json_string = df.schema.json()
print(json_string)
{"fields":[{"metadata":{},"name":"nama","nullable":true,"type":"string"},{"metadata":{},"name":"kode_jurusan","nullable":true,"type":"string"},{"metadata":{},"name":"nilai1","nullable":true,"type":"integer"},{"metadata":{},"name":"nilai2","nullable":true,"type":"integer"},{"metadata":{},"name":"nilai3","nullable":true,"type":"integer"}],"type":"struct"}
Kemudian kita dapat menuliskan string JSON tersebut ke dalam file seperti berikut ini
text_file = open("schema.json", "w")
text_file.write(json_string)
text_file.close()
Membaca Skema Dari File JSON
Untuk men-generate skema dari file JSON, pertama-tama kita baca file dengan menggunakan fungsi load() dari package json. Hasil loading file adalah object bertipe dictionary
import json
f = open("schema.json")
json_dict = json.load(f)
f.close()
json_dict
{'fields': [{'metadata': {},
'name': 'nama',
'nullable': True,
'type': 'string'},
{'metadata': {}, 'name': 'kode_jurusan', 'nullable': True, 'type': 'string'},
{'metadata': {}, 'name': 'nilai1', 'nullable': True, 'type': 'integer'},
{'metadata': {}, 'name': 'nilai2', 'nullable': True, 'type': 'integer'},
{'metadata': {}, 'name': 'nilai3', 'nullable': True, 'type': 'integer'}],
'type': 'struct'}
Selanjutnya kita dapat menggunakan dictionary tersebut untuk membentuk StructType dengan menggunakan fungsi StructType.fromJson(json_dictionary)
data = [['Agus','F',100,150,150],
['Budi','B',200,100,150],
['Dina','F',150,150,130],
['Dedi','B',200,100,100]]
schemaFromJson = StructType.fromJson(json_dict)
df3 = spark.createDataFrame(data, schemaFromJson)
df3.show()
df3.printSchema()
+----+------------+------+------+------+
|nama|kode_jurusan|nilai1|nilai2|nilai3|
+----+------------+------+------+------+
|Agus| F| 100| 150| 150|
|Budi| B| 200| 100| 150|
|Dina| F| 150| 150| 130|
|Dedi| B| 200| 100| 100|
+----+------------+------+------+------+
root
|-- nama: string (nullable = true)
|-- kode_jurusan: string (nullable = true)
|-- nilai1: integer (nullable = true)
|-- nilai2: integer (nullable = true)
|-- nilai3: integer (nullable = true)
Wrapping Up
Kita telah belajar PySpark cara untuk mendefinisikan skema pada DataFrame. Tutorial PySpark diatas telah dipelajar mengenai penggunaan
- paramater inferSchema
- fungsi StructType()
- penggunaan DDL string
- fungsi withColumn()
Notebook untuk tutorial ini dapat diakses di sini
Artikel seri Belajar PySpark sebelumnya
- Membaca File JSON
- Membaca File csv ke DataFrame
- Memproses Dataframe dengan Temporary View dan SQL
- Join DataFrame
- Dataframe GroupBy dan Agregasi
- Select, Filter, dan Where pada DataFrame