更新時間:2023年06月21日18時41分 來源:傳智教育 瀏覽次數(shù):
延遲任務(wù)有固定周期有明確出發(fā)時間,而延遲隊列沒有固定的開始時間它常常是由一個事件觸發(fā)的,而在這個事件觸發(fā)之后的一段時間內(nèi)觸發(fā)另一個事件,任務(wù)可以立即執(zhí)行,也可以延遲。
延遲任務(wù)的應(yīng)用場景:
場景一:訂單下單之后30分鐘后,如果用戶沒有付錢,則系統(tǒng)自動取消訂單;如果期間下單成功,任務(wù)取消
場景二:接口對接出現(xiàn)網(wǎng)絡(luò)問題,1分鐘后重試,如果失敗,2分鐘重試,直到出現(xiàn)閾值終止
實現(xiàn)延遲任務(wù)的兩種任務(wù)
1)DelayQueue
JDK自帶DelayQueue 是一個支持延時獲取元素的阻塞隊列, 內(nèi)部采用優(yōu)先隊列 PriorityQueue 存儲元素,同時元素必須實現(xiàn) Delayed 接口;在創(chuàng)建元素時可以指定多久才可以從隊列中獲取當(dāng)前元素,只有在延遲期滿時才能從隊列中提取元素
DelayQueue屬于排序隊列,它的特殊之處在于隊列的元素必須實現(xiàn)Delayed接口,該接口需要實現(xiàn)compareTo和getDelay方法
getDelay方法:獲取元素在隊列中的剩余時間,只有當(dāng)剩余時間為0時元素才可以出隊列。
compareTo方法:用于排序,確定元素出隊列的順序。
實現(xiàn):
1:在測試包jdk下創(chuàng)建延遲任務(wù)元素對象DelayedTask,實現(xiàn)compareTo和getDelay方法,
2:在main方法中創(chuàng)建DelayQueue并向延遲隊列中添加三個延遲任務(wù),
3:循環(huán)的從延遲隊列中拉取任務(wù)
public class DelayedTask implements Delayed{ // 任務(wù)的執(zhí)行時間 private int executeTime = 0; public DelayedTask(int delay){ Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND,delay); this.executeTime = (int)(calendar.getTimeInMillis() /1000 ); } /** * 元素在隊列中的剩余時間 * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { Calendar calendar = Calendar.getInstance(); return executeTime - (calendar.getTimeInMillis()/1000); } /** * 元素排序 * @param o * @return */ @Override public int compareTo(Delayed o) { long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return val == 0 ? 0 : ( val < 0 ? -1: 1 ); } public static void main(String[] args) { DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>(); queue.add(new DelayedTask(5)); queue.add(new DelayedTask(10)); queue.add(new DelayedTask(15)); System.out.println(System.currentTimeMillis()/1000+" start consume "); while(queue.size() != 0){ DelayedTask delayedTask = queue.poll(); if(delayedTask !=null ){ System.out.println(System.currentTimeMillis()/1000+" cosume task"); } //每隔一秒消費一次 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
DelayQueue實現(xiàn)完成之后思考一個問題:
使用線程池或者原生DelayQueue程序掛掉之后,任務(wù)都是放在內(nèi)存,需要考慮未處理消息的丟失帶來的影響,如何保證數(shù)據(jù)不丟失,需要持久化(磁盤)
2)RabbitMQ實現(xiàn)延遲任務(wù)
TTL:Time To Live (消息存活時間)
死信隊列:Dead Letter Exchange(死信交換機),當(dāng)消息成為Dead message后,可以重新發(fā)送另一個交換機(死信交換機)。
3)redis實現(xiàn)
zset數(shù)據(jù)類型的去重有序(分?jǐn)?shù)排序)特點進行延遲。例如:時間戳作為score進行排序。
redis實現(xiàn)延遲任務(wù)的思路
1.為什么任務(wù)需要存儲在數(shù)據(jù)庫中?
延遲任務(wù)是一個通用的服務(wù),任何需要延遲得任務(wù)都可以調(diào)用該服務(wù),需要考慮數(shù)據(jù)持久化的問題,存儲數(shù)據(jù)庫中是一種數(shù)據(jù)安全的考慮。
2.為什么redis中使用兩種數(shù)據(jù)類型,list和zset?
效率問題,算法的時間復(fù)雜度
3.在添加zset數(shù)據(jù)的時候,為什么不需要預(yù)加載?
任務(wù)模塊是一個通用的模塊,項目中任何需要延遲隊列的地方,都可以調(diào)用這個接口,要考慮到數(shù)據(jù)量的問題,如果數(shù)據(jù)量特別大,為了防止阻塞,只需要把未來幾分鐘要執(zhí)行的數(shù)據(jù)存入緩存即可。
4)延遲任務(wù)服務(wù)實現(xiàn)
搭建heima-leadnews-schedule模塊
leadnews-schedule是一個通用的服務(wù),單獨創(chuàng)建模塊來管理任何類型的延遲任務(wù)
①:導(dǎo)入資料文件夾下的heima-leadnews-schedule模塊到heima-leadnews-service下,如下圖所示:
②:添加bootstrap.yml
server: port: 51701 spring: application: name: leadnews-schedule cloud: nacos: discovery: server-addr: 192.168.200.130:8848 config: server-addr: 192.168.200.130:8848 file-extension: yml
③:在nacos中添加對應(yīng)配置,并添加數(shù)據(jù)庫及mybatis-plus的配置
spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC username: root password: root # 設(shè)置Mapper接口所對應(yīng)的XML文件位置,如果你在Mapper接口中有自定義方法,需要進行該配置 mybatis-plus: mapper-locations: classpath*:mapper/*.xml # 設(shè)置別名包掃描路徑,通過該屬性可以給包中的類注冊別名 type-aliases-package: com.heima.model.schedule.pojos