import java.util.concurrent.TimeUnit

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object StructuredStreamingSinkIceberg {
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
      // Register a Hadoop catalog named hadoop_prod
      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")
      .getOrCreate()
    // spark.sparkContext.setLogLevel("ERROR")
    // 2. Create the Iceberg table if it does not already exist
    spark.sql(
      """
        |create table if not exists hadoop_prod.iceberg_db.iceberg_table (
        | current_day string,
        | user_id string,
        | page_id string,
        | channel string,
        | action string
        |) using iceberg
      """.stripMargin)
    val checkpointPath = "hdfs://mycluster/iceberg_table_checkpoint"
    val bootstrapServers = "node1:9092,node2:9092,node3:9092"
    // Multiple topics can be subscribed by separating them with commas
    val topic = "kafka-iceberg-topic"
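    // Assumed record layout in the topic (inferred from the split indices used below):
    // each Kafka value is a tab-separated string of
    //   current_day \t ts \t user_id \t page_id \t channel \t action
    // e.g. "2024-01-01\t1704067200\tuser_001\tpage_001\tapp\tclick" (sample values are hypothetical)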
    // 3. Read the source stream from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      // The Structured Streaming Kafka source sets its starting position with startingOffsets;
      // the plain consumer options auto.offset.reset / group.id are not used here because Spark
      // manages its own consumer group and offsets via the checkpoint
      .option("startingOffsets", "latest")
      .option("subscribe", topic)
      .load()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)].toDF("id", "data")
    // Parse the tab-separated payload; ts (index 1) is extracted but not written to the table
    val transDF: DataFrame = resDF.withColumn("current_day", split(col("data"), "\t")(0))
      .withColumn("ts", split(col("data"), "\t")(1))
      .withColumn("user_id", split(col("data"), "\t")(2))
      .withColumn("page_id", split(col("data"), "\t")(3))
      .withColumn("channel", split(col("data"), "\t")(4))
      .withColumn("action", split(col("data"), "\t")(5))
      .select("current_day", "user_id", "page_id", "channel", "action")
    // For debugging, the results can be printed to the console instead
    // (the default trigger runs each micro-batch as soon as it can):
    // val query: StreamingQuery = transDF.writeStream
    //   .outputMode("append")
    //   .format("console")
    //   .start()
    // 4. Stream the results into the Iceberg table
    val query: StreamingQuery = transDF.writeStream
      .format("iceberg")
      .outputMode("append")
      // Trigger.ProcessingTime(1, TimeUnit.MINUTES) would fire once per minute;
      // here a micro-batch is triggered every 10 seconds
      .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
      .option("path", "hadoop_prod.iceberg_db.iceberg_table")
      // fanout-enabled lets Iceberg write to a partitioned table without pre-sorting rows by partition
      .option("fanout-enabled", "true")
      .option("checkpointLocation", checkpointPath)
      .start()
    query.awaitTermination()
  }
}
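
// Verification sketch (not part of the original listing): once the stream is running, the table
// can be read back in a separate batch job to confirm rows are arriving. Both reads use standard
// Spark/Iceberg APIs; the object name VerifyIcebergTable is an assumption made for illustration.
object VerifyIcebergTable {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local").appName("VerifyIcebergTable")
      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")
      .getOrCreate()

    // Catalog-qualified SQL read
    spark.sql("select * from hadoop_prod.iceberg_db.iceberg_table").show()
    // Equivalent DataFrame read
    spark.read.format("iceberg").load("hadoop_prod.iceberg_db.iceberg_table").show()
  }
}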