教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

Redis如何實現(xiàn)分布式阻塞隊列?

更新時間:2020年11月04日17時32分 來源:傳智播客 瀏覽次數(shù):

好口碑IT培訓(xùn)

1. Redis分布式鎖實現(xiàn)原理

分布式鎖本質(zhì)上要實現(xiàn)的目標(biāo)就是在Redis里面占一個“茅坑”,當(dāng)別的進程也要來占時,發(fā)現(xiàn)已經(jīng)有人蹲在那里了,就只好放棄或者稍后再試。占坑一般是使用setnx(set if not exists)指令,只允許被一個客戶端占坑。先來先占,用完了,再調(diào)用del指令釋放茅坑。

死鎖問題:如果邏輯執(zhí)行到中間出現(xiàn)異常了,可能會導(dǎo)致del指令沒有被調(diào)用,這樣就會陷入死鎖,鎖永遠得不到釋放,解決這個問題我們在拿到鎖之后,再給鎖加上一個過期時間,比如 5s,這樣即使中間出現(xiàn)異常也可以保證 5 秒之后鎖會自動釋放。

2. 普通非阻塞鎖實現(xiàn)

public class RedisLock {
	private Jedis jedis;
		public RedisLock(Jedis jedis) {
			this.jedis = jedis;
		}
	public boolean lock(String key) {
		return jedis.set(key, "", "nx", "ex", 5L) != null;
	}
	public void unlock(String key) {
		jedis.del(key);
	}
}

2.1 存在問題

如果某一個進程沒有拿到鎖得到了false的結(jié)果那么次進程是否執(zhí)行當(dāng)前任務(wù)?顯然對于一般情況來說我們的任務(wù)都是必須執(zhí)行的那么此時我們就要考慮該何時執(zhí)行了,在傳統(tǒng)的鎖中我們?nèi)绻麤]有拿到鎖線程就進入了阻塞狀態(tài)那么此處我們是否可以改進同樣實現(xiàn)阻塞喚醒機制。

3. 分布式阻塞鎖具體實現(xiàn)

3.1 解決思路

(1)首先我們改造lock鎖,當(dāng)不能創(chuàng)建key時就利用當(dāng)前key阻塞當(dāng)前線程

(2)當(dāng)某一個線程釋放鎖時通過redis的pub/sub發(fā)送一個消息消息內(nèi)容為key

(3)所有使用鎖的應(yīng)用監(jiān)聽lock通道的消息,在收到消息時通過key喚醒對應(yīng)線程

3.2具體實現(xiàn)

package com.hgy.common.redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

import java.util.HashMap;

public class RedisLock extends JedisPubSub {
	//是否已經(jīng)初始化監(jiān)聽
	private static volatile boolean isListen = false;

	//每一個redis的key對應(yīng)一個阻塞對象
	private HashMap blockers = new HashMap<>();

	private Jedis jedis;

	//當(dāng)前獲得鎖的線程
	private Thread curThread;

	public RedisLock(Jedis jedis) {
		this.jedis = jedis;
		//保證沒一個應(yīng)用只初始化一次監(jiān)聽
		if (!isListen) {
			synchronized (RedisLock.class) {
				if (!isListen) {
					// 啟動一個線程做消息監(jiān)聽
					new Thread(()->{
						new Jedis("192.168.200.128", 6379).subscribe(this,"lock");
					}).start();
					isListen = true;
				}
			}
		}
	}
	public void lock(String key) throws InterruptedException {
		//循環(huán)判斷是否能夠創(chuàng)建key, 不能則直接wait釋放CPU執(zhí)行權(quán)
		while (jedis.set(key, "", "nx", "ex", 20L) == null) {
			synchronized (key) {
				System.out.println(Thread.currentThread().getName() + "======="+ key);
				blockers.put(key, key);
				key.wait();
			}
		}
		blockers.put(key, key);
		//能夠成功創(chuàng)建,獲取鎖成功記錄當(dāng)前獲取鎖線程
		curThread = Thread.currentThread();
	}

	public void unlock(String key) {
		//判斷是否為加鎖的線程執(zhí)行解鎖, 不是則直接忽略
		if( curThread == Thread.currentThread()) {
			jedis.del(key);
			//刪除key之后需要notifyAll所有的應(yīng)用, 所以這里采用發(fā)訂閱消息給所有的應(yīng)用
			jedis.publish("lock", key);
		}
	}

	/**
	* 所有應(yīng)用接收到消息后在當(dāng)前應(yīng)用中執(zhí)行對應(yīng)key的notifyAll方法
	* @param channel
	* @param message
	*/

	@Override
	public void onMessage(String channel, String message) {
		Object lock = blockers.get(message);
		if(lock != null) {
			synchronized (lock) {
				lock.notifyAll();
			}
		}
	}
}

4.測試

目標(biāo): 開啟兩個mian線程, 在第一個中首先暫停3秒然后打印1-100然后線程休眠5秒釋放鎖并打印最后的毫秒數(shù);main1在執(zhí)行的同時執(zhí)行main2,在2中打印開始時間;最后比對1和2的開始時間即可驗

證。

注意: 先啟動1然后啟動2

·main1

package com.hgy;
import com.hgy.common.redis.RedisLock;
import redis.clients.jedis.Jedis;
public class RedisLockApp1 {
	private static RedisLock redisLock;
	public static void main(String[] args) throws InterruptedException {
		Jedis client = new Jedis("192.168.200.128", 6379);
		redisLock = new RedisLock(client);
		redisLock.lock("demo");
		Thread.sleep(3000);
		for (int i = 0; i < 100; i++) {
			System.out.println("app1" + i);
		}
		Thread.sleep(5000);
		redisLock.unlock("demo");
		System.out.println("App1==> end:" + System.currentTimeMillis());
	}
}

·main2

package com.hgy;

import com.hgy.common.redis.RedisLock;
import redis.clients.jedis.Jedis;

public class RedisLockApp2 {
	private static RedisLock redisLock;
	public static void main(String[] args) throws InterruptedException {
		Jedis client = new Jedis("192.168.200.128", 6379);
		redisLock = new RedisLock(client);
		redisLock.lock("demo");
		System.out.println("App2==> start:" + System.currentTimeMillis());
		for (int i = 0; i < 100; i++) {
			System.out.println("app2" + i);
		}
		redisLock.unlock("demo");
	}
}

注意

如果細心的小伙伴兒可能已經(jīng)發(fā)現(xiàn)了unlock其實不是一個原子操作,可能在未發(fā)布消息但刪除key之后的這段時間如果有人此時執(zhí)行l(wèi)ock那么可以直接拿到鎖;但是影響不大因為拿到鎖之后其他被阻塞的線程被喚醒之后將會繼續(xù)阻塞。

猜你喜歡

redis生成自增長ID教程

Redis持久化詳細介紹

0 分享到:
和我們在線交談!