public class HDFSWordCount {
/** Base HDFS directory under which the monitored input subdirectory lives. */
private static final String BASE = "hdfs://hadoop0:9000/data/xx/yy/zz/";

/**
 * Entry point: starts a Spark Streaming job that watches an HDFS directory
 * for newly arriving text files and prints each 10-second micro-batch to stdout.
 *
 * <p>Runs until externally stopped; {@code awaitTermination()} blocks forever.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Quiet Spark/Jetty log chatter so the batch output stays readable.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);

    // NOTE(review): master is hard-coded to local[2]; a --master flag passed to
    // spark-submit would be overridden by setMaster — confirm this is intended.
    SparkConf conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("HDFSWordCount");

    // 10-second micro-batch interval.
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    try {
        // Create an input DStream over the HDFS directory; textFileStream()
        // picks up text files newly placed under that directory.
        JavaDStream<String> callLines = jssc.textFileStream(BASE + "oidd_call/");
        callLines.print();

        jssc.start();
        jssc.awaitTermination();
    } finally {
        // Release the streaming context (and underlying SparkContext) even if
        // start() fails or awaitTermination() is interrupted.
        jssc.close();
    }
}