xxjob代碼執行過程
文章目錄
- 簡介
- xxl-job定時任務的種類
- xxl-job相關的數據表
- 調度服務
- JobScheduleHelper.start().run任務執行列表獲取
- 執行任務
- 執行器
- 執行方法
- 參考資料
簡介
-
調度中心:
負責管理調度信息,按照調度配置發出調度請求,自身不承擔業務代碼。調度系統與任務解耦,提高了系統可用性和穩定性,同時調度系統性能不再受限于任務模塊;
支持可視化、簡單且動態的管理調度信息,包括任務新建,更新,刪除,GLUE開發和任務報警等,所有上述操作都會實時生效,同時支持監控調度結果以及執行日志,支持執行器Failover,支持創建執行器等功能。 -
執行模塊(執行器):
負責接收調度請求并執行任務邏輯。任務模塊專注于任務的執行等操作,開發和維護更加簡單和高效;接收“調度中心”的執行請求、終止請求和日志請求等。
特性
xxl-job的特性有很多,官網上有詳細的介紹,這里我會介紹幾個重要的特性:
-
不分片:使用數據庫鎖在集群模式下同一時刻只有一個調度中心處理任務調度
-
簡單:支持通過Web頁面對任務進行CRUD操作,操作簡單,一分鐘上手;
動態:支持動態修改任務狀態、啟動/停止任務,以及終止運行中的任務,都是即時生效的。 -
調度中心HA(中心式):調度采用中心式設計,“調度中心”自研調度組件并支持集群部署,可保證調度中心HA;
-
執行器HA(分布式):任務分布式執行,任務”執行器”支持集群部署,可保證任務執行HA;
-
調度過期策略:調度中心錯過調度時間的補償處理策略:包括:忽略,立即補償觸發一次等;
-
阻塞處理策略:調度過于密集執行器來不及處理時的處理策略,策略包括:單機串行(默認)、丟棄后續調度、覆蓋之前的調用。
-
任務超時控制:支持自定義任務超時時間,任務運行超時將會主動中斷任務;
最大的特點就是不分片,相對其他分布式任務組件,這個組件是最簡單解決方式同時又能保證調度任務的一致性
xxl-job定時任務的種類
xxll0job支持java、groovy、腳本(Shell、Python、PHP、NodeJs、PowerShell)的定時任務
xxl-job相關的數據表
xxl-job將任務信息以及日志信息持久化到數據表中,這個就保證了可以動態的添加刪除任務。
xxl_job_lock:任務調度鎖表,在線程查詢任務信息時會調用上鎖。
xxl_job_group:執行器信息表,維護任務執行器信息;
xxl_job_info:調度擴展信息表: 用于保存XXL-JOB調度任務的擴展信息,如任務分組、任務名、機器地址、執行器、執行入參和報警郵件等等;
xxl_job_log:調度日志表: 用于保存XXL-JOB任務調度的歷史信息,如調度結果、執行結果、調度入參、調度機器和執行器等等;
xxl_job_log_report:調度日志報表:用戶存儲XXL-JOB任務調度日志的報表,調度中心報表功能頁面會用到;
xxl_job_logglue:任務GLUE日志:用于保存GLUE更新歷史,用于支持GLUE的版本回溯功能;
xxl_job_registry:執行器注冊表,維護在線的執行器和調度中心機器地址信息;
xxl_job_user:系統用戶表;
調度服務
- xxl-job集群部署時,如何避免多個服務器同時調度任務?
- 定時任務是如何實現的?
JobScheduleHelper.start().run任務執行列表獲取
JobScheduleHelper.start().run()使用while()循環不斷的查詢數據庫中<5秒的任務列表
public class JobScheduleHelper {public static final long PRE_READ_MS = 5000; // pre readprivate Thread scheduleThread;private Thread ringThread;private volatile boolean scheduleThreadToStop = false;private volatile boolean ringThreadToStop = false;//時間輪private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();public void start(){//掃描任務列表線程scheduleThread = new Thread(new Runnable() {@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>> init xxl-job admin scheduler success.");//循環掃描任務:重點部分while (!scheduleThreadToStop) {try {conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit = conn.getAutoCommit();//關閉自動提交conn.setAutoCommit(false);//獲取數據庫鎖preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// 1、pre readlong nowTime = System.currentTimeMillis();//關鍵部分,預讀出 當前時間+5s(PRE_READ_MS=5s) 的所有觸發任務的時間小于這個時間預讀出時間的任務List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {// 2、push time-ring// time-ring jump//遍歷所有任務,過濾出所有過期任務//過期任務的判斷方式 當前時間>任務下一次觸發時間+空閑間隔周期5s (在這段時間內任務還沒觸發的話說明任務超時過期了,既在5s的誤差內還沒出發任務),并按照策略執行if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {// 2.1、trigger-expire > 5s:pass && make next-trigger-timelogger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());//2.1-1、misfire match 查詢當前任務的過期策略MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW策略則過期立即執行一次JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );}// 2.1-2、fresh 刷新下次執行時間到數據庫中refreshNextValidTime(jobInfo, new Date());//否則不是過期任務 任務下一次觸發時間+空閑間隔周期5s>當前時間>下一次任務觸發時間 (在允許的誤差之內都不算過期任務)} else if (nowTime > jobInfo.getTriggerNextTime()) {// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time// 2.2-1、triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );// 2.1-2、fresh 刷新下次執行時間到數據庫中refreshNextValidTime(jobInfo, new Date());// 任務狀態有效&&執行時間在5秒以內的任務放入時間環中if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {// 1.獲取剩余秒數int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、把剩余秒數作為key放入時間輪中pushTimeRing(ringSecond, jobInfo.getId());// 3、刷新任務的下次執行時間refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {// 2.3 即將促發的任務放入時間輪中5秒鐘// 1、make ring secondint ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh nextrefreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、update trigger infofor (XxlJobInfo jobInfo: scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}} else {preReadSuc = false;}// tx stop} catch (Exception e) {if (!scheduleThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);}} finally {// 最后釋放數據庫鎖,重置自動提交,關閉連接if (conn != null) {try {conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}// close PreparedStatement}long cost = System.currentTimeMillis()-start;}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");}});scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");scheduleThread.start();//執行任務線程:從時間輪中獲取任務并執行ringThread = new Thread(new Runnable() {@Overridepublic void run() {//休息到下一秒=一秒-當前毫秒=剩下的毫秒數:只是啟動時執行一次,以確保while是整秒執行try {TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );} catch (InterruptedException e) {if (!ringThreadToStop) {logger.error(e.getMessage(), e);}}//輪詢時間輪while (!ringThreadToStop) {try {synchronized (ringData){if(ringData.isEmpty()){ringData.wait();}}// 處理前2秒的數據List<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND); //當前秒數 // 避免處理耗時太長,跨過刻度,向前校驗一個刻度=移除前一個刻度的k-v值并jobId放入執行列表for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);}}// 促發執行任務logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );if (ringItemData.size() > 0) {// do triggerfor (int jobId: ringItemData) {// 執行任務JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);}// 清空需要執行任務的列表ringItemData.clear();}} catch (Exception e) {if (!ringThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);}}//以上內容執行完可能還沒到達下一秒鐘整點時間,那就休息到到達下一秒整點時的毫秒數時間try {TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);} catch (InterruptedException e) {if (!ringThreadToStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");}});ringThread.setDaemon(true);ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");ringThread.start();}//start()方法結束}
任務調度器工作流程:
1.任務線程scheduleThread通過數據庫的for update行鎖來保證多個調度中心集群在同一時間內只有一個調度中心在調度任務
2.周期性的遍歷所有的jobInfo這個表,讀取觸發時間小于nowtime+5s這個時間之前的所有任務,然后進行引入以下觸發機制判斷
三種觸發任務機制:
1)過了執行時間5S多還沒執行的nowtime-TriggerNextTime()>PRE_READ_MS(5s) 既超過有誤差5S外的,則查看當前任務的失效調度策略,若為立即重試一次,則立即觸發調度任務,且觸發類型為misfire
2)過了執行時間5S內還沒執行的nowtime-TriggerNextTime()<PRE_READ_MS(5s) 既沒有超過有效誤差5S內,則立即調度調度任務
3)執行時間即將來5S內(因為他列表查的就是將來5S內的)還沒執行的nowtime<TriggerNextTime() 則說明這個任務馬上就要觸發了,放到一個時間輪上(https://blog.csdn.net/zalu9810/article/details/113396131),
3.隨后將快要觸發的任務放到時間輪上(Map<int,List>結構實現固定60個key),時間輪由key(將要觸發的時間s),value(在當前觸發s的所有任務id集合),然后更新這個任務的下一次觸發時間到數據庫中
4.第二個線程ringThread對這個時間輪的任務遍歷,周期在1s之內周期的掃描這個時間輪,然后執行調度任務
原文鏈接:https://blog.csdn.net/s6056826a/article/details/113446126
xxl-job通過mysql悲觀鎖實現分布式鎖,從而避免多個服務器同時調度任務:
- 通過setAutoCommit(false),關閉自動提交
- 通過select lock for update語句,其他事務無法獲取到鎖,顯示排她鎖。
- 最后在finally塊中commit()提交事務釋放for update的排他鎖,并且setAutoCommit
執行任務
在上面的調度任務中最終調用的是執行器執行任務方法,具體代碼如下
//在調度代碼中我們看到:執行任務
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);//JobTriggerPoolHelper.trigger方法最終調在線程池中調用的是XxlJobTrigger.trigger
JobTriggerPoolHelper.trigger(){
ThreadPoolExecutor triggerPool_ = fastTriggerPool;triggerPool_.execute(new Runnable() {//構建參數后執行 XxlJobTrigger.triggerXxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);}
}//XxlJobTrigger.trigger
XxlJobTrigger.trigger(){// load data,加載任務信息XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (executorParam != null) {jobInfo.setExecutorParam(executorParam);}processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}//核心邏輯在processTrigger中
//代碼位置: com.xxl.job.admin.core.trigger.XxlJobTrigger#processTrigger
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){//初始化trigger-paramTriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());
//執行路由策略:index\分片\路由策略\獲取任務地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
//運行執行器triggerResult = runExecutor(triggerParam, address);//日志處理,代碼省略
}//代碼位置:com.xxl.job.admin.core.trigger.XxlJobTrigger#runExecutor
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ReturnT<String> runResult = null;try {//獲取執行器:通過地址從executorBizRepository(map)獲取ExecutorBizExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);//執行任務并返回結果:executorBiz是抽象類,根據具體類型執行本地任務/執行器任務runResult = executorBiz.run(triggerParam);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}//返回執行結果StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");runResultSB.append("<br>address:").append(address);runResultSB.append("<br>code:").append(runResult.getCode());runResultSB.append("<br>msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());return runResult;
}
ExecutorBiz接口有兩個實現,分別是ExecutorBizClient(執行器客戶端)、ExecutorBizImpl(執行器服務端),
-
ExecutorBizClien類就是客戶端操作遠程服務的任務
-
ExecutorBizImpl就是服務端操作本地任務
ExecutorBiz接口有beat(心跳檢測)、idleBeat(空閑檢測)、run(執行任務)、kill(停止任務)、log(打印日志)這些方法。
我們看看ExecutorBizClien的run方法:
//代碼位置:com.xxl.job.core.biz.client.ExecutorBizClient#run
public ReturnT<String> run(TriggerParam triggerParam) {
//調用http的POST請求發送觸發參數觸發服務端的任務執行,然后將結果返回給客戶端。請求的地址為addressUrl + "run",當客戶端發送請求以后,ExecutorBizImpl的run方法將會接收請求處理,然后將處理的結果返回return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
客戶端EmbedServer類的內部類EmbedHttpServerHandler的process方法會調用ExecutorBizImpl類的run方法通過Netty Http調用執行器的EmbedServer的process方法
執行器
根據提供的樣例,可以發布為單獨的服務,和調度器分開部署。
執行器啟動時,會初始化一個EmbedServer類,該類的start方法會啟動netty服務器。netty服務器會接收客戶端發送過來的http請求,當接收到觸發請求(請求路徑是/run)會交給EmbedServer類的process方法處理,process方法將會調用ExecutorBizImpl的run方法處理客戶端發送的觸發請求,process方法接收觸發請求的代碼如下圖所示:
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq){if(){//....
}else if ("/run".equals(uri)) {TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);}}
ExecutorBizImpl的run方法處理流程大致如下:
- 加載任務處理器與任務執行線程,校驗任務處理器與任務執行線程。
- 執行阻塞策略
- 注冊任務
- 保存觸發參數到緩存
run方法首先根據任務id從緩存jobThreadRepository(map)中獲取任務執行線程jobThread,任務執行線程jobThread保存著任務處理器jobHandler,然后進行校驗任務執行線程以及任務處理器。
接下來檢驗任務執行線程以及任務處理器,就是按照Java、groovy、腳本分別進行校驗
xxl-job有三種阻塞策略,分別為SERIAL_EXECUTION(并行)、DISCARD_LATER(丟棄)、COVER_EARLY(覆蓋之前的)。當阻塞策略為丟棄,則判斷該執行線程是否正在執行,如果是則直接返回結果,不再往下執行任務了。當阻塞策略為覆蓋之前的,則判斷執行線程是否正在執行,如果是則殺掉原來的執行線程。如果阻塞策略是并行,則不做什么
注冊任務線程:如果任務線程等于null,注冊任務線程并啟動線程。registJobThread方法首先新建一個任務線程,并調用newJobThread的start方法啟動任務線程。然后加入jobThreadRepository進行緩存,當舊的oldJobThread不等于null,則停止掉舊的任務線程。
在服務端執行的流程中,將任務交給任務線程池JobThread執行,JobThread的run方法主要做了幾件事:
- 處理器的初始化
- 任務的執行
- 銷毀清理工作
while()循環從triggerQueue隊列中彈出觸發參數,如果存在執行超時時間并大于0,則在規定的時間異步調用handler的execute方法執行任務,否則立即調用handler的execute方法執行任務。
while()循環如果任務停止了,需要將隊列中所有的觸發刪除(所有定時任務刪除)
執行方法
定時任務執行過程調用處理器的init(初始化方法)、execute(執行方法)、destroy(銷毀)方法,這些方法是由IJobHandler抽象類的實現類實現的:
IJobHandler抽象類有三個子類,
-
GlueJobHandler是執行groovy的處理器,MethodJobHandler是執行java的處理器
-
ScriptJobHandler是執行腳本(Pyhotn、PHP、NodeJS、Shell等)的處理器。
-
MethodJobHandler是處理java定時任務的方法,當我們用java開發了定時任務方法,然后用@XxlJob注解修飾方法,就可以調度該定時任務方法了
我們看看MethodJobHandler的execute方法是如何執行定時任務方法的。
//代碼位置:com.xxl.job.core.handler.impl.MethodJobHandler#execute
public void execute() throws Exception {Class<?>[] paramTypes = method.getParameterTypes();if (paramTypes.length > 0) {method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types} else {method.invoke(target);}
}
MethodJobHandler的execute方法利用反射,獲取定時任務的method,然后利用invoke執行定時任務方法。
GlueJobHandler是執行groovy的處理器,在admin界面的idea界面上寫好groovy保存在數據庫,會調用GlueJobHandler類的execute方法執行,groovy是一種基于JVM的開發語言,groovy 代碼能夠與 Java 代碼很好地結合,也能用于擴展現有代碼。GlueFactory類的loadNewInstance方法將寫好的groovy加載解析為寫好的groovy代碼,并返回IJobHandler,然后將IJobHandler傳進GlueJobHandler構造器中新建GlueJobHandler對象。
GlueJobHandler的execute方法就是調用GlueFactory工廠類創建的IJobHandler的execute方法。
ScriptJobHandler的execute方法有些不重要的代碼被省略。主要有幾個重要的流程:獲取腳本執行命令、保存腳本命令到文件、執行腳本。
ScriptUtil的markScriptFile方法,將腳本定時任務代碼保存在名字為jobId_glueUpdatetime_suffix中,jobId為任務id,glueUpdatetime為腳本更新時間、suffix為腳本后綴,如采用python寫定時任務時,保存在類似666-123456789.py文件中。
執行腳本ScriptUtil的execToFile方法,execToFile方法是利用Runtime.getRuntime().exec()方法在java程序里運行腳本程序。Runtime.getRuntime().exec()方法會將執行命令發送給操作系統,然后等待操作系統運行程序的結果返回,如Runtime.getRuntime().exec()方法給操作系統發送pyhton hello.py命令,這樣就會執行python腳本,并等待python腳本的運行結果的返回
這里直接參考:xxl-job定時任務執行流程分析-任務執行
https://zhuanlan.zhihu.com/p/438464389
參考資料
xxl-job源碼(一)服務端客戶端簡單理解 - QiaoZhi - 博客園
萬字長文簡單明了的介紹xxl-job以及quartz 萬字長文簡單明了的介紹xxl-job以及quartz_wx613dbd09b1332的技術博客_51CTO博客
本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處:https://dhexx.cn/hk/4627869.html
如若內容造成侵權/違法違規/事實不符,請聯系我的編程經驗分享網進行投訴反饋,一經查實,立即刪除!