flink自动加全局流水

news/2024/9/20 12:17:15

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction

import re
import redis


# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# 读取文件,创建 DataStream 对象
data_stream = env.read_text_file('/root/pyflink/elink_20230603')
# 对每行数据添加字符串 'aaaa'
class MyMapFunction(MapFunction):
   def open(self, runtime_context: RuntimeContext):
     self.r = redis.Redis(host='127.0.0.1', port=6379)

   def map(self,line):
    process_id='';
    bus_seq=''
    if not line.startswith("ES"):
        return
    if '<Serial>' in line:
       pat=re.compile(r"<Serial>(\d+)</Serial>")
       bus_seq=pat.findall(line)
       process_id=line.split()[1]
       self.r.set(process_id,bus_seq[0])
    process_id=line.split()[1]
    if not len(process_id)==6 :
        process_id=line.split()[2]
     
    bus_seq=self.r.get(process_id) 
    if not bus_seq:
        return
    #self.r.delete(process_id)
    return(bus_seq.decode('UTF-8')+'->'+line)
new_stream = data_stream.map(MyMapFunction()).set_parallelism(1)

# 输出到控制台
new_stream.print()

# 执行任务
env.execute('Add "aaaa" to each line')


https://dhexx.cn/news/show-4628158.html

相关文章

CXGRid实现拖动鼠标多选

要实现在CXGrid中拖动鼠标多选&#xff0c;您可以按住鼠标左键并拖动鼠标&#xff0c;直到选择了要选择的单元格或行。您可以在拖动过程中按住Shift键来限制选择范围。拖动选择的单元格或行时&#xff0c;您可以按住Ctrl键来添加或删除单元格或行的选择。当您完成选择时&#x…

GTK官方教程

前言&#xff1a; 让你在开发中爱不释手的 GT 包。关注GSLS官网&#xff0c;查看更多源码 ヾ(✿&#xff9f;▽&#xff9f;)ノ工具包。 所有文章 小编尽量让读者可以 直接 读懂 与 完全 复制粘贴&#xff0c;其中复杂或较多 的源码 会有 源码 并 贴上 github 网址。 GT 类 里面…

【JUC基础】14. ThreadLocal

目录 1、前言 2、什么是ThreadLocal 3、ThreadLocal作用 4、ThradLocal基本使用 4.1、创建和初始化 4.2、存储和获取线程变量 4.3、清理和释放线程变量 4.4、小结 4.5、示例代码 5、ThreadLocal原理 5.1、set() 5.2、get() 5.3、变量清理 5.4、ThreadLocalMap 6、…

springboot+vue+elementui计算机专业课程选课管理系统vue

本系统的主要任务就是负责对学生选课。主要用户为老师、学生,其中,学生可对自己的信息进行查询,可以进行选课,也可以进行删除已选课程,教师可对学生和课程的信息进行查询&#xff0c;教师拥有所有的权限,可以添加删除学生信息。系统提供界面,操作简单。 为实现这些功能,系统一个…

弄清楚Node.js的功能特性、运行时、组成和发展趋势分析以及与JavaScript的区别

目录 Node.js介绍 Node.js的作用和功能特性 Node.js应用场景 运行时是什么 Node.js的组成 1. V8引擎 2. 本地&#xff08;核心&#xff09;模块 3. 标准库 Node.js的发展趋势 总结 Node.js是一个基于Chrome V8引擎的JavaScript运行时环境&#xff0c;使JavaScript可以…

截取文本中间部分字符串算法思想

功能实现效果排除首位部分字符串 确定部分字符串的尾部&#xff0c;获取传入尾部字符串索引起始位置坐标 确定部分字符串的首部&#xff0c;获取传入首部字符串索引位置坐标与首部字符串长度之和 利用字符串截取函数获取即可 public String getSubString(String text, Strin…

图论试题2021

25 A&#xff1a;最大度是7&#xff0c;大于了顶点数6&#xff0c;故不是简单图的度序列。 C&#xff1a;树的度序列至少要有两个度为1的顶点 D&#xff1a;只要度数为奇数的个数有偶数个&#xff0c;就是度序列。 A&#xff1a;每棵树的中心由一个点或两个相邻点组成 B&…

Spring-Cloud-Gateway 整合 Sa-Token 全局过滤器之路由匹配

Spring-Cloud-Gateway 整合 Sa-Token 全局过滤器之路由匹配 Sa-Token 是一个轻量级 Java 权限认证框架&#xff0c;主要解决&#xff1a;登录认证、权限认证、单点登录、OAuth2.0、分布式Session会话、微服务网关鉴权 等一系列权限相关问题。 Sa-Token 旨在以简单、优雅的方式完…

Vue+springboot舞蹈基础课程视频学习分享平台的实现和开发

基于java语言设计并实现了舞蹈基础数据平台。该系统基于B/S即所谓浏览器/服务器模式&#xff0c;应用Springboot框架&#xff0c;选择MySQL作为后台数据库。系统主要包括首页、个人中心、用户管理、舞蹈类型管理、舞蹈视频管理、用户留言、管理员管理、系统管理等功能模块。 重…

平面图学习

空调管道的设计&#xff1a;某景区有6个景点&#xff0c;位置分布如下图。 分析者认为&#xff1a;(1) A1与A4&#xff0c; (2) A2与A5&#xff0c; (3) A3与A6间人流较少&#xff0c;其它景点之间人流量大&#xff0c;必须投资铺设空调管道&#xff0c;但要求空调管道间不能交…