How we monitor thousands of Spark data pipelines (TH version)
สวัสดีครับ เราชื่ออาร์ท ตอนนี้เป็น Senior Data Platform Engineer อยู่ที่ LINE MAN Wongnai วันนี้เราจะมาเล่าว่าเรา monitor Spark pipeline กันยังไง แต่ก่อนอื่นขอแนะนำตำแหน่งตัวเองซักนิดนึง ….. ต้องเท้าความก่อนว่าเมื่อก่อน Data Engineer คือคนที่ดูแลทุกอย่างตั้งแต่ Data Pipeline, Data Infrastructure, Apache Airflow, Presto Cluster, Monitoring Tools, และอื่น ๆ อีกมากมาย ซึ่งมันเยอะมากจนทำให้การดูแลของเหล่านี้มันไม่ทั่วถึง ด้วยเหตุผลนี้บริษัทเลยเปิดตำแหน่งใหม่ขึ้นมาเป็น Data Platform Engineer ที่จะมาเข้ามาดู data platform โดยเฉพาะเลย หน้าที่หลักคือคล้าย ๆ Site Reliability Engineer แต่จะดูแค่ data platform and related services เท่านั้น และก็เราจะต้องทำยังไงก็ได้ให้ platform ไม่ล่ม น่าเชื่อถือ และทำ improvement ใน platform ให้ดียิ่งขึ้น วันนี้เราขอหยิบหนึ่งตัวอย่างของ improvement มาให้ดูกัน
Overview of Batch Data Pipelines
ขอเกริ่นถึงภาพรวมในปัจจุบันของ data pipeline ก่อน ตอนนี้เราเลือกใช้ Apache Airflow เป็น workflow orchestrator ตัวหลักในการช่วยทำ data pipeline ซึ่งตอนนี้เรามี Airflow ทั้งหมด 3 ตัว ได้แก่
- DAP Airflow เป็น Airflow ที่ Data Engineer กับ Data Scientist ใช้ทำ ETL pipeline จากฐานข้อมูลต่าง ๆ ในบริษัท ใช้ดึงข้อมูลจาก third party api ใช้ประมวลผลไฟล์จาก cloud object storage จาก GCP และ AWS รวมถึงการดึงข้อมูลจาก Google Sheets ด้วย นอกเหนือจากการทำ ETL จากข้อมูลภายนอก เรามี pipeline ที่ซัพพอร์ตการทำ precomputed table อีกด้วย (table ที่ aggregate ข้อมูลจากหลาย ๆ table เข้าด้วยกัน)
- DS Airflow เป็น Airflow ที่ Data Scientist ใช้ในการ process data เพื่องานของ data scientist โดยเฉพาะ เช่น การทำ feature engineering, model training & prediction, หรือการทำ analysis ทั่ว ๆ ไป
- DBT Airflow เป็น Airflow ที่ Analytics Engineer ใช้ในการทำ precomputed table โดยเฉพาะ โดยหลังบ้านจะใช้ DBT (Data Build Tool) ในการช่วยจัดการ data modeling ทั้งหมด
จาก Airflow ทั้ง 3 ตัวนี้ เรามี DAG (Direct Acyclic Graph) ที่ใช้ Apache Spark ในการเข้ามาช่วยประมวลผลข้อมูลเป็นจำนวนทั้งสิ้น 83.9% จากจำนวน DAG ทั้งหมด หรือตีเป็น 386 DAGs ถ้านับเป็นจำนวน Task Instance ก็จะได้ทั้งหมด 2,560 Task Instances นั่นเอง พูดง่าย ๆ คือเรามี data pipelines ที่ทำงานด้วย Apache Spark ทั้งหมด 2,560 pipelines นั่นเอง
So, What’s the problem (s) ?
ดังนั้นเมื่อจำนวน pipeline มันมีมากขนาดนี้ การดูแล pipeline มันก็เริ่มยากขึ้นด้วย และถ้าเราดูแลไม่ดีหรือ take action ไม่ไวหรือไม่มีวิธีการ monitor มันก็กระทบไปถึงฝั่ง business ได้เลยในเวลาแค่ไม่กี่ชั่วโมง ตัวอย่างเช่น เรามี pipeline ที่เริ่มทำงานช่วงเที่ยงคืนเวลาไทย แล้วถ้ามี pipeline ซัก 1 ตัวทำงานช้ากว่าปกติหรือมีพังด้วยสาเหตุต่าง ๆ และมีอีกหลาย ๆ downstream pipeline ที่มารอข้อมูลจาก pipeline ตัวนี้เพื่อนำไปคำนวณต่อ ผลที่จะเกิดขึ้นก็คือ ข้อมูลจะถูกดีเลย์หรือไม่ก็ข้อมูลผิดไปเลย ซึ่งมันก็จะส่งผลกับฝั่ง business ที่แต่ละทีมก็จะ take action ได้ช้าหรือตัดสินใจผิดเพราะข้อมูลไม่ครบก็ได้
ดังนั้นเรามาดูภาพปัจจุบันกันต่อว่าสิ่งที่เกิดขึ้นใน Data Pipeline ระหว่าง Airflow กับ Spark มีอะไรเกิดขึ้นบ้าง โดยการทำงานคร่าว ๆ มีดังนี้
- Airflow ส่ง job เข้าไปหา Spark cluster ด้วย spark-submit cli หรือ Livy REST API
- Airflow รอ Spark รันไปเรื่อย ๆ
- พอ Spark รันเสร็จ ก็จะเก็บ log ต่าง ๆ กลับมา แต่ถ้าไม่เสร็จก็จะเก็บ error กลับมาแทน
จาก flow นี้ ปัญหาจริง ๆ ที่เกิดขึ้นคือเราขาด Pipeline Observability เราไม่สามารถรับรู้ได้เลยว่า Spark pipeline ต่าง ๆ ทำงานดีไหม หรือแค่ทำงานได้ สิ่งที่เราจะรับรู้จาก Airflow จะมีแค่ error ที่ส่งเข้ามาใน Slack ว่า pipeline พัง แบบในรูปด้านล่าง พอปัญหาแบบนี้เกิดขึ้นปุ๊บ เราก็จะเข้าไปแก้ไขบัคหรือปรับ Spark Configuration ใน pipeline ให้มันทำงานต่อได้ แต่เราก็ยังไม่สามารถรับรู้ได้อยู่ดีว่า มี pipeline ไหนที่ใกล้จะมีปัญหาอีก หรือมี pipeline ไหนที่เราสามารถปรับปรุงเพื่อให้มันทำงานได้แบบไม่ต้องไปเสี่ยงให้มันเกิดปัญหาอีก หรือทำให้มันเร็วขึ้นกว่าเดิมได้ไหมเพื่อที่จะลดความเสี่ยงที่จะเกิดข้อมูลดีเลย์
ซึ่งการจะเข้าไปปรับปรุงให้ pipeline มันทำงานเร็วขึ้น เราจะต้องเข้าไปดูที่ Spark Web UI ของแต่ละ pipeline เท่านั้น และตัว Spark Web UI เองก็มี retention ถึงแค่ 14 วัน และอย่างที่ได้เล่ามาว่าเรามี pipeline จำนวนถึงหลักพัน ดังนั้นการจะเข้าไปหาว่า pipeline ไหนควรได้รับการปรับปรุงบ้างจึงเป็นเรื่องที่ใช้เวลาเยอะมาก
ซึ่งปัญหาหลัก ๆ ที่เราพบบน Spark pipeline โดนส่วนมากมีดังนี้ ได้แก่
Data Skew — ในเมื่อการทำงานของ Apache Spark มันทำงานกันเป็น cluster มี worker หลาย ๆ ตัวช่วยกันประมวลผลพร้อม ๆ กัน ถ้า worker ตัวใดตัวหนึ่งใช้เวลาทำงานเยอะกว่าเพื่อน แปลว่าเราอาจจะกำลังเจอปัญหา Data Skew เข้าให้แล้ว ดังตัวอย่างในรูป เรามี task ทั้งหมด 200 tasks ซึ่ง 199 tasks อาจจะทำงานเสร็จไปแล้วภายใน 10 นาที แต่มีอีก 1 task ที่ยังทำงานอยู่แล้วใช้เวลาไปแล้ว 47 ชั่วโมง
Data Spill — เป็นปัญหาที่จะเกิดขึ้นเมื่อ execution memory ของ Spark worker ไม่พอ จนต้องใช้ memory ส่วนอื่น ๆ หรือถ้าไม่พออีก มันก็จะไปใช้ disk ในการเก็บ data แทน ซึ่งมันจะทำให้เสียเวลามากขึ้นในการที่ Spark จะไปเขียน/อ่านข้อมูลที่ disk อย่างที่เรารู้กันว่า RAM จะทำงานเร็วกว่า disk เสมอ ดังนั้นนี่จึงเป็นอีกหนึ่งปัญหาที่อาจจะทำให้ pipeline เราทำงานช้าลง
Over/Under Resource Utilization — พอเรามี job จำนวนมาก เราเองไม่มีทางรู้ได้เลยว่า job ไหนขอ CPU core กับ memory เยอะไปหรือน้อยไป บาง job ขอ CPU ไปเยอะมาก แต่จริง ๆ แล้วอาจจะใช้แค่ 10 CPU cores ก็เพียงพอต่อการรันงานให้จบภายในเวลาที่กำหนดแล้ว ซึ่งเราจะรู้ก็ต่อเมื่อไปดูในโค้ดของเรา หรือ Spark Web UI อีกทั้งการขอ resource เยอะมาก ๆ มันก็ส่งผลกับ capacity ที่มีอยู่ อาจจะส่งผลให้มันไม่พอจนเกิด job ดีเลย์
จากปัญหาที่เล่ามาจะเห็นว่ามันค่อนข้างกระทบกับ pipeline โดยตรง และจากที่ทีมได้เจอมา บาง pipeline ใช้เวลาถึง 2 ชั่วโมง แต่พอหลังเจอและแก้ไป ใช้เวลาราว ๆ 20 นาที ดังนั้นขอนำไปสู่วิธีการที่เราใช้ตรวจเจอปัญหาที่เล่ามาเลย
The Solution, Spark Listener
เราจะใช้ Spark Listener มาแก้ปัญหาที่ว่า ก่อนอื่นเลย Spark Listener คืออะไร
Spark Listener คือเป็น Developer API ของ Spark ตัวนึงที่โดยปกติแล้ว Spark จะส่ง callback event มาที่ Interface นี้ แล้วเราสามารถหยิบ event นี้ไปทำอะไรต่อก็ได้ ซึ่งตัว Spark Web UI เองก็ใช้ Spark Listener ในการช่วย export ข้อมูลต่าง ๆ ออกมาแสดงผลที่หน้า Web UI นั่นเอง
ดังนั้นเราจะใช้เจ้านี่แหละ ช่วย export ข้อมูลต่าง ๆ ไม่ว่าจะเป็น job นี้ใช้ Spark Configuration อะไรบ้าง job เริ่มกี่โมง มี memory / disk spill หรือ data skew ไหม หรือ job นี้เขียน data ไปทั้งหมดกี่ไบต์
ซึ่งตัว Spark Listener ที่เราทำขึ้นมาเขียนด้วยภาษา Java และจะโฟกัสที่การคำนวณ Spark Task กับ Spark Stage เพื่อให้ได้ข้อมูลที่ต้องการ กับขออธิบายคร่าว ๆ ก่อนว่า Job, Stage, และ Task ใน Spark คืออะไร
- Spark Job — ทุกครั้งที่เราเรียกคำสั่ง save, collect, count หรืออะไรที่ทำการเรียก result จาก Spark ตัว Spark Driver จะสร้าง Job ขึ้นมา ซึ่งแต่ละ Job สามารถถูกประมวลแบบ parallel ได้พร้อมกัน
- Spark Stage — ในหนึ่ง Spark Job สามารถประกอบไปด้วยหลาย ๆ Stage และในแต่ละ Stage จะเก็บ logic ของการ transformation ไว้ เช่น filter, join, sort, หรือทำการ exchange Spark partition กันใน Spark cluster
- Spark Task — เป็น unit ที่เล็กที่สุดใน Spark ที่จะอยู่ใน Spark Stage อีกที ซึ่ง unit นี้จะเป็นส่วนที่ Spark worker จะนำไปประมวลผลจริง ๆ แล้ว
การทำงานของ Spark Listener ตัวนี้จะเริ่มจากการไปเก็บข้อมูลตาม Spark event ต่าง ๆ นำไป pre-process และย่อยให้เหลือแต่สถิติของแต่ละ Spark Stage เช่น เราเก็บจำนวน memory / disk spill, Percentile (P50/P75/P90) ของเวลาที่ใช้ไปในแต่ละ Spark Task, จำนวน byte / record ที่เขียนลงไปที่ HDFS, Error ต่าง ๆ (ถ้าเกิด failed) รวมถึงคำนวณ skewness ของ Spark Task ในแต่ละ Stage ด้วย และพอ pipeline ทำงานเสร็จ เราก็จะให้ Spark Listener ของเราส่งข้อมูลทั้งหมดออกไปหา server ตัวนึงที่ชื่อว่า DTP Internal Server ผ่าน REST API
DTP Internal Server เป็นอีกหนึ่ง Service ที่ภายในทีมจะใช้เพื่อเก็บ log ต่าง ๆ ผ่าน REST API และพอมันได้รับ request มาจาก client มันจะทำการส่งข้อมูลขึ้น Apache Kafka อีกทีนึง และข้อมูลจะถูก sync ลง Hive table ทุก ๆ 2 ชั่วโมง
ขอย้อนกลับไปที่วิธีที่เราใช้คำนวณ skewness เราใช้ทั้งหมด 3 วิธีในการวัด skewness ของ Spark stage ได้แก่
- Standard Deviation — ถ้าเวลาที่ใช้ไปของ task นั่นเกิน 2*SD หรือ 3*SD ก็จะบอกว่ามัน skew
- Skewness Formula — ใช้สูตรทางสถิติเข้ามาช่วยเลย อันนี้สุดท้ายเราจะได้ค่ามาหนึ่งค่าเพื่อบอกว่า distribution ของเวลาที่ใช้ไปในแต่ละ Spark task เป็นอย่างไร
- Max(Spark Task Runtime) — P90(Spark Task Runtime) > 10 minutes — พูดง่าย ๆ คือ ถ้าเกิด runtime ของ task สุดท้ายมันห่างกับ runtime ที่ P90 เกิน 10 นาที เราก็จะบอกว่ามัน skew นั่นเอง
ที่เราเลือกใช้วิธีวัดทั้งหมด 3 วิธีเพราะเรายังไม่เห็น data จริง ๆ ว่าวิธีการไหนเหมาะสมสำหรับ data ที่เราเก็บมา เราเลยต้อง implement ทั้ง 3 วิธีไปก่อน และอย่างวิธีที่ 3 มันก็มีจุดบอดเช่นกัน ถ้าเกิด task ที่ P90 ใช้เวลาใกล้เคียงกับ task ที่ใช้เวลาเยอะสุด มันก็จะบอกว่าไม่ skew (ขอเคลมก่อนว่าผู้เขียนไม่ได้เชี่ยวชาญด้านสถิติมาก ถ้าเกิดผิดพลาดตรงนี้ ขออภัยไว้ ณ ที่นี่ด้วย)
พอเรามีข้อมูลมาเก็บที่ Hive table แล้ว เราก็พร้อมที่จะ query ข้อมูลขึ้นมาแล้วเอาไปหา insight ต่าง ๆ รวมถึงสามารถนำไปคำนวณ SLO ได้ด้วย กับเราไม่ต้องห่วงเรื่อง retention ด้วย สามารถเก็บได้เป็นหลักปี (ใช้แค่ 1–3 เดือนก็เพียงพอต่อการใช้งานแล้ว)
เมื่อเรามี Spark Listener แล้ว เราจะเอาตัว Spark Listener ไปติดกับทุก ๆ pipeline ที่อยู่ตาม Airflow รวมถึงเราจะใส่ Spark Configuration เหล่านี้เพิ่มไปด้วย เพื่อเก็บไว้ใช้ทำ tracking กลับไปที่ตัว pipeline
- spark.dap.source — ใช้บอกที่มาของ Spark job นี้ว่าโดนส่งมาจาก data platform หรือ client ไหน
- spark.dap.platformEnv — ใช้บอก environment ของ data platform ที่ส่ง job เข้ามา
- spark.dap.appId — ใช้ระบุที่มาของ pipeline logic ซึ่งเรานำข้อมูลจาก Airflow macro มาใช้ และใช้เป็น format นี้ dag_id — task_id — run_id
ตัวอย่างใน spark-submit command ในการใช้ Spark Listener
spark-submit \
--conf \"spark.extraListeners=dap.LMWNSparkListener\" \
--conf \"spark.jars=dap-spark-listener-1.0.0.jar\" \
--conf \"spark.dap.source=dap-airflow\" \
--conf \"spark.dap.platformEnv=production\" \
--conf \"spark.dap.appId=wongnai_import--transform_and_update_wn_restaurant--scheduled__2023-04-13T02:00:00+00:00\" \
script.py
Let’s Get into Monitoring
มาถึงส่วนสุดท้ายแล้ว แล้วเรา monitor อะไรบ้าง อย่างแรกคือเราตั้ง SLO ขึ้นมาก่อนเลยโดย SLO ของเรามีทั้งหมด 4 ข้อ
- Runtime — มี pipeline จำนวนเท่าไหร่ที่มี runtime รันเกิดกว่า time limit ที่ตั้งไว้ ซึ่ง time limit เราคิดมาจาก cron ของ Airflow เช่น runtime ของ @hourly pipeline จะต้องไม่เกิน 70% ของ 1 ชั่วโมง หรือคิดเป็น 42 นาที หรือ runtime ของ 30 */2 * * * pipeline จะต้องไม่เกิน 1 ชั่วโมงเป็นต้น
- Skew — สำหรับ skew เราวัดมาจาก คำตอบที่ได้จาก skewness calculation จากทั้งหมด 3 วิธีที่ได้กล่าวไว้ และเอาคำตอบมารวมกัน ถ้ามีคำตอบใดคำตอบหนึ่งบอกว่ามัน skew เราก็จะนับว่า pipeline นี้ไม่ meet SLO (ทำคล้าย ๆ กับ ensemble method)
- Spill — อันนี้ค่อนข้างตรงไปตรงมา ถ้า pipeline ไหนมี spill ไม่ว่าจะเกิดที่ memory หรือ disk เราก็จะนับว่า pipeline นี้ไม่ meet SLO ทันที
- Failed Apps — SLO ตัวสุดท้ายคือ pipeline เรา failed เมื่อไหร่ ก็จะนับว่าไม่ meet SLO ทันที
และนี่คือตัวอย่าง SLO จะเห็นว่ามันแบ่งเป็นตาม tier อีก ใช่แล้ว ในทุก ๆ pipeline จะมีการระบุความสำคัญเป็น tier อยู่ ไล่ความสำคัญตั้งแต่ tier-1, tier-2, tier-3, tier-4, tier-no (ไม่มี tier) เราเลยเอา tier ตรงนี้มาช่วย grouping ในแต่ละ SLO ด้วย ดังนั้นเราจะสามารถให้ความสำคัญกับการทำ pipeline tuning ได้ดีขึ้น
จากรูป SLO จะเห็นไว้ว่า Runtime เราค่อนข้างไม่เขียว และมี Spill ค่อนข้างเยอะมาก กลับกับในส่วนของ Skew จะเห็นว่ามันค่อนข้างจะเขียวเลย และก็เคสของ Failed Apps มันควรเป็น 100% ซึ่งหมายถึงไม่มี pipeline ไหนพังเลย แต่กลับมาที่โลกของความจริง มันก็ไม่ได้เป็นอย่างนั้น ดังนั้นคำถามคือ เราจะทำให้ตัวเลขเหล่านี้มันเพิ่มขึ้นยังไงได้บ้าง จึงเข้ามาสู่การ monitor อีกส่วนนึงคือเราสามารถดูเป็นราย pipeline ได้เลยว่า pipeline ไหนมีปัญหาบ้าง
ตัวอย่างแรกเลยคือเราทำ pivot table ที่ลิสต์ DAG ที่ใช้ runtime เกินกว่า time limit และเอามานับเป็นจำนวนครั้งต่อวัน จะเห็นว่ามีสีแดง ๆ อยู่อันนึงที่ runtime เกินตลอดเวลา (เป็น pipeline ที่ทำงานทุก ๆ 30 นาที) แต่เป็น pipeline tier-3 ซึ่งอาจจะไม่สำคัญมาก ยังปล่อยให้ delay ได้อยู่ (runtime จริงอาจจะใกล้กับรอบ DAG run มาก ๆ เช่น runtime 28–29 นาที) ถ้าเคสนี้เราก็ควรโฟกัสที่ pipeline tier-1 ก่อน เพราะ impact ต่อ business มากกว่า
ตัวอย่างที่สองยังเป็น pivot table เหมือนเดิม แต่เป็น pivot table ที่โชว์ลิสต์ของ DAG กับ Task Instance ที่เกิดปัญหา spill ทั้ง memory และ disk
ตัวอย่างที่สามเป็น pivot table ที่สรุป error จาก Spark ว่าในแต่ละ pipeline ที่พังมันพังด้วยเหตุผลอะไร และก็สามารถดูเพิ่มเติมได้ว่า error เต็ม ๆ คืออะไร และ มี YARN Application Id ให้สามารถกลับไป search หาใน Spark History Server ได้เพื่อการ debugging เพิ่มเติม
สำหรับ Skew pipeline เราสามารถดูได้เลยว่า Spark Stage ไหนที่มันเกิด Skew และสามารถย้อนกลับที่ไปดูที่ Spark Web UI ที่อยู่ใน Spark History Server ได้อีกด้วย
นอกเหนือจากนี้เรายังมีกราฟที่จะช่วยเราหาอีกว่าตอนนี้มี pipeline ไหนบ้างที่ใช้ resource เยอะ ๆ แต่เขียน data ออกไปนิดเดียว ดังในรูปข้างล่างนี้ ดังในรูปจะเห็นว่ามี pipeline อีกหลาย ๆ ตัวเลยที่ใช้จำนวน Spark executor ถึง 100 ตัว แต่เขียน data ออกไปไม่ถึง 5 กิกะไบต์ด้วยซ้ำ ใช่ครับทุกคนฟังไม่ผิด มีจำนวนเยอะประมาณนึงเลย TT ซึ่งจริง ๆ แล้ว pipeline เหล่านี้อาจจะใช้แค่ Spark executor แค่ 5 ตัวก็เพียงพอแล้วก็ได้
นอกเหนือจากนี้เรายังสามารถหา pipeline ที่เขียนไฟล์ ลง HDFS แบบไม่ optimal ได้ด้วย อย่างเช่น มี pipeline เขียนไฟล์ทั้งหมด 1 ไฟล์มีขนาด 2 กิกะไบต์ แบบนี้เรียกว่าไม่ optimal ซึ่งควรปรับให้เขียนทั้งหมด 8 ไฟล์แทน (1 ไฟล์ควรมีขนาดใกล้เคียงกับ HDFS Block Size ~256MB) ซึ่งตรงนี้ก็เป็นอีกหนึ่งจุดที่มักจะทำให้เกิดปัญหา spill นั่นเอง เพราะเราเขียนข้อมูลใหญ่เกินไปกว่าที่ execution memory ของ Spark worker จะรับไหว อีกทั้งยังทำให้ runtime สูงขึ้นด้วย เพราะ Spark รวบรวม data จากหลาย ๆ ที่เข้ามารวมที่เดียวกันก่อนเขียนไฟล์
Example Results after We have Monitoring
เราลองมาดูตัวอย่างเคสจริงที่เราเจอจากการ monitoring และได้ทำการปรับปรุงแก้ไขไปแล้วกัน
เคสที่ 1 — เป็นเคสที่ pipeline ที่รันวันละครั้ง โดย pipeline จะรัน query ผ่าน SparkSQL เพื่อทำ precomputed table ซึ่ง pipeline นี้ย้ำว่าเป็น pipeline ที่รันผ่านทุกวันและไม่มีการ Retry แต่อย่างใด แต่มีปัญหา Data Skew เลยทำให้ runtime สูงเกินสองชั่วโมง ซึ่งเรามาตรวจเจอสิ่งนี้จาก pivot table ด้านบน เลยทำการปรับแก้ query โดยเปลี่ยน Join Strategy (เปลี่ยนโค้ดแค่ 1 บรรทัด) ไปใช้ Broadcast Hash Join แทน สุดท้าย runtime เลยเหลือไม่ถึง 25 นาที
เคสที่ 2 — เคสนี้เป็น pipeline ที่จะทำงานทุก ๆ ชั่วโมง โดยที่ปัญหามันคือ มันเกิดการ retry ที่ Airflow หลาย ๆ รอบ (ไม่ได้เจอทุกรอบ) สาเหตุเป็นเพราะ Spark worker มี resource ไม่พอ และ pipeline นี้เขียนไฟล์ลง HDFS แบบไม่ optimal มันเลยทำให้ Spark worker ตายจน Airflow ต้อง retry ตัวเองใหม่เรื่อย ๆ ถึง 2–3 รอบ จนมันผ่านเอง ซึ่งที่ผ่านยังไม่มี error จาก pipeline นี้ส่งลงไปที่ Slack ของทีมเลยเพราะ Airflow ยัง retry ตัวเองไม่ถึง threshold ที่ตั้งไว้ หลังจากการปรับ tuning ไป โดยได้ปรับ resource และได้ทำการ repartition ใน Spark ไปเพื่อปรับจำนวนไฟล์ก่อนจะเขียนลง HDFS ผลที่ได้ก็คือไม่มี Airflow retry เกิดขึ้นอีกเลย
Final Thought
มาถึงสรุปแล้ว ก็หลังจากที่ทีมเราได้มี Spark Listener เข้ามาช่วย export ข้อมูลและมี Dashboard / Graph ในการช่วยชี้เป้าปัญหาต่าง ๆ เราก็เริ่มทำการแก้ไขปัญหาได้อย่างตรงจุดมากขึ้น รวดเร็ว และขุดปัญหาใต้พรมต่าง ๆ ที่เราไม่เคยรู้ และก็ยังสามารถ support ทุก ๆ ทีม ทุก ๆ platform ที่ใช้ Apache Spark ได้ในเวลาเดียวกันอีกทั้งเราก็มี SLO ด้วยที่ได้มาจากข้อมูลตรงนี้ ทำให้เราสามารถมั่นใจได้ว่า pipeline ทั้งหมดของเรายังทำงานได้ดี หรือถ้ายังไม่ meet SLO ก็ต้องทำ improvement กันไป ฮ่า ๆ ซึ่งทั้งหมดมันก็ส่งผลให้ pipeline เรามีคุณภาพ มี observability ที่ดีขึ้น และถ้าฝั่ง pipeline ดีขึ้น มันก็ส่งผลไปถึงปลายทางฝั่ง business ด้วยเช่นกัน business ก็จะมั่นใจการใช้ข้อมูลประกอบการตัดสินใจมากขึ้น
สุดท้ายนี้ขอแถมอีกนิดนึง ขายของกันรัว ๆ ในปัจจุบัน pipeline ของทีม Data ที่ทำงานอยู่บน Apache Spark มีจำนวนรวมราว ๆ 8,000 pipeline runs ต่อวัน นับเป็นรายชั่วโมงคือราว ๆ 600 pipeline runs ต่อชั่วโมง และมีข้อมูลถูกเขียนลง HDFS เฉลี่ยแล้ว 16 Terabytes ต่อวัน หรือตีเป็น record ได้ 50 พันล้านแถว ก็ด้วย scale งานปัจจุบันเราจึงยังต้องการคนเก่ง ๆ เข้ามาร่วมทีมอีก ใครสนใจเข้ามาร่วมงานด้วยกันไม่ว่าจะเป็น Data Engineer ที่จะเข้ามาพัฒนา pipeline เหล่านี้ให้ดีขึ้น หรือ Data Platform Engineer ที่จะเข้ามาทำให้ data platform ของ LINE MAN Wongnai เสถียรขึ้น scalable มากขึ้น (เราไม่ได้มีแต่ ETL Infrastructure อย่างเดียวที่ต้องดูแล ยังมีอีกหลาย services ที่ยังไม่ได้พูดถึงในนี้ต้องดูแลอีกมากมาย) หรืออยากเข้ามาซัพพอร์ตฝั่ง Business Intelligence มาเขียน query เพื่อหา insight ของธุรกิจก็สามารถส่งใบสมัครงานเข้ามาได้ที่ https://careers.lmwn.com/data-and-analytics
ขอบคุณครับ