flink自动加全局流水

news/2024/5/21 6:44:49

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction

import re
import redis


# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# 读取文件,创建 DataStream 对象
data_stream = env.read_text_file('/root/pyflink/elink_20230603')
# 对每行数据添加字符串 'aaaa'
class MyMapFunction(MapFunction):
   def open(self, runtime_context: RuntimeContext):
     self.r = redis.Redis(host='127.0.0.1', port=6379)

   def map(self,line):
    process_id='';
    bus_seq=''
    if not line.startswith("ES"):
        return
    if '<Serial>' in line:
       pat=re.compile(r"<Serial>(\d+)</Serial>")
       bus_seq=pat.findall(line)
       process_id=line.split()[1]
       self.r.set(process_id,bus_seq[0])
    process_id=line.split()[1]
    if not len(process_id)==6 :
        process_id=line.split()[2]
     
    bus_seq=self.r.get(process_id) 
    if not bus_seq:
        return
    #self.r.delete(process_id)
    return(bus_seq.decode('UTF-8')+'->'+line)
new_stream = data_stream.map(MyMapFunction()).set_parallelism(1)

# 输出到控制台
new_stream.print()

# 执行任务
env.execute('Add "aaaa" to each line')


https://dhexx.cn/news/show-4628158.html

相关文章

CXGRid实现拖动鼠标多选

要实现在CXGrid中拖动鼠标多选&#xff0c;您可以按住鼠标左键并拖动鼠标&#xff0c;直到选择了要选择的单元格或行。您可以在拖动过程中按住Shift键来限制选择范围。拖动选择的单元格或行时&#xff0c;您可以按住Ctrl键来添加或删除单元格或行的选择。当您完成选择时&#x…

GTK官方教程

前言&#xff1a; 让你在开发中爱不释手的 GT 包。关注GSLS官网&#xff0c;查看更多源码 ヾ(✿&#xff9f;▽&#xff9f;)ノ工具包。 所有文章 小编尽量让读者可以 直接 读懂 与 完全 复制粘贴&#xff0c;其中复杂或较多 的源码 会有 源码 并 贴上 github 网址。 GT 类 里面…

【JUC基础】14. ThreadLocal

目录 1、前言 2、什么是ThreadLocal 3、ThreadLocal作用 4、ThradLocal基本使用 4.1、创建和初始化 4.2、存储和获取线程变量 4.3、清理和释放线程变量 4.4、小结 4.5、示例代码 5、ThreadLocal原理 5.1、set() 5.2、get() 5.3、变量清理 5.4、ThreadLocalMap 6、…

springboot+vue+elementui计算机专业课程选课管理系统vue

本系统的主要任务就是负责对学生选课。主要用户为老师、学生,其中,学生可对自己的信息进行查询,可以进行选课,也可以进行删除已选课程,教师可对学生和课程的信息进行查询&#xff0c;教师拥有所有的权限,可以添加删除学生信息。系统提供界面,操作简单。 为实现这些功能,系统一个…

弄清楚Node.js的功能特性、运行时、组成和发展趋势分析以及与JavaScript的区别

目录 Node.js介绍 Node.js的作用和功能特性 Node.js应用场景 运行时是什么 Node.js的组成 1. V8引擎 2. 本地&#xff08;核心&#xff09;模块 3. 标准库 Node.js的发展趋势 总结 Node.js是一个基于Chrome V8引擎的JavaScript运行时环境&#xff0c;使JavaScript可以…

图论试题2021

25 A&#xff1a;最大度是7&#xff0c;大于了顶点数6&#xff0c;故不是简单图的度序列。 C&#xff1a;树的度序列至少要有两个度为1的顶点 D&#xff1a;只要度数为奇数的个数有偶数个&#xff0c;就是度序列。 A&#xff1a;每棵树的中心由一个点或两个相邻点组成 B&…

Vue+springboot舞蹈基础课程视频学习分享平台的实现和开发

基于java语言设计并实现了舞蹈基础数据平台。该系统基于B/S即所谓浏览器/服务器模式&#xff0c;应用Springboot框架&#xff0c;选择MySQL作为后台数据库。系统主要包括首页、个人中心、用户管理、舞蹈类型管理、舞蹈视频管理、用户留言、管理员管理、系统管理等功能模块。 重…

平面图学习

空调管道的设计&#xff1a;某景区有6个景点&#xff0c;位置分布如下图。 分析者认为&#xff1a;(1) A1与A4&#xff0c; (2) A2与A5&#xff0c; (3) A3与A6间人流较少&#xff0c;其它景点之间人流量大&#xff0c;必须投资铺设空调管道&#xff0c;但要求空调管道间不能交…

2.项目中的文件

项目的路径是这样的 目录 1 pages 1.1 json 1.2 wxml 1.3 wxss 1.4 js 2 utils 3 .eslintrc.js 4 app.js 5 app.json 6 app.wxss 7 project.config.json 8 project.private.config.json 9 sitemap.json 1 pages pages 用来存放所有小程序的页面&am…

随机数发生器设计(三)

随机数发生器设计&#xff08;三&#xff09;- 熵估计和健康测试 熵估计健康测试 熵估计 考虑都熵源的多样性&#xff0c;建立一个通用的熵估计模型比较困难。本文采用nist.sp.800-90B推荐的Markov评估。详见 https://doi.org/10.6028/NIST.SP.800-90B。 执行Markov评估时&am…