
Setelah sebelumnya kita mempelajari bagaimana membaca file csv ke DataFrame, maka dalam tutorial ini kita akan membahas mengenai bagaimana menyimpan DataFrame ke dalam file csv dengan object DataFrameWriter, beserta parameter yang dapat digunakan.
Dalam pyspark, ada beberapa cara untuk menulis data ke dalam csv file, yaitu :
- Menggunakan fungsi write.csv()
- Menggunakan fungsi write.format()
Keduanya menggunakan object pyspark.sql.DataFrameWriter yang terdapat dalam object DataFrame. Object ini diakses menggunakan interface DataFrame.write
Sebelumnya kita persiapkan terlebih dahulu environment dan data yang akan digunakan
Jika menggunakan google colab, maka install library pyspark dengan perintah pip.
%pip install pyspark
Import package yang akan digunakan dan inisialisasi SparkSession
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("Belajar PySpark - Menulis file csv").getOrCreate()
Persiapkan DataFrame yang akan kita gunakan
data = [['Agus','Fisika','Umum',150],['Windy','Fisika','Khusus',200],
['Budi','Biologi','Umum',170],['Dina','Fisika','Khusus',180],
['Bayu','Fisika','Umum',160],['Dedi','Biologi','Khusus',185]]
kolom = ["nama","jurusan","jalur","nilai"]
df = spark.createDataFrame(data, kolom)
df.show()
Output yang dihasilkan: +-----+-------+------+-----+ | nama|jurusan| jalur|nilai| +-----+-------+------+-----+ | Agus| Fisika| Umum| 150| |Windy| Fisika|Khusus| 200| | Budi|Biologi| Umum| 170| | Dina| Fisika|Khusus| 180| | Bayu| Fisika| Umum| 160| | Dedi|Biologi|Khusus| 185| +-----+-------+------+-----+
Menggunakan fungsi write.csv()
Berbeda dengan Pandas yang menyimpan DataFrame ke dalam sebuah file csv, pyspark menyimpan DataFrame ke dalam beberapa file csv di dalam sebuah direktori.
Menyimpan file csv tanpa header
Ketika menggunakan fungsi df.write.csv(), maka semua opsi dapat dikirim sebagai parameter dalam fungsi tersebut. Salah satunya adalah parameter nama atau path direktori di mana file akan disimpan. Misalnya DataFrame dalam contoh di atas akan disimpan di direktori "mahasiswa"
df.write.csv("mahasiswa")
Dalam direktori tersebut akan dibentuk file-file dengan jumlah sesuai dengan jumlah partisi DataFrame yang bersangkutan. Kita bisa menampilkan jumlah partisi dalam sebuah DataFrame dengan perintah DataFrame.rdd.getNumPartitions().
df.rdd.getNumPartitions()
Output : 2
Jika kita list outputnya, akan tampak file-file dengan jumlah yang sama dengan jumlah partisi
!ls -l mahasiswa
total 8 -rw-r--r-- 1 root root 67 Feb 26 07:15 part-00000-df143a52-762b-477a-b09d-de93827bcf2e-c000.csv -rw-r--r-- 1 root root 68 Feb 26 07:15 part-00001-df143a52-762b-477a-b09d-de93827bcf2e-c000.csv -rw-r--r-- 1 root root 0 Feb 26 07:15 _SUCCESS
Kita bisa melihat isinya dengan perintah cat atau head. Jika data yang disimpan kecil, kita bisa menampilkan semuanya dengan perintah cat *
!cat mahasiswa/*
Output : Agus,Fisika,Umum,150 Windy,Fisika,Khusus,200 Budi,Biologi,Umum,170 Dina,Fisika,Khusus,180 Bayu,Fisika,Umum,160 Dedi,Biologi,Khusus,185
Menyimpan file csv dengan header
Secara default file csv akan disimpan tanpa header atau baris pertama yang berisi nama kolom. Jika kita ingin menyimpan file csv dengan header, maka kita set parameter header = True.
df.write.csv("mahasiswa_header", header=True)
Perhatikan bahwa karena pyspark membentuk lebih dari satu file csv, maka masing-masing file tersebut akan disimpan dengan header.
!ls -l mahasiswa_header
total 8 -rw-r--r-- 1 root root 92 Feb 26 13:55 part-00000-941fd598-218d-4dfd-81bd-e7844b796ace-c000.csv -rw-r--r-- 1 root root 93 Feb 26 13:55 part-00001-941fd598-218d-4dfd-81bd-e7844b796ace-c000.csv -rw-r--r-- 1 root root 0 Feb 26 13:55 _SUCCESS
Sehingga jika kita concat/gabungkan semua hasilnya, akan muncul multiple header seperti berikut ini
!cat mahasiswa_header/*
Output : nama,jurusan,jalur,nilai Agus,Fisika,Umum,150 Windy,Fisika,Khusus,200 Budi,Biologi,Umum,170 nama,jurusan,jalur,nilai Dina,Fisika,Khusus,180 Bayu,Fisika,Umum,160 Dedi,Biologi,Khusus,185
Menyimpan file csv dengan delimiter selain koma
Ada kalanya kita perlu menyimpan DataFrame ke dalam file csv dengan delimiter selain koma. Misalnya jika dalam data kita terdapat kolom bertipe string yang mengandung karakter koma. Kita bisa memilih karakter yang dianggap ‘aman’, seperti misalnya | (pipe).
Untuk menyimpan file dengan delimiter ‘|’, kita gunakan parameter separator="|" atau sep="|".
df.write.csv("mahasiswa_delim", sep="|")
Hasilnya berupa file teks dengan delimiter '|'
!cat mahasiswa_delim/*
Output : Agus|Fisika|Umum|150 Windy|Fisika|Khusus|200 Budi|Biologi|Umum|170 Dina|Fisika|Khusus|180 Bayu|Fisika|Umum|160 Dedi|Biologi|Khusus|185
Menggunakan fungsi write.option()
Selain mengirimkan parameter melalui fungsi write.csv(), kita juga dapat menggunakan fungsi write.option(), dengan sintaks option(“param_name”,value)
Untuk lebih dari 1 parameter, gunakan rangkaian fungsi option seperti berikut ini :
df.write.option(“param_name1”,value1).option(“param_name2”,value2).csv(“nama_direktori”)
Perlu diingat bahwa rangkaian fungsi option() harus muncul di depan fungsi csv(). Misalnya jika kita ingin menyimpan DataFrame dengan header dan delimiter = “|”
df.write.option("sep","|") \
.option("header", True) \
.csv("mahasiswa_delim")
Menggunakan fungsi write.format()
Selain fungsi write.csv(), kita juga dapat menggunakan fungsi write.format(“csv”). Fungsi ini digunakan bersama dengan fungsi write.option() untuk menentukan parameternya, dan write.save(“dir_name”) untuk menentukan path dan nama direktorinya.
df.write.format("csv").save("mahasiswa_format")
Untuk menyimpan dengan header dan delimiter “|”, kita gunakan fungsi write.option(). Perlu diperhatikan bahwa rangkaian option() harus diletakkan sebelum save()
df.write.format("csv") \
.option("header",True) \
.option("sep","|") \
.save("mahasiswa_format_delim")
!cat mahasiswa_format_delim/*
Output : nama|jurusan|jalur|nilai Agus|Fisika|Umum|150 Windy|Fisika|Khusus|200 Budi|Biologi|Umum|170 nama|jurusan|jalur|nilai Dina|Fisika|Khusus|180 Bayu|Fisika|Umum|160 Dedi|Biologi|Khusus|185
Mode penulisan file csv : error, append, overwrite, ignore
Ada beberapa pilihan mode penulisan DataFrame ke file, yaitu :
- error : mengembalikan pesan error jika direktori sudah ada sebelumnya
- append : membuat file baru ke dalam direktori yang sudah ada (atau ke direktori baru, jika belum ada)
- overwrite : jika direktori sudah ada, file lama akan dihapus dan diganti dengan data baru
- ignore : jika direktori sudah ada, tidak dilakukan apa-apa. Jika belum, tulis data
Secara default pyspark menggunakan mode error. Untuk menggunakan mode yang lain, gunakan parameter mode.
Misalnya kita akan menulis data berikut ini ke direktori-direktori yang kita tulis di contoh sebelumnya
data_new = [['Citra','Fisika','Umum',170],
['Jaka','Biologi','Khusus',180]]
kolom = ["nama","jurusan","jalur","nilai"]
df_new = spark.createDataFrame(data_new, kolom)
df_new.show()
Output : +-----+-------+------+-----+ | nama|jurusan| jalur|nilai| +-----+-------+------+-----+ |Citra| Fisika| Umum| 170| | Jaka|Biologi|Khusus| 180| +-----+-------+------+-----+
Menggunakan mode append dan fungsi write.csv
Jika menggunakan fungsi write.csv, kita dapat mengirimkan mode sebagai salah satu parameter dalam fungsi.
df_new.write.csv("mahasiswa_header", header=True, mode="append")
Terlihat bahwa pyspark membuat 2 file baru ke dalam direktori mahasiswa_header yang sebelumnya sudah ada.
!ls -l mahasiswa_header
total 16
-rw-r--r-- 1 root root 47 Feb 26 14:36 part-00000-5516c025-30a3-4228-9bd9-509da5a5e664-c000.csv
-rw-r--r-- 1 root root 92 Feb 26 13:55 part-00000-941fd598-218d-4dfd-81bd-e7844b796ace-c000.csv
-rw-r--r-- 1 root root 49 Feb 26 14:36 part-00001-5516c025-30a3-4228-9bd9-509da5a5e664-c000.csv
-rw-r--r-- 1 root root 93 Feb 26 13:55 part-00001-941fd598-218d-4dfd-81bd-e7844b796ace-c000.csv
-rw-r--r-- 1 root root 0 Feb 26 14:36 _SUCCESS
Datanya merupakan gabungan data yang lama dan data baru. Karena keduanya kita simpan dengan header=True, maka terdapat 4 header dalam gabungan outputnya, sesuai dengan jumlah file yang terbentuk.
!cat mahasiswa_header/*
Output : nama,jurusan,jalur,nilai Citra,Fisika,Umum,170 nama,jurusan,jalur,nilai Agus,Fisika,Umum,150 Windy,Fisika,Khusus,200 Budi,Biologi,Umum,170 nama,jurusan,jalur,nilai Jaka,Biologi,Khusus,180 nama,jurusan,jalur,nilai Dina,Fisika,Khusus,180 Bayu,Fisika,Umum,160 Dedi,Biologi,Khusus,185
Menggunakan mode overwrite dan fungsi write.format
Pada mode overwrite, terlihat bahwa file lama akan dihapus dan digantikan dengan file yang baru :
df_new.write.format("csv") \
.option("header",True) \
.mode("overwrite") \
.save("mahasiswa_header")
File yang terbentuk hanya 2, yaitu file dari DataFrame yang baru. Sedangkan data yang lama dihapus.
!ls -l mahasiswa_header
total 8 -rw-r--r-- 1 root root 47 Feb 26 14:41 part-00000-10bbca3c-38d7-42c2-8ba2-a244d01b82fe-c000.csv -rw-r--r-- 1 root root 49 Feb 26 14:41 part-00001-10bbca3c-38d7-42c2-8ba2-a244d01b82fe-c000.csv -rw-r--r-- 1 root root 0 Feb 26 14:41 _SUCCESS
Kontennya adalah
!cat mahasiswa_header/*
Output : nama,jurusan,jalur,nilai Citra,Fisika,Umum,170 nama,jurusan,jalur,nilai Jaka,Biologi,Khusus,180
Menyimpan dengan kompresi
Ketika kita memproses data teks berukuran besar, seringkali akan lebih efisien jika file tersebut disimpan secara terkompresi. Untuk menyimpan file secara terkompresi, kita gunakan parameter compression. Pyspark mendukung berbagai metode kompresi, di antaranya yaitu bzip2, gzip, lz4, dan snappy.
df.write.csv("mahasiswa_gzip", header=True, compression="gzip")
Hasilnya berupa file-file csv yang terkompresi, sejumlah partisi DataFrame
!ls -ls mahasiswa_gzip
Output: total 8 4 -rw-r--r-- 1 root root 99 Feb 27 03:45 part-00000-333b118a-afcb-4480-bde1-f595afc1bc64-c000.csv.gz 4 -rw-r--r-- 1 root root 98 Feb 27 03:45 part-00001-333b118a-afcb-4480-bde1-f595afc1bc64-c000.csv.gz 0 -rw-r--r-- 1 root root 0 Feb 27 03:45 _SUCCESS
Menyimpan DataFrame ke dalam 1 file csv
Jika DataFrame berukuran kecil, kita mungkin ingin menyimpannya ke dalam satu file csv agar informasi headernya tidak berulang.
Untuk menghindari spark memecah file, kita dapat menyatukan terlebih dahulu DataFrame ke dalam 1 partisi menggunakan fungsi coalesce(1), baru kemudian menuliskan ke dalam file.
df.coalesce(1) \
.write.format("csv") \
.option("header",True) \
.save("mahasiswa_coalesce")
Output file dalam direktori
!ls -l mahasiswa_coalesce
total 4 -rw-r--r-- 1 root root 160 Feb 27 03:45 part-00000-50a23307-3e38-4ba3-ac22-28b52b04f4f3-c000.csv -rw-r--r-- 1 root root 0 Feb 27 03:45 _SUCCESS Dengan cara ini, kita dapat menyimpan file beserta header tanpa terjadi duplikasi header.
!cat mahasiswa_coalesce/*
Output: nama,jurusan,jalur,nilai Agus,Fisika,Umum,150 Windy,Fisika,Khusus,200 Budi,Biologi,Umum,170 Dina,Fisika,Khusus,180 Bayu,Fisika,Umum,160 Dedi,Biologi,Khusus,185
Perlu diingat bahwa perintah coalesce(1) mengumpulkan seluruh data ke dalam satu partisi, sehingga hal ini sebaiknya dihindari jika data yang akan disimpan berukuran sangat besar.
Parameter lain dalam DataFrameWriter
Selain parameter yang sudah dibahas di atas, yaitu path, separator, header, mode, dan compression, ada beberapa parameter lain yang dapat digunakan dalam penyimpanan data di pyspark. Beberapa di antaranya yaitu :
- nullValue: menentukan representasi string dari nilai null dalam file output.
- escape: menentukan karakter escape yang akan digunakan saat menulis data dalam format teks
- quote: menentukan karakter quote yang akan digunakan saat menulis data teks
- dateFormat: menentukan format tanggal yang akan digunakan
- timestampFormat: menentukan format timestamp yang akan digunakan
Wrapping Up
Pyspark menyediakan berbagai opsi/parameter untuk menyimpan DataFrame ke dalam file csv. Parameter tersebut dapat digunakan untuk menentukan header, delimiter, kompresi, dan lain-lain.
Pyspark menulis file output ke dalam satu direktori, yang terdiri dari beberapa file, tergantung pada partisinya. Untuk menuliskan ke dalam 1 file, kita dapat menggabungkan DataFrame ke dalam satu partisi. Hal ini tidak disarankan untuk data yang berukuran besar.
Notebook untuk artikel ini dapat diakses di sini
Simak juga artikel-artikel sebelumnya