Spark优化

发布时间 2023-11-11 20:38:54作者: 唐钰逍遥

意识篇

  • 类型转换
    优化前:

    val extractFields: Seq[Row] => Seq[(String, Int)] = {
    (rows: Seq[Row]) => {
        var fields = Seq[(String, Int)]()
        rows.map(row => {
            fields = fields :+ (row.getString(2), row.getInt(4))
        })
    	fields
    }
    }
    

    优化后:

    val extractFields: Seq[Row] => Seq[(String, Int)] = {
    (rows: Seq[Row]) =>
    	rows.map(row => (row.getString(2), row.getInt(4))).toSeq
    }
    

    过程式编程变为函数式编程

    关注主要业务逻辑,最终一次性完成类型转换。

  • 数据行的操作不要在Driver端执行
    优化前:

    def createInstance(factDF: DataFrame, startDate: String, endDate: String): Dat
    val instanceDF = factDF
        .filter(col("eventDate") > lit(startDate) && col("eventDate") <= lit(endDate))
        .groupBy("dim1", "dim2", "dim3", "event_date")
        .agg(sum("value") as "sum_value")
        instanceDF
    }
    pairDF.collect.foreach{
    case (startDate: String, endDate: String) =>
        val instance = createInstance(factDF, startDate, endDate)
        val outPath = s"${rootPath}/endDate=${endDate}/startDate=${startDate}"
        instance.write.parquet(outPath)
    }
    

    这种写法会把数据拉回到Driver端一句一句操作,会很慢。
    优化后:

    val instances = factDF
    .join(pairDF, factDF("eventDate") > pairDF("startDate") && factDF("eventDate")
    .groupBy("dim1", "dim2", "dim3", "eventDate", "startDate", "endDate")
    .agg(sum("value") as "sum_value")
    instances.write.partitionBy("endDate", "startDate").parquet(rootPath)
    

    这种只会分区落盘时拉取数据一次。
    Tips

    Spark 延迟计算的 Actions 算子主要有两类:

    • 一类是将分布式计算结果直接落盘的操作,
      如 DataFrame 的 write、RDD 的 saveAsTextFile 等;

    • 另一类是将分布式结果收集到 Driver 端的操作,

      如 first、take、collect。