Structured Streaming Error Log: Overloaded method foreachBatch with alternatives

WHYBIGDATA, published 2023/01/12 15:05:20
[Abstract] Structured Streaming error log: Overloaded method foreachBatch with alternatives




0. Preface

  • Spark: 3.0.0
  • Scala: 2.12

1. The Error

overloaded method value foreachBatch with alternatives:

2. Code and Error Message

Error:(48, 12) overloaded method value foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
    .foreachBatch((df, batchId) => {

The code that produced it (ForeachBatchSink1):

import java.util.Properties
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink1 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("ForeachSink1")
            .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
            .format("socket") // set the data source
            .option("host", "cluster01")
            .option("port", 10000)
            .load

        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "1234")

        val query: StreamingQuery = lines.writeStream
            .outputMode("update")
            // The untyped lambda below is what the compiler rejects: batchId is
            // inferred as Any, and result.unpersist() returns a Dataset, not Unit.
            .foreachBatch((df, batchId) => {
                val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()

                result.persist()
                result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
                result.write.mode("overwrite").json("./foreach1")
                result.unpersist()
            })
//            .trigger(Trigger.ProcessingTime(0))
            .trigger(Trigger.Continuous(10))
            .start
        query.awaitTermination()
    }
}

A second variant, ForeachBatchSink, fails the same way:

Error:(43, 12) overloaded method value foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
    .foreachBatch((df, batchId) => {
import java.util.Properties
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("ForeachSink")
            .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
            .format("socket") // set the data source
            .option("host", "cluster01")
            .option("port", 10000)
            .load

        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "1234")

        val query: StreamingQuery = lines.writeStream
            .outputMode("complete")
            // Same problem here: the lambda parameters are untyped and the last
            // expression returns a DataFrame instead of Unit.
            .foreachBatch((df, batchId) => {
                val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
                result.persist()
                result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
                result.write.mode("overwrite").json("./foreach")
                result.unpersist()
            })
            .start
        query.awaitTermination()
    }
}

3. Cause and Fix

The root cause is that foreachBatch() resolves differently under Scala 2.12 than under Scala 2.11. As the error message shows, the untyped lambda (df, batchId) => { ... } gives the compiler too little to work with: batchId is inferred as Any, and the body's last expression yields a Dataset rather than Unit, so the call matches neither the Java VoidFunction2 overload nor the Scala (Dataset[Row], Long) => Unit overload. Annotating the parameter types explicitly, and ending the body with a Unit expression, resolves the ambiguity.
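A minimal, self-contained sketch of the difference (the rate source and the println body are illustrative stand-ins, not part of the original code):

import org.apache.spark.sql.{Dataset, Row, SparkSession}

object ForeachBatchOverloadSketch {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[2]")
            .appName("ForeachBatchOverloadSketch")
            .getOrCreate()

        // Any streaming source will do for the demonstration; "rate" needs no external server.
        val stream = spark.readStream.format("rate").load()

        // Does NOT compile under Scala 2.12: batchId is inferred as Any and the
        // body's value is a Long, not Unit, so neither overload applies.
        // stream.writeStream.foreachBatch((df, batchId) => { df.count() }).start()

        // Compiles: explicit types match the (Dataset[Row], Long) => Unit overload.
        val query = stream.writeStream
            .foreachBatch((df: Dataset[Row], batchId: Long) => {
                println(s"batch $batchId contains ${df.count()} rows")
            })
            .start()

        query.awaitTermination()
    }
}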

The corrected code follows. In the first version (ForeachBatchSink), the per-batch logic moves into a helper myFun with an explicitly typed batchId: Long parameter, and the lambda annotates its parameter types:

import java.util.Properties
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}


object ForeachBatchSink {

    def myFun(df: Dataset[Row], batchId: Long, props: Properties): Unit = {
        println("BatchId" + batchId)
        if (df.count() != 0) {
            df.persist()
            df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
            df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
            df.unpersist()
        }
    }

    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession
          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink")
          .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
          .format("socket") // TODO 设置数据源
          .option("host", "cluster01")
          .option("port", 10000)
          .load

        val wordCount: DataFrame = lines.as[String]
          .flatMap(_.split("\\W+"))
          .groupBy("value")
          .count()  // value count

        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "1234")

        val query: StreamingQuery = wordCount.writeStream
          .outputMode("complete")
          // Explicit parameter types select the (Dataset[Row], Long) => Unit overload.
          .foreachBatch((df: Dataset[Row], batchId: Long) => {
              myFun(df, batchId, props)
          })
          .start

        query.awaitTermination()

    }
}
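
Spark 3.0 also exposes the Java-oriented overload shown in the error message, which takes an org.apache.spark.api.java.function.VoidFunction2. Passing an explicit instance of it is another way to sidestep the ambiguity. A sketch under the same assumptions (the rate source and println body are illustrative):

import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object VoidFunction2Sketch {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[2]")
            .appName("VoidFunction2Sketch")
            .getOrCreate()

        val stream = spark.readStream.format("rate").load()

        // An explicit VoidFunction2 value selects the Java overload directly,
        // so no lambda type inference happens at the call site.
        val handler = new VoidFunction2[Dataset[Row], java.lang.Long] {
            override def call(df: Dataset[Row], batchId: java.lang.Long): Unit =
                println(s"batch $batchId contains ${df.count()} rows")
        }

        val query = stream.writeStream.foreachBatch(handler).start()
        query.awaitTermination()
    }
}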

The corrected ForeachBatchSink1 performs the word count inside the batch function:

import java.util.Properties
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ForeachBatchSink1 {

    def myFun(df: Dataset[Row], batchId: Long, props: Properties, spark : SparkSession): Unit = {
        import spark.implicits._
        println("BatchId = " + batchId)
        if (df.count() != 0) {
            val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
            result.persist()
            result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
            result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
            result.unpersist()
        }
    }

    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession
          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink1")
          .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
          .format("socket") // TODO 设置数据源
          .option("host", "cluster01")
          .option("port", 10000)
          .load

        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "1234")

        val query: StreamingQuery = lines.writeStream
          .outputMode("update")
          // Explicit parameter types again resolve the overload ambiguity.
          .foreachBatch((df: Dataset[Row], batchId: Long) => {
              myFun(df, batchId, props, spark)
          })
          .trigger(Trigger.ProcessingTime(0)) // foreachBatch runs only in micro-batch execution; Trigger.Continuous would fail at runtime
          .start
        query.awaitTermination()

    }
}
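
To confirm what each micro-batch wrote, the wc table can be read back with the same connection properties. A small batch-mode sketch (credentials and table name reuse the values from the code above):

import java.util.Properties
import org.apache.spark.sql.SparkSession

object ReadBackWordCount {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[2]")
            .appName("ReadBackWordCount")
            .getOrCreate()

        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "1234")

        // Read the word-count table that foreachBatch overwrites on every batch.
        spark.read
            .jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
            .show()

        spark.stop()
    }
}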

4. References

https://blog.csdn.net/Shockang/article/details/120961968
