
Example: Saving Spark Streaming Output to an HDFS Directory

Source: 99网

Spark Streaming code:

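package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HDFSWordCount {
  def main(args: Array[String]): Unit = {
    // args(0) is the output path prefix; fail early if it is missing
    if (args.length < 1) {
      System.err.println("Usage: HDFSWordCount <output-prefix>")
      System.exit(1)
    }
    // The master is supplied by spark-submit; for a local test add .setMaster("local[2]")
    val sparkConf = new SparkConf().setAppName("HDFSWordCount")
    // Create the streaming context with a 2-second batch interval
    val scc = new StreamingContext(sparkConf, Seconds(2))

    // Read newline-delimited text from a TCP socket on host "master", port 9999
    val lines = scc.socketTextStream("master", 9999)
    val words = lines.flatMap(_.split(" "))
    // Count each word within the current batch
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    // Write every batch to its own directory named "<prefix>-<batch time in ms>"
    wordCounts.saveAsObjectFiles(args(0))
    scc.start()
    scc.awaitTermination()
  }
}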
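
saveAsObjectFiles stores each batch as a SequenceFile of serialized objects, so the output is not human-readable text. The following is a minimal sketch of a separate batch job that reads the saved pairs back with SparkContext.objectFile and sums the counts per word. The object name ReadWordCounts, the helper totalCounts, and the use of args(0) as a glob over the batch directories are assumptions made for illustration; they are not part of the original job.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ReadWordCounts {
  // Reads every batch directory matching `pattern` and sums the counts per word.
  // The element type must match what the streaming job saved: (String, Int) pairs.
  def totalCounts(sc: SparkContext, pattern: String): Map[String, Int] =
    sc.objectFile[(String, Int)](pattern)
      .reduceByKey(_ + _)
      .collect()
      .toMap

  def main(args: Array[String]): Unit = {
    // args(0): a glob over the batch directories (hypothetical usage),
    // for example "<prefix>-*" for the prefix given to HDFSWordCount
    val sc = new SparkContext(new SparkConf().setAppName("ReadWordCounts"))
    try totalCounts(sc, args(0)).foreach(println)
    finally sc.stop()
  }
}
```

If plain-text output is preferred, DStream also provides saveAsTextFiles, which takes the same prefix argument and writes each element as a line of text.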

Package with Maven:

mvn clean assembly:assembly

After uploading the jar to the cluster, create the script run_hdfs20.sh:

cd $SPARK_HOME
./bin/spark-submit \
        --class streaming.HDFSWordCount \
        --master yarn \
        --deploy-mode cluster \
        --files $HIVE_HOME/conf/hive-site.xml \
        /usr/local/src/badou_code/streaming/badou_spark_20_test-1.0-SNAPSHOT-jar-with-dependencies.jar \
        hdfs://master:9000/output/log

Run the script: sh -x run_hdfs20.sh

Start a listener on port 9999 on the host master with nc -lp 9999, then type arbitrary letters and digits into it.
Result:

-------------------------------------------
Time: 1612670866000 ms
-------------------------------------------
(,1)
(a,4)

-------------------------------------------
Time: 1612670868000 ms
-------------------------------------------
(aa,1)
(a,4)
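
The (,1) entry in the first batch is a count for the empty string. It most likely comes from an empty line or from consecutive spaces in the input, because split(" ") keeps such empty tokens. The plain-Scala sketch below reproduces the effect outside Spark and shows a variant that drops empty tokens; the object Tokenize and its helper names are hypothetical and only serve the illustration.

```scala
object Tokenize {
  // Same split as the streaming job: an empty line or a double space yields "" tokens
  def rawTokens(line: String): Seq[String] = line.split(" ").toSeq

  // Variant that discards empty tokens before counting
  def words(line: String): Seq[String] = line.split(" ").filter(_.nonEmpty).toSeq

  // Count occurrences of each token
  def count(tokens: Seq[String]): Map[String, Int] =
    tokens.groupBy(identity).map { case (w, ws) => w -> ws.size }

  def main(args: Array[String]): Unit = {
    // Assumed input: two lines "a a" with an empty line in between
    val lines = Seq("a a", "", "a a")
    println(count(lines.flatMap(rawTokens))) // includes an entry for ""
    println(count(lines.flatMap(words)))     // only real words
  }
}
```

In the streaming job the same idea would be a filter on the DStream, for example lines.flatMap(_.split(" ")).filter(_.nonEmpty).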

Check the output in HDFS: hadoop fs -ls /output/

drwxr-xr-x   - root supergroup          0 2021-02-06 19:58 /output/log-1612670296000
drwxr-xr-x   - root supergroup          0 2021-02-06 19:58 /output/log-1612670298000
drwxr-xr-x   - root supergroup          0 2021-02-06 19:58 /output/log-1612670300000
drwxr-xr-x   - root supergroup          0 2021-02-06 19:58 /output/log-1612670302000
drwxr-xr-x   - root supergroup          0 2021-02-06 19:58 /output/log-1612670304000
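
Each directory name is the prefix passed to saveAsObjectFiles followed by the batch time in epoch milliseconds, which is why consecutive directories are 2000 ms apart, matching the Seconds(2) batch interval. The sketch below, in plain Scala with hypothetical helper names, builds such a name and converts the suffix back into a readable UTC instant.

```scala
import java.time.Instant

object BatchDirs {
  // saveAsObjectFiles names each batch directory "<prefix>-<batch time in ms>"
  def dirName(prefix: String, batchTimeMs: Long): String = s"$prefix-$batchTimeMs"

  // Epoch-millisecond suffix after the last '-' in the directory name
  def batchMillis(dir: String): Long =
    dir.substring(dir.lastIndexOf('-') + 1).toLong

  // The same suffix as a UTC instant
  def batchTime(dir: String): Instant = Instant.ofEpochMilli(batchMillis(dir))

  def main(args: Array[String]): Unit = {
    // Two directory names taken from the listing above
    val dirs = Seq("/output/log-1612670296000", "/output/log-1612670298000")
    dirs.foreach(d => println(s"$d -> ${batchTime(d)}"))
    println(s"gap in ms: ${batchMillis(dirs(1)) - batchMillis(dirs(0))}")
  }
}
```

The first directory corresponds to 2021-02-07T03:58:16Z. That is consistent with the 19:58 on 2021-02-06 shown in the listing if the cluster's local time zone is UTC-8, which is an inference and not something the listing states.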
