Spark Streaming入门 - 从kafka读取数据,基于Direct模式 [主流,生产环境就使用这种模式]

news/2025/5/31 10:12:57

1 添加依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.2.0</version>
</dependency>

2 核心代码

package cn.taobao.ka.consumer;
import cn.taobao.flume.pollxiaoxi;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import java.util.*;public class spark_direct_xiaofei {public static void main(String[] args) throws Exception {//StreamingContext 编程入口JavaStreamingContext jssc = new JavaStreamingContext("local[*]","kafka_direct_1",Durations.seconds(1),System.getenv("SPARK_HOME"),JavaStreamingContext.jarOfClass(pollxiaoxi.class.getClass()));jssc.sparkContext().setLogLevel("ERROR");//kafka主题和kafka连接地址Set<String> topicsSet = new HashSet<>(Arrays.asList("zhuti_1"));Map<String, String> kafkaParams = new HashMap<>();kafkaParams.put("metadata.broker.list", "kncloud02:6667,kncloud03:6667,kncloud04:6667");//开始接收kafka数据JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc,String.class,String.class,StringDecoder.class,StringDecoder.class,kafkaParams,topicsSet);//数据处理messages.print();//显式的启动数据接收jssc.start();//来等待计算完成jssc.awaitTermination();}
}

 

文章来源:https://blog.csdn.net/qq_41712271/article/details/116605417
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:https://dhexx.cn/news/show-17347.html

相关文章

android 工具类之SharePreference

1 /** 2 * SharedPreferences的一个工具类&#xff0c;调用setParam就能保存String, Integer, Boolean, Float, Long类型的参数 3 * 同样调用getParam就能获取到保存在手机里面的数据 4 * author xiaanming 5 * 6 */ 7 public class SharedPreferencesUtils { 8 /**…

SwipeRefreshLayout控件

SwipeRefreshLayout组件只接受一个子组件&#xff1a;即需要刷新的那个组件。它使用一个侦听机制来通知拥有该组件的监听器有刷新事件发生&#xff0c;换句话说我们的Activity必须实现通知的接口。该Activity负责处理事件刷新和刷新相应的视图。一旦监听者接收到该事件&#xf…

pyspark报错解决 “py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.isEncryptionEnabled“

报错 py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.isEncryptionEnabled does not exist in the JVM解决办法&#xff0c;添加如下前两行代码&#xff0c;放在操作spark之前 #添加此代码 import findspark findspark.init() #添加此代码from pyspark im…

【python】多线程编程

1. 线程基础 1.1. 线程状态 线程有5种状态&#xff0c;状态转换的过程如下图所示&#xff1a; 1.2. 线程同步&#xff08;锁&#xff09; 多 线程的优势在于可以同时运行多个任务&#xff08;至少感觉起来是这样&#xff09;。但是当线程需要共享数据时&#xff0c;可能存在数据…

Python for循环前有函数

参考博文https://blog.csdn.net/weixin_44551646/article/details/109549874示例1 def add(a):return a1def main():list [1,2,3,4,5]xs [add(a) for a in list]print(xs)if __name__ __main__:main()Ouput&#xff1a;[2, 3, 4, 5, 6] 先执行for a in list&#xff0c;将…

Linux开机自动挂载存储的两种方式

登录服务器&#xff0c;给查看了下&#xff0c;发现确实是没有自动加载&#xff0c;df -h只能显示本地硬盘的分区&#xff0c;fdisk -l 还是能看到存储空间&#xff0c;这说明这个服务器连接存储是木有问题的。 输入history | grep mount&#xff0c;查看所有mount记录&#xf…

Burpsuite详细教程

Burpsuite是一种功能强大的Web应用程序安全测试工具。它提供了许多有用的功能和工具&#xff0c;可以帮助用户分析和评估Web应用程序的安全性。在本教程中&#xff0c;我们将介绍如何安装、配置和使用Burpsuite&#xff0c;并提供一些常用的命令。 第一步&#xff1a;安装Burp…

Java中List和ArrayList的区别

List是一个接口&#xff0c;而ListArray是一个类。 ListArray继承并实现了List。 所以List不能被构造&#xff0c;但可以向上面那样为List创建一个引用&#xff0c;而ListArray就可以被构造。 List list; //正确 listnull; List listnew List(); // 是错误的用法 Li…

pyspark 本地远程连接hive

#配置本地spark的环境&#xff0c;必须放在最前面 import findspark findspark.init() print(findspark.find())from pyspark.sql import SparkSession#新建sparksession sparksession SparkSession.builder.master("local[*]").appName("hive_test_1") \…

要后台控制前台的的CSS样式,我们可以加入ASP.NET Literal 控件

ASP.NET Literal 控件&#xff0c;用于在页面上显示文本。此文本是可编程的。 我用它来制作了 if(usertype1){this.LtdMemberPromotion7.Text "<style type\"text/css\">#tdMemberPromotion7 {display:none;}</style>";} 这样一段代码&#x…