1 添加依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
2 核心代码
package cn.taobao.ka.consumer;
import cn.taobao.flume.pollxiaoxi;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import java.util.*;public class spark_direct_xiaofei {public static void main(String[] args) throws Exception {//StreamingContext 编程入口JavaStreamingContext jssc = new JavaStreamingContext("local[*]","kafka_direct_1",Durations.seconds(1),System.getenv("SPARK_HOME"),JavaStreamingContext.jarOfClass(pollxiaoxi.class.getClass()));jssc.sparkContext().setLogLevel("ERROR");//kafka主题和kafka连接地址Set<String> topicsSet = new HashSet<>(Arrays.asList("zhuti_1"));Map<String, String> kafkaParams = new HashMap<>();kafkaParams.put("metadata.broker.list", "kncloud02:6667,kncloud03:6667,kncloud04:6667");//开始接收kafka数据JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc,String.class,String.class,StringDecoder.class,StringDecoder.class,kafkaParams,topicsSet);//数据处理messages.print();//显式的启动数据接收jssc.start();//来等待计算完成jssc.awaitTermination();}
}