[case46] A look at _maxTransactionActive in Storm Trident spouts

2024/10/3 19:53:53

This article examines the _maxTransactionActive setting of Storm Trident spouts.

MasterBatchCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java

TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
    for(String spoutId: _managedSpoutIds) {
        _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
    }
    _currTransaction = getStoredCurrTransaction();
    _collector = collector;
    Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
    if(active==null) {
        _maxTransactionActive = 1;
    } else {
        _maxTransactionActive = active.intValue();
    }
    _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);

    for(int i=0; i<_spouts.size(); i++) {
        String txId = _managedSpoutIds.get(i);
        _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
    }
    LOG.debug("Opened {}", this);
}

public void nextTuple() {
    sync();
}

private void sync() {
    // note that sometimes the tuples active may be less than max_spout_pending, e.g.
    // max_spout_pending = 3
    // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
    // and there won't be a batch for tx 4 because there's max_spout_pending tx active
    TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
    if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
        maybeCommit.status = AttemptStatus.COMMITTING;
        _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
        LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
    }

    if(_active) {
        if(_activeTx.size() < _maxTransactionActive) {
            Long curr = _currTransaction;
            for(int i=0; i<_maxTransactionActive; i++) {
                if(!_activeTx.containsKey(curr) && isReady(curr)) {
                    // by using a monotonically increasing attempt id, downstream tasks
                    // can be memory efficient by clearing out state for old attempts
                    // as soon as they see a higher attempt id for a transaction

                    Integer attemptId = _attemptIds.get(curr);
                    if(attemptId==null) {
                        attemptId = 0;
                    } else {
                        attemptId++;
                    }
                    _attemptIds.put(curr, attemptId);
                    for(TransactionalState state: _states) {
                        state.setData(CURRENT_ATTEMPTS, _attemptIds);
                    }

                    TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                    final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                    _activeTx.put(curr, newTransactionStatus);
                    _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                    LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                    _throttler.markEvent();
                }
                curr = nextTransactionId(curr);
            }
        }
    }
}

private static class TransactionStatus {
    TransactionAttempt attempt;
    AttemptStatus status;

    public TransactionStatus(TransactionAttempt attempt) {
        this.attempt = attempt;
        this.status = AttemptStatus.PROCESSING;
    }

    @Override
    public String toString() {
        return attempt.toString() + " <" + status.toString() + ">";
    }
}

private static enum AttemptStatus {
    PROCESSING,
    PROCESSED,
    COMMITTING
}
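  • MasterBatchCoordinator sets _maxTransactionActive in its open method from Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending). This value is null in the default configuration file, and when it is null, _maxTransactionActive is set to 1.
  • nextTuple calls sync(), which caps how many batches are in flight at once. A new batch is emitted only while _activeTx.size() < _maxTransactionActive. Further batches must wait until a batch in _activeTx has been committed or has failed.
  • _activeTx is a TreeMap keyed by transactionId. Each value is a TransactionStatus, which holds a TransactionAttempt and an AttemptStatus. AttemptStatus has three states: PROCESSING, PROCESSED and COMMITTING.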
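The following self-contained simulation makes the window behaviour of sync() concrete. It is a minimal sketch of a hypothetical model, not Storm code. MaxActiveWindowDemo and its methods are invented for illustration. The model has no throttler, no isReady() check, no attempt-id matching and no persisted state. The configuration is a plain map read under the key topology.max.spout.pending; in a real topology that value is normally set with Config.setMaxSpoutPending. The simulation replays the scenario from the comment in sync(): with max_spout_pending = 3, acking tx 2 before tx 1 neither commits tx 2 nor starts tx 4.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of MasterBatchCoordinator's batch window (not Storm code):
// no throttler, no isReady(), no attempt ids and no ZooKeeper state.
class MaxActiveWindowDemo {
    enum AttemptStatus { PROCESSING, PROCESSED, COMMITTING }

    final TreeMap<Long, AttemptStatus> activeTx = new TreeMap<>();
    final List<String> emitted = new ArrayList<>(); // log of "batch:N" / "commit:N" events
    final int maxTransactionActive;
    long currTransaction = 1;

    MaxActiveWindowDemo(Map<String, Object> conf) {
        // Mirrors open(): a missing topology.max.spout.pending falls back to 1.
        Number active = (Number) conf.get("topology.max.spout.pending");
        this.maxTransactionActive = active == null ? 1 : active.intValue();
    }

    void sync() {
        // Only the oldest uncommitted transaction may start committing.
        if (activeTx.get(currTransaction) == AttemptStatus.PROCESSED) {
            activeTx.put(currTransaction, AttemptStatus.COMMITTING);
            emitted.add("commit:" + currTransaction);
        }
        // Start new batches only inside the window [curr, curr + max).
        if (activeTx.size() < maxTransactionActive) {
            long curr = currTransaction;
            for (int i = 0; i < maxTransactionActive; i++) {
                if (!activeTx.containsKey(curr)) {
                    activeTx.put(curr, AttemptStatus.PROCESSING);
                    emitted.add("batch:" + curr);
                }
                curr++;
            }
        }
    }

    // Simplified ack(): PROCESSING -> PROCESSED; an acked commit frees the slot
    // and slides the window forward.
    void ack(long txid) {
        AttemptStatus s = activeTx.get(txid);
        if (s == AttemptStatus.PROCESSING) {
            activeTx.put(txid, AttemptStatus.PROCESSED);
        } else if (s == AttemptStatus.COMMITTING) {
            activeTx.remove(txid);
            currTransaction = txid + 1;
        }
        sync();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.max.spout.pending", 3);
        MaxActiveWindowDemo m = new MaxActiveWindowDemo(conf);
        m.sync(); // batches 1, 2, 3
        m.ack(2); // tx 2 processed, but tx 1 is not: no commit, no batch 4
        m.ack(1); // tx 1 processed -> commit of tx 1
        m.ack(1); // commit of tx 1 acked -> commit of tx 2, then batch 4
        System.out.println(m.emitted);
    }
}
```

In this model batch 4 appears only after the commit of tx 1 has been acked. Commits also stay in txid order even though tx 2 finished processing first.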

TridentSpoutCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java

RotatingTransactionalState _state;

public void prepare(Map conf, TopologyContext context) {
    _coord = _spout.getCoordinator(_id, conf, context);
    _underlyingState = TransactionalState.newCoordinatorState(conf, _id);
    _state = new RotatingTransactionalState(_underlyingState, META_DIR);
}

public void execute(Tuple tuple, BasicOutputCollector collector) {
    TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);

    if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
        _state.cleanupBefore(attempt.getTransactionId());
        _coord.success(attempt.getTransactionId());
    } else {
        long txid = attempt.getTransactionId();
        Object prevMeta = _state.getPreviousState(txid);
        Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
        _state.overrideState(txid, meta);
        collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
    }
}
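  • TridentSpoutCoordinator's execute method stores and retrieves the batch metadata (meta) by txid, then emits Values(attempt, meta) on the batch stream to the downstream TridentBoltExecutor. On the success stream it instead cleans up the metadata older than that txid and calls _coord.success.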
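The per-txid bookkeeping in execute can be sketched without Storm. This is a minimal sketch under stated assumptions. It assumes a hypothetical coordinator whose metadata is a [start, end) offset range of fixed size (BATCH_SIZE = 100 is arbitrary). An in-memory TreeMap replaces RotatingTransactionalState. TxMetaDemo, onBatch and onSuccess are illustrative names only. The sketch shows why keying by txid matters: with this hypothetical coordinator, a replayed txid gets its stored range back, and a success for txid N discards the older entries.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, in-memory analogue of how TridentSpoutCoordinator uses
// RotatingTransactionalState: metadata keyed by txid, no ZooKeeper persistence.
class TxMetaDemo {
    static final long BATCH_SIZE = 100; // assumption: fixed-size offset ranges
    private final TreeMap<Long, long[]> state = new TreeMap<>();

    // Like getPreviousState(txid): metadata of the closest earlier txid, or null.
    long[] getPreviousState(long txid) {
        Map.Entry<Long, long[]> e = state.lowerEntry(txid);
        return e == null ? null : e.getValue();
    }

    // Hypothetical initializeTransaction: reuse stored meta on replay,
    // otherwise continue where the previous batch ended.
    long[] initializeTransaction(long txid, long[] prevMeta, long[] currMeta) {
        if (currMeta != null) {
            return currMeta;
        }
        long start = prevMeta == null ? 0 : prevMeta[1];
        return new long[] {start, start + BATCH_SIZE};
    }

    // Mirrors the batch branch of execute(): look up, compute, overrideState.
    long[] onBatch(long txid) {
        long[] meta = initializeTransaction(txid, getPreviousState(txid), state.get(txid));
        state.put(txid, meta);
        return meta;
    }

    // Mirrors the success branch: cleanupBefore(txid) drops older entries.
    void onSuccess(long txid) {
        state.headMap(txid).clear();
    }

    int size() {
        return state.size();
    }

    public static void main(String[] args) {
        TxMetaDemo c = new TxMetaDemo();
        long[] m1 = c.onBatch(1);    // [0, 100)
        long[] m2 = c.onBatch(2);    // [100, 200)
        long[] again = c.onBatch(2); // replay of tx 2 reuses [100, 200)
        c.onSuccess(2);              // metadata of tx 1 is removed
        System.out.println(m1[0] + "-" + m1[1] + " " + m2[0] + "-" + m2[1]
                + " " + again[0] + "-" + again[1] + " size=" + c.size());
    }
}
```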

TridentBoltExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

RotatingMap<Object, TrackedBatch> _batches;

public void execute(Tuple tuple) {
    if(TupleUtils.isTick(tuple)) {
        long now = System.currentTimeMillis();
        if(now - _lastRotate > _messageTimeoutMs) {
            _batches.rotate();
            _lastRotate = now;
        }
        return;
    }
    String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
    if(batchGroup==null) {
        // this is so we can do things like have simple DRPC that doesn't need to use batch processing
        _coordCollector.setCurrBatch(null);
        _bolt.execute(null, tuple);
        _collector.ack(tuple);
        return;
    }
    IBatchID id = (IBatchID) tuple.getValue(0);
    //get transaction id
    //if it already exists and attempt id is greater than the attempt there

    TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());

    // this code here ensures that only one attempt is ever tracked for a batch, so when
    // failures happen you don't get an explosion in memory usage in the tasks
    if(tracked!=null) {
        if(id.getAttemptId() > tracked.attemptId) {
            _batches.remove(id.getId());
            tracked = null;
        } else if(id.getAttemptId() < tracked.attemptId) {
            // no reason to try to execute a previous attempt than we've already seen
            return;
        }
    }

    if(tracked==null) {
        tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
        _batches.put(id.getId(), tracked);
    }
    _coordCollector.setCurrBatch(tracked);

    TupleType t = getTupleType(tuple, tracked);
    if(t==TupleType.COMMIT) {
        tracked.receivedCommit = true;
        checkFinish(tracked, tuple, t);
    } else if(t==TupleType.COORD) {
        int count = tuple.getInteger(1);
        tracked.reportedTasks++;
        tracked.expectedTupleCount+=count;
        checkFinish(tracked, tuple, t);
    } else {
        tracked.receivedTuples++;
        boolean success = true;
        try {
            _bolt.execute(tracked.info, tuple);
            if(tracked.condition.expectedTaskReports==0) {
                success = finishBatch(tracked, tuple);
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
        }
        if(success) {
            _collector.ack(tuple);
        } else {
            _collector.fail(tuple);
        }
    }
    _coordCollector.setCurrBatch(null);
}

public static class TrackedBatch {
    int attemptId;
    BatchInfo info;
    CoordCondition condition;
    int reportedTasks = 0;
    int expectedTupleCount = 0;
    int receivedTuples = 0;
    Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
    boolean failed = false;
    boolean receivedCommit;
    Tuple delayedAck = null;

    public TrackedBatch(BatchInfo info, CoordCondition condition, int attemptId) {
        this.info = info;
        this.condition = condition;
        this.attemptId = attemptId;
        receivedCommit = condition.commitStream == null;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}

private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
    if(tracked.failed) {
        failBatch(tracked);
        _collector.fail(tuple);
        return;
    }
    CoordCondition cond = tracked.condition;
    boolean delayed = tracked.delayedAck==null &&
                          (cond.commitStream!=null && type==TupleType.COMMIT
                           || cond.commitStream==null);
    if(delayed) {
        tracked.delayedAck = tuple;
    }
    boolean failed = false;
    if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
        if(tracked.receivedTuples == tracked.expectedTupleCount) {
            finishBatch(tracked, tuple);
        } else {
            //TODO: add logging that not all tuples were received
            failBatch(tracked);
            _collector.fail(tuple);
            failed = true;
        }
    }

    if(!delayed && !failed) {
        _collector.ack(tuple);
    }
}

private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
    boolean success = true;
    try {
        _bolt.finishBatch(tracked.info);
        String stream = COORD_STREAM(tracked.info.batchGroup);
        for(Integer task: tracked.condition.targetTasks) {
            _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
        }
        if(tracked.delayedAck!=null) {
            _collector.ack(tracked.delayedAck);
            tracked.delayedAck = null;
        }
    } catch(FailedException e) {
        failBatch(tracked, e);
        success = false;
    }
    _batches.remove(tracked.info.batchId.getId());
    return success;
}

private void failBatch(TrackedBatch tracked, FailedException e) {
    if(e!=null && e instanceof ReportedFailedException) {
        _collector.reportError(e);
    }
    tracked.failed = true;
    if(tracked.delayedAck!=null) {
        _collector.fail(tracked.delayedAck);
        tracked.delayedAck = null;
    }
}
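  • TridentBoltExecutor stores per-batch information in a RotatingMap (_batches), keyed by txid, with a TrackedBatch as the value.
  • When it calls _bolt.execute(tracked.info, tuple), it passes a BatchInfo whose state field was obtained from _bolt.initBatchState(batchGroup, id). initBatchState is called only when the TrackedBatch is created. That happens when _batches has no entry for the txid yet, or when an entry from an older attempt has just been discarded.
  • checkFinish likewise bases its decision on the batch's own TrackedBatch. finishBatch calls _bolt.finishBatch(tracked.info) and passes the BatchInfo along, and failBatch also operates on the batch's own TrackedBatch.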
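The attempt handling in TridentBoltExecutor.execute can be modelled in a few lines. The rules are: keep only the newest attempt per txid, ignore tuples from older attempts, and run initBatchState only when a TrackedBatch is created. This is a simplified sketch. A HashMap stands in for the RotatingMap, there is no tick-based rotation, and BatchTrackingDemo, track and execute are hypothetical names. A List<String> plays the role of BatchInfo.state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of TridentBoltExecutor's per-txid tracking (a HashMap replaces RotatingMap).
class BatchTrackingDemo {
    static class TrackedBatch {
        final int attemptId;
        final List<String> state = new ArrayList<>(); // stands in for BatchInfo.state
        TrackedBatch(int attemptId) { this.attemptId = attemptId; }
    }

    final Map<Long, TrackedBatch> batches = new HashMap<>();
    int initBatchStateCalls = 0;

    // Returns the TrackedBatch a tuple should be applied to, or null if the tuple is stale.
    TrackedBatch track(long txid, int attemptId) {
        TrackedBatch tracked = batches.get(txid);
        if (tracked != null) {
            if (attemptId > tracked.attemptId) {
                batches.remove(txid);   // newer attempt: drop the old attempt's state
                tracked = null;
            } else if (attemptId < tracked.attemptId) {
                return null;            // older attempt: ignore the tuple
            }
        }
        if (tracked == null) {
            initBatchStateCalls++;      // where _bolt.initBatchState(...) would run
            tracked = new TrackedBatch(attemptId);
            batches.put(txid, tracked);
        }
        return tracked;
    }

    void execute(long txid, int attemptId, String value) {
        TrackedBatch t = track(txid, attemptId);
        if (t != null) {
            t.state.add(value);
        }
    }

    public static void main(String[] args) {
        BatchTrackingDemo d = new BatchTrackingDemo();
        d.execute(1, 0, "a"); // tx 1, attempt 0
        d.execute(2, 0, "x"); // tx 2 in flight at the same time, separate state
        d.execute(1, 1, "b"); // replay of tx 1: attempt 0 state is discarded
        d.execute(1, 0, "c"); // late tuple from attempt 0: ignored
        System.out.println(d.batches.get(1L).state + " " + d.batches.get(2L).state
                + " inits=" + d.initBatchStateCalls);
    }
}
```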

BatchInfo

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/BatchInfo.java

public class BatchInfo {
    public IBatchID batchId;
    public Object state;
    public String batchGroup;

    public BatchInfo(String batchGroup, IBatchID batchId, Object state) {
        this.batchGroup = batchGroup;
        this.batchId = batchId;
        this.state = state;
    }
}
  • BatchInfo holds the batchId, the per-batch state and the batchGroup.

TridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java

public Object initBatchState(String batchGroup, Object batchId) {
    return null;
}

public void execute(BatchInfo info, Tuple input) {
    // there won't be a BatchInfo for the success stream
    TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
    if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
        if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
            ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
            _activeBatches.remove(attempt.getTransactionId());
        } else {
            throw new FailedException("Received commit for different transaction attempt");
        }
    } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
        // valid to delete before what's been committed since
        // those batches will never be accessed again
        _activeBatches.headMap(attempt.getTransactionId()).clear();
        _emitter.success(attempt);
    } else {
        _collector.setBatch(info.batchId);
        _emitter.emitBatch(attempt, input.getValue(1), _collector);
        _activeBatches.put(attempt.getTransactionId(), attempt);
    }
}

public void finishBatch(BatchInfo batchInfo) {
}
  • TridentSpoutExecutor's execute method also keeps each batch's information apart by txid, in _activeBatches.
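Below is a simplified model of the _activeBatches bookkeeping in TridentSpoutExecutor.execute. It is a sketch, not Storm's code. Attempt is a hypothetical stand-in for TransactionAttempt, with value equality on txid and attempt id. The emitter calls are omitted. IllegalStateException replaces Storm's FailedException so the example has no Storm dependency. The model shows three behaviours: a commit is accepted only for the attempt that was actually emitted, a replay overwrites the entry for its txid, and a success for txid N drops all older entries.

```java
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical model of TridentSpoutExecutor's _activeBatches handling.
class SpoutExecutorDemo {
    // Stand-in for TransactionAttempt: equality on (txid, attemptId).
    static final class Attempt {
        final long txid;
        final int attemptId;
        Attempt(long txid, int attemptId) { this.txid = txid; this.attemptId = attemptId; }
        @Override public boolean equals(Object o) {
            return o instanceof Attempt && ((Attempt) o).txid == txid
                    && ((Attempt) o).attemptId == attemptId;
        }
        @Override public int hashCode() { return Objects.hash(txid, attemptId); }
    }

    final TreeMap<Long, Attempt> activeBatches = new TreeMap<>();

    // Batch stream: the batch would be emitted here, then the attempt is remembered.
    void onBatch(Attempt a) {
        activeBatches.put(a.txid, a);
    }

    // Commit stream: only the attempt that was emitted may be committed.
    void onCommit(Attempt a) {
        if (a.equals(activeBatches.get(a.txid))) {
            activeBatches.remove(a.txid);
        } else {
            throw new IllegalStateException("Received commit for different transaction attempt");
        }
    }

    // Success stream: everything older than this txid will never be accessed again.
    void onSuccess(Attempt a) {
        activeBatches.headMap(a.txid).clear();
    }

    public static void main(String[] args) {
        SpoutExecutorDemo e = new SpoutExecutorDemo();
        e.onBatch(new Attempt(1, 0));
        e.onBatch(new Attempt(2, 0));
        e.onBatch(new Attempt(1, 1)); // tx 1 replayed: attempt 1 replaces attempt 0
        try {
            e.onCommit(new Attempt(1, 0)); // stale attempt is rejected
        } catch (IllegalStateException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
        e.onCommit(new Attempt(1, 1));
        System.out.println(e.activeBatches.keySet());
    }
}
```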

SubtopologyBolt

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

public Object initBatchState(String batchGroup, Object batchId) {
    ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
    for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
        p.startBatch(ret);
    }
    return ret;
}

public void execute(BatchInfo batchInfo, Tuple tuple) {
    String sourceStream = tuple.getSourceStreamId();
    InitialReceiver ir = _roots.get(sourceStream);
    if(ir==null) {
        throw new RuntimeException("Received unexpected tuple " + tuple.toString());
    }
    ir.receive((ProcessorContext) batchInfo.state, tuple);
}

public void finishBatch(BatchInfo batchInfo) {
    for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
        p.finishBatch((ProcessorContext) batchInfo.state);
    }
}

protected static class InitialReceiver {
    List<TridentProcessor> _receivers = new ArrayList<>();
    RootFactory _factory;
    ProjectionFactory _project;
    String _stream;

    public InitialReceiver(String stream, Fields allFields) {
        // TODO: don't want to project for non-batch bolts...???
        // how to distinguish "batch" streams from non-batch streams?
        _stream = stream;
        _factory = new RootFactory(allFields);
        List<String> projected = new ArrayList<>(allFields.toList());
        projected.remove(0);
        _project = new ProjectionFactory(_factory, new Fields(projected));
    }

    public void receive(ProcessorContext context, Tuple tuple) {
        TridentTuple t = _project.create(_factory.create(tuple));
        for(TridentProcessor r: _receivers) {
            r.execute(context, _stream, t);
        }
    }

    public void addReceiver(TridentProcessor p) {
        _receivers.add(p);
    }

    public Factory getOutputFactory() {
        return _project;
    }
}
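  • When SubtopologyBolt.initBatchState creates a ProcessorContext, it tags the context with the batchId and gives it a fresh state array. Batches processed in parallel therefore each get their own, separate ProcessorContext.
  • execute passes the batch's own ProcessorContext (batchInfo.state) to InitialReceiver.receive. That method in turn calls each TridentProcessor's execute with the same per-batch context.
  • finishBatch works the same way: it passes (ProcessorContext) batchInfo.state to each TridentProcessor's finishBatch.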
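A small, dependency-free sketch shows the effect of a per-batch ProcessorContext. ProcessorContext, Processor and CountProcessor here are simplified hypothetical stand-ins for the Trident classes of similar names. A HashMap keyed by batch id plays the role of TridentBoltExecutor's _batches and BatchInfo.state. Tuples from two batches arrive interleaved, as they can when max spout pending is greater than 1. Each count still stays separate, because the processor keeps its running value in the context and not in one of its own fields.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of SubtopologyBolt's per-batch ProcessorContext (not Storm classes).
class PerBatchContextDemo {
    static class ProcessorContext {
        final Object batchId;
        final Object[] state;
        ProcessorContext(Object batchId, int nodes) { this.batchId = batchId; this.state = new Object[nodes]; }
    }

    interface Processor {
        void startBatch(ProcessorContext ctx);
        void execute(ProcessorContext ctx, String tuple);
        void finishBatch(ProcessorContext ctx);
    }

    // Counts tuples; the running count lives in ctx.state[index], never in a field.
    static class CountProcessor implements Processor {
        final int index;
        final List<String> results = new ArrayList<>();
        CountProcessor(int index) { this.index = index; }
        public void startBatch(ProcessorContext ctx) { ctx.state[index] = 0; }
        public void execute(ProcessorContext ctx, String tuple) { ctx.state[index] = (Integer) ctx.state[index] + 1; }
        public void finishBatch(ProcessorContext ctx) { results.add(ctx.batchId + "=" + ctx.state[index]); }
    }

    final CountProcessor counter = new CountProcessor(0);
    final Map<Object, ProcessorContext> contexts = new HashMap<>(); // stands in for BatchInfo.state

    // Like initBatchState: one new context per batch, then startBatch on each processor.
    ProcessorContext initBatchState(Object batchId) {
        ProcessorContext ctx = new ProcessorContext(batchId, 1);
        counter.startBatch(ctx);
        return ctx;
    }

    void execute(Object batchId, String tuple) {
        ProcessorContext ctx = contexts.computeIfAbsent(batchId, this::initBatchState);
        counter.execute(ctx, tuple);
    }

    void finishBatch(Object batchId) {
        counter.finishBatch(contexts.remove(batchId));
    }

    public static void main(String[] args) {
        PerBatchContextDemo bolt = new PerBatchContextDemo();
        // Tuples of tx1 and tx2 arrive interleaved.
        bolt.execute("tx1", "a");
        bolt.execute("tx2", "b");
        bolt.execute("tx1", "c");
        bolt.execute("tx2", "d");
        bolt.execute("tx1", "e");
        bolt.finishBatch("tx2");
        bolt.finishBatch("tx1");
        System.out.println(bolt.counter.results);
    }
}
```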

AggregateProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AggregateProcessor.java

public void startBatch(ProcessorContext processorContext) {
    _collector.setContext(processorContext);
    processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector);
}

public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    _collector.setContext(processorContext);
    _agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector);
}

public void finishBatch(ProcessorContext processorContext) {
    _collector.setContext(processorContext);
    _agg.complete(processorContext.state[_context.getStateIndex()], _collector);
}
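  • AggregateProcessor's startBatch, execute and finishBatch all read and write the aggregator's value in ProcessorContext.state, at index _context.getStateIndex(). SubtopologyBolt passes in a per-batch ProcessorContext, so this value is kept separately for each batch.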
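The AggregateProcessor pattern can be mirrored without Storm. In Trident, the Aggregator interface has init(Object batchId, TridentCollector), aggregate(T val, TridentTuple tuple, TridentCollector) and complete(T val, TridentCollector). The sketch below keeps that shape but makes two assumptions to stay self-contained: a Consumer<Object> replaces TridentCollector and a long replaces TridentTuple. AggregateDemo and Sum are hypothetical names. The Sum aggregator itself holds no per-batch state; the value lives in ctx.state[stateIndex]. As a result, two batches can be aggregated at the same time by the same processor instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, dependency-free mirror of the AggregateProcessor pattern.
class AggregateDemo {
    static class ProcessorContext {
        final Object batchId;
        final Object[] state;
        ProcessorContext(Object batchId, int n) { this.batchId = batchId; this.state = new Object[n]; }
    }

    // Shaped like Trident's Aggregator, with simplified collector and tuple types.
    interface Aggregator<T> {
        T init(Object batchId, Consumer<Object> out);
        void aggregate(T val, long input, Consumer<Object> out);
        void complete(T val, Consumer<Object> out);
    }

    // Stateless aggregator: the running sum is the long[] returned by init.
    static class Sum implements Aggregator<long[]> {
        public long[] init(Object batchId, Consumer<Object> out) { return new long[] {0}; }
        public void aggregate(long[] val, long input, Consumer<Object> out) { val[0] += input; }
        public void complete(long[] val, Consumer<Object> out) { out.accept(val[0]); }
    }

    static class AggregateProcessor<T> {
        final Aggregator<T> agg;
        final int stateIndex;
        final Consumer<Object> out;
        AggregateProcessor(Aggregator<T> agg, int stateIndex, Consumer<Object> out) {
            this.agg = agg; this.stateIndex = stateIndex; this.out = out;
        }
        void startBatch(ProcessorContext ctx) { ctx.state[stateIndex] = agg.init(ctx.batchId, out); }
        @SuppressWarnings("unchecked")
        void execute(ProcessorContext ctx, long input) { agg.aggregate((T) ctx.state[stateIndex], input, out); }
        @SuppressWarnings("unchecked")
        void finishBatch(ProcessorContext ctx) { agg.complete((T) ctx.state[stateIndex], out); }
    }

    public static void main(String[] args) {
        List<Object> emitted = new ArrayList<>();
        AggregateProcessor<long[]> sum = new AggregateProcessor<>(new Sum(), 0, emitted::add);
        ProcessorContext b1 = new ProcessorContext("tx1", 1);
        ProcessorContext b2 = new ProcessorContext("tx2", 1);
        sum.startBatch(b1);
        sum.startBatch(b2);
        sum.execute(b1, 5);
        sum.execute(b2, 7);
        sum.execute(b1, 10);
        sum.finishBatch(b1);
        sum.finishBatch(b2);
        System.out.println(emitted);
    }
}
```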

EachProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/EachProcessor.java

public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
    List<Factory> parents = tridentContext.getParentTupleFactories();
    if(parents.size()!=1) {
        throw new RuntimeException("Each operation can only have one parent");
    }
    _context = tridentContext;
    _collector = new AppendCollector(tridentContext);
    _projection = new ProjectionFactory(parents.get(0), _inputFields);
    _function.prepare(conf, new TridentOperationContext(context, _projection));
}

public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    _collector.setContext(processorContext, tuple);
    _function.execute(_projection.create(tuple), _collector);
}

public void startBatch(ProcessorContext processorContext) {
}

public void finishBatch(ProcessorContext processorContext) {
}
  • EachProcessor sets the ProcessorContext, together with the current tuple, on _collector and then passes _collector to _function.execute. Here _collector is an AppendCollector.

AppendCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AppendCollector.java

public class AppendCollector implements TridentCollector {
    OperationOutputFactory _factory;
    TridentContext _triContext;
    TridentTuple tuple;
    ProcessorContext context;

    public AppendCollector(TridentContext context) {
        _triContext = context;
        _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields());
    }

    public void setContext(ProcessorContext pc, TridentTuple t) {
        this.context = pc;
        this.tuple = t;
    }

    @Override
    public void emit(List<Object> values) {
        TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values);
        for(TupleReceiver r: _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    }

    public Factory getOutputFactory() {
        return _factory;
    }
}
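  • When _function.execute emits through the AppendCollector, the AppendCollector hands each emitted tuple to the TupleReceivers. It passes along the context set by EachProcessor, that is, the current batch's own ProcessorContext. A TupleReceiver's execute method may read or write that ProcessorContext, and this access is also scoped to the batch. For example, AggregateProcessor stores its aggregation result in its own batch's processorContext.state.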
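The hand-off from EachProcessor to AppendCollector can be sketched as follows. All classes are simplified hypothetical stand-ins. Tuples are plain List<Object> values, and splitFunction is an invented example function that emits one tuple per word. The collector forwards every emitted tuple to the receivers. Each forwarded tuple carries the input fields followed by the function's output, plus the ProcessorContext of the batch that produced it. A downstream receiver therefore updates only that batch's state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of EachProcessor + AppendCollector (not Storm classes).
class AppendCollectorDemo {
    static class ProcessorContext {
        final Object batchId;
        final Object[] state;
        ProcessorContext(Object batchId, int n) { this.batchId = batchId; this.state = new Object[n]; }
    }

    interface TupleReceiver { void execute(ProcessorContext ctx, List<Object> tuple); }

    static class AppendCollector {
        final List<TupleReceiver> receivers = new ArrayList<>();
        ProcessorContext context;
        List<Object> tuple;

        void setContext(ProcessorContext pc, List<Object> t) { context = pc; tuple = t; }

        // "Append": the output tuple is the input fields followed by the emitted values.
        void emit(List<Object> values) {
            List<Object> toEmit = new ArrayList<>(tuple);
            toEmit.addAll(values);
            for (TupleReceiver r : receivers) {
                r.execute(context, toEmit); // same per-batch context as the input tuple
            }
        }
    }

    // Invented example function: one output tuple per word of the field at position 0.
    static void splitFunction(List<Object> input, AppendCollector collector) {
        for (String w : ((String) input.get(0)).split(" ")) {
            List<Object> out = new ArrayList<>();
            out.add(w);
            collector.emit(out);
        }
    }

    // Like EachProcessor.execute: set the batch's context, then run the function.
    static void eachExecute(ProcessorContext ctx, List<Object> tuple, AppendCollector c) {
        c.setContext(ctx, tuple);
        splitFunction(tuple, c);
    }

    public static void main(String[] args) {
        AppendCollector collector = new AppendCollector();
        // Downstream receiver keeps a per-batch word count in ctx.state[0].
        collector.receivers.add((ctx, t) -> ctx.state[0] = (Integer) ctx.state[0] + 1);
        ProcessorContext b1 = new ProcessorContext("tx1", 1);
        ProcessorContext b2 = new ProcessorContext("tx2", 1);
        b1.state[0] = 0;
        b2.state[0] = 0;
        List<Object> s1 = new ArrayList<>();
        s1.add("a b c");
        List<Object> s2 = new ArrayList<>();
        s2.add("d e");
        eachExecute(b1, s1, collector);
        eachExecute(b2, s2, collector);
        System.out.println(b1.state[0] + " " + b2.state[0]);
    }
}
```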

Summary

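  • Storm Trident uses [id, count] tuples to tell the downstream TridentBoltExecutor that a batch has ended. On receiving one, TridentBoltExecutor first checks whether tracked.reportedTasks equals cond.expectedTaskReports. This check aggregates the reports from all upstream tasks when the upstream TridentBoltExecutor's parallelism is greater than 1. When the batch group has a commit stream, it also requires that the commit tuple has arrived. Only then does it check whether tracked.receivedTuples equals tracked.expectedTupleCount. If the counts match it calls finishBatch; otherwise the batch fails.
  • The _maxTransactionActive parameter of a Storm Trident spout is set from Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending). This value is null in the default configuration file, and when it is null, _maxTransactionActive is 1.
  • MasterBatchCoordinator limits how many batches are in flight at once. At most _maxTransactionActive transactions are in _activeTx, and a new batch starts only after one of them has been committed or has failed. When several transactions are active concurrently, the downstream TridentBoltExecutor still keeps the batches apart, so they do not interfere with each other. For example, SubtopologyBolt.initBatchState creates a ProcessorContext tagged with the batchId, so batches running in parallel each get their own ProcessorContext. Some of the TridentProcessors invoked by SubtopologyBolt store results in that context; AggregateProcessor, for instance, keeps its aggregation result in its own batch's processorContext.state.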
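The completion rule in the first summary point can be expressed as a small state machine. This is a hypothetical simplification of TridentBoltExecutor.checkFinish. It omits delayed acks, the direct emits to downstream tasks, and the expectedTaskReports == 0 shortcut. FinishCheckDemo and its methods are illustrative names; the field names follow TrackedBatch and CoordCondition.

```java
// Hypothetical, simplified model of the batch-completion rule in TridentBoltExecutor.checkFinish.
class FinishCheckDemo {
    enum Result { WAITING, FINISHED, FAILED }

    final int expectedTaskReports; // upstream tasks that must each send an [id, count] tuple
    boolean receivedCommit;        // true from the start when there is no commit stream
    int reportedTasks = 0;
    int expectedTupleCount = 0;
    int receivedTuples = 0;

    FinishCheckDemo(int expectedTaskReports, boolean hasCommitStream) {
        this.expectedTaskReports = expectedTaskReports;
        this.receivedCommit = !hasCommitStream;
    }

    void onDataTuple() {
        receivedTuples++;
    }

    // An [id, count] coordination tuple from one upstream task.
    Result onCoord(int count) {
        reportedTasks++;
        expectedTupleCount += count;
        return check();
    }

    Result onCommit() {
        receivedCommit = true;
        return check();
    }

    // All reports (and the commit, if any) are in: finish only if every tuple arrived.
    Result check() {
        if (receivedCommit && reportedTasks == expectedTaskReports) {
            return receivedTuples == expectedTupleCount ? Result.FINISHED : Result.FAILED;
        }
        return Result.WAITING;
    }

    public static void main(String[] args) {
        FinishCheckDemo b = new FinishCheckDemo(2, false); // upstream parallelism 2, no commit stream
        b.onDataTuple();
        b.onDataTuple();
        b.onDataTuple();
        System.out.println(b.onCoord(2)); // only one upstream task has reported so far
        System.out.println(b.onCoord(1)); // both reported, and 3 of 3 tuples were seen
    }
}
```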
