10 分钟精通 Redis 分布式锁中的各种门道
道阻且长,行则将至。请相信我,你一定会更优秀!
一个 setnx 就行了?value没意义?还有人认为 incr 也可以? 再加个超时时间就行了? 你写的分布式锁,你确认你敢投产吗?
3-1、放在 finally{} 块中就行了吗? 3-2、锁的超时时间该怎么计算? 3-3、加个超时时间就行了吗? 3-4、这个密语value设置成什么呢? 3-5、继续,现在把思维先跳出来,想想?可重入怎么搞? 3-6、能不侵入业务代码吗? 3-7、Thread-Id 真能行吗? 3-8、APP_ID + ThreadId 还是 UUID 好呢? 3-9、锁重入,就这么简单? 3-10、重入锁的方法中直接执行 unlock?这么大胆! 3-11、终于见到希望?再来一招! 3-12、搞了半天,锁还是崩溃了? 3-13、别急,还有。锁在我手里,我挂了,这...
public static void doSomething() {
// RedisLock是我封装的一个类,后面会讲到
RedisLock redisLock = new RedisLock(jedis); // 创建jedis实例的代码省略,不是重点
try {
redisLock.lock(); // 上锁
// 处理业务
System.out.println(Thread.currentThread().getName() + " 线程处理业务逻辑中...");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 线程处理业务逻辑完毕");
redisLock.unlock(); // 释放锁
} catch (Exception e) {
e.printStackTrace();
}
}
public static void doSomething() {
RedisLock redisLock = new RedisLock(jedis); // 创建jedis实例的代码省略,不是重点
try {
redisLock.lock(); // 上锁,必须在 try{}中
// 处理业务
System.out.println(Thread.currentThread().getName() + " 线程处理业务逻辑中...");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 线程处理业务逻辑完毕");
} catch (Exception e) {
e.printStackTrace();
} finally {
redisLock.unlock(); // 在finally{} 中释放锁
}
}
public static final String lock_key = "haolin-lock";
public void lock() {
while (!tryLock()) {
try {
Thread.sleep(50); // 在while中自旋,如果说读者想设置一些自旋次数,等待最大时长等自己去扩展,不是此处的重点
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("线程:" + threadName + ",占锁成功!★★★");
}
private boolean tryLock() {
SetParams setParams = new SetParams();
setParams.ex(1); // 超时时间1s
setParams.nx(); // nx
String response = jedis.set(lock_key, "", setParams); // 转换为redis命令就是:set haolin-key "" ex 1 nx
return "OK".equals(response);
}
// http://redis.io/commands/set.html
SET key value [EX seconds] [PX milliseconds] [NX|XX]
set k vexipre k time
private boolean tryLock() {
SetParams setParams = new SetParams();
setParams.ex(1); // 超时时间1s
setParams.nx(); // nx
String response = jedis.set(lock_key, "", setParams); // 转换为redis命令就是:set haolin-key "" ex 1 nx
return "OK".equals(response);
}
// 别的线程直接调用释放锁操作,分布式锁崩溃!
public void unlock() {
jedis.del(encode(lock_key));
System.out.println("线程:" + threadName + " 释放锁成功!☆☆☆");
}
private byte[] encode(String param) {
return param.getBytes();
}
String releaseLock_lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1] \n" +
"then\n" +
" return redis.call(\"del\", KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
private boolean tryLock(String uuid) {
SetParams setParams = new SetParams();
setParams.ex(1); // 超时时间1s
setParams.nx(); // nx
String response = jedis.set(lock_key, uuid, setParams); // 转换为redis命令就是:set haolin-key "" ex 1 nx
return "OK".equals(response);
}
public void unlock(String uuid) {
List<byte[]> keys = Arrays.asList(encode(lock_key));
List<byte[]> args = Arrays.asList(encode(uuid));
// 使用lua脚本,保证原子性
long eval = (Long) jedis.eval(encode(releaseLock_lua), keys, args);
if (eval == 1) {
System.out.println("线程:" + threadName + " 释放锁成功!☆☆☆");
} else {
System.out.println("线程:" + threadName + " 释放锁失败!该线程未持有锁!!!");
}
}
private byte[] encode(String param) {
return param.getBytes();
}
get k // 进行秘钥 value的比对
del k // 比对成功后,删除k
String releaseLock_lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1] \n" +
"then\n" +
" return redis.call(\"del\", KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
String addLockLife_lua = "if redis.call(\"exists\", KEYS[1]) == 1\n" +
"then\n" +
" return redis.call(\"expire\", KEYS[1], ARGV[1])\n" +
"else\n" +
" return 0\n" +
"end";
public void lock() {
// 判断是否可重入
if (isHeldByCurrentThread()) {
return;
}
while (!tryLock()) {
try {
Thread.sleep(50); // 自旋
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("线程:" + threadName + ",占锁成功!★★★");
}
// 是否是当前线程占有锁,同时将超时时间重新设置,这个很重要,同样也是原子操作
private boolean isHeldByCurrentThread() {
List<byte[]> keys = Arrays.asList(encode(lock_key));
List<byte[]> args = Arrays.asList(encode(String.valueOf(threadId)), encode(String.valueOf(1)));
long eval = (Long) jedis.eval(encode(addLockLife_lua), keys, args);
return eval == 1;
}
private boolean tryLock(String uuid) {
SetParams setParams = new SetParams();
setParams.ex(1); // 超时时间1s
setParams.nx(); // nx
String response = jedis.set(lock_key, String.valueOf(threadId), setParams); // 转换为redis命令就是:set haolin-key xxx ex 1 nx
return "OK".equals(response);
}
public void unlock(String uuid) {
List<byte[]> keys = Arrays.asList(encode(lock_key));
List<byte[]> args = Arrays.asList(encode(String.valueOf(threadId)));
// 使用lua脚本,保证原子性
long eval = (Long) jedis.eval(encode(releaseLock_lua), keys, args);
if (eval == 1) {
System.out.println("线程:" + threadName + " 释放锁成功!☆☆☆");
} else {
System.out.println("线程:" + threadName + " 释放锁失败!该线程未持有锁!!!");
}
}
private byte[] encode(String param) {
return param.getBytes();
}
// static变量,final修饰,加载在内存中,JVM进程生命周期中不变
private static final String APP_ID = UUID.randomUUID().toString();
String releaseLock_lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1] \n" +
"then\n" +
" return redis.call(\"del\", KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
String addLockLife_lua = "if redis.call(\"exists\", KEYS[1]) == 1\n" +
"then\n" +
" return redis.call(\"expire\", KEYS[1], ARGV[1])\n" +
"else\n" +
" return 0\n" +
"end";
public void lock() {
// 判断是否可重入
if (isHeldByCurrentThread()) {
return;
}
while (!tryLock()) {
try {
Thread.sleep(50); // 自旋
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("线程:" + threadName + ",占锁成功!★★★");
}
// 是否是当前线程占有锁,同时将超时时间重新设置,这个很重要,同样也是原子操作
private boolean isHeldByCurrentThread() {
List<byte[]> keys = Arrays.asList(encode(lock_key));
List<byte[]> args = Arrays.asList(encode(APP_ID + String.valueOf(threadId)), encode(String.valueOf(1)));
long eval = (Long) jedis.eval(encode(addLockLife_lua), keys, args);
return eval == 1;
}
private boolean tryLock(String uuid) {
SetParams setParams = new SetParams();
setParams.ex(1); // 超时时间1s
setParams.nx(); // nx
String response = jedis.set(lock_key, APP_ID + String.valueOf(threadId), setParams); // 转换为redis命令就是:set haolin-key xxx ex 1 nx
return "OK".equals(response);
}
public void unlock(String uuid) {
List<byte[]> keys = Arrays.asList(encode(lock_key));
List<byte[]> args = Arrays.asList(encode(APP_ID + String.valueOf(threadId)));
// 使用lua脚本,保证原子性
long eval = (Long) jedis.eval(encode(releaseLock_lua), keys, args);
if (eval == 1) {
System.out.println("线程:" + threadName + " 释放锁成功!☆☆☆");
} else {
System.out.println("线程:" + threadName + " 释放锁失败!该线程未持有锁!!!");
}
}
private byte[] encode(String param) {
return param.getBytes();
}
ADD_LOCK_LIFE("if redis.call(\"get\", KEYS[1]) == ARGV[1]\n" + // 判断是否是锁持有者
"then\n" +
" local thisLockMaxTimeKeepKey=KEYS[1] .. \":maxTime\"\n" + // 记录锁最大时间的key是:锁名字:maxTime
" local nowTime=tonumber(ARGV[2])\n" + // 当前传参进来的time
" local maxTime=redis.call(\"incr\", thisLockMaxTimeKeepKey)\n" + // 取出当前锁设置的最大的超时时间,如果这个保持时间的key不存在返回的是字符串nil,这里为了lua脚本的易读性,用incr操作,这样读出来的都是number类型的操作
" local bigerTime=maxTime\n" + // 临时变量bigerTime=maxTime
" if nowTime>maxTime-1\n" + // 如果传参进来的时间>记录的最大时间
" then\n" +
" bigerTime=nowTime\n" + // 则更新bigerTime
" redis.call(\"set\", thisLockMaxTimeKeepKey, tostring(bigerTime))\n" + // 设置超时时间为最大的time,是最安全的
" else \n" +
" redis.call(\"decr\", thisLockMaxTimeKeepKey)\n" + // 当前传参time<maxTime,将刚才那次incr减回来
" end\n" +
" return redis.call(\"expire\", KEYS[1], tostring(bigerTime))\n" + // 重新设置超时时间为当前锁过的最大的time
"else\n" +
" return 0\n" +
"end"),
目前为止,算上上一步中设置最大超时时间的key,加上这一步重入次数的key,加上锁本身的key,已经有3个key,需要注意的事情是,这三个key的超时时间是都要设置的!为什么? 假如说重入次数的 key没有设置超时时间,服务A节点中在一个JVM中重入了5次后,调用一次 RPC服务,RPC服务中同样重入锁,此时,锁重入次数是 6,这个时候A服务宕机,就意味着无论怎样,这把锁不可能释放了,这个分布式锁提供的完整能力,全线不可用了!
public class RedisLockIdleThreadPool {
private String threadAddLife_lua = "if redis.call(\"exists\", KEYS[1]) == 1\n" +
"then\n" +
" return redis.call(\"expire\", KEYS[1], ARGV[1])\n" +
"else\n" +
" return 0\n" +
"end";
private volatile ScheduledExecutorService scheduledThreadPool;
public RedisLockIdleThreadPool() {
if (scheduledThreadPool == null) {
synchronized (this) {
if (scheduledThreadPool == null) {
scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(); // 我这样创建线程池是为了代码的易读性,大家务必使用ThreadPoolExecutor去创建
scheduledThreadPool.scheduleAtFixedRate(() -> {
addLife();
}, 0, 300, TimeUnit.MILLISECONDS);
}
}
}
}
private void addLife() {
// ... 省略jedis的初始化过程
List<byte[]> keys = Arrays.asList(RedisLock.lock_key.getBytes());
List<byte[]> args = Arrays.asList(String.valueOf(1).getBytes());
jedis.eval(threadAddLife_lua.getBytes(), keys, args);
}
}
THREAD_ADD_LIFE("local v=redis.call(\"get\", KEYS[1]) \n" + // get key
"if v==false \n" + // 如果不存在key,读出结果v是false
"then \n" + // 不存在不处理
"else \n" +
" local match = string.find(v, ARGV[1]) \n" + // 存在,判断是否能和APP_ID匹配,匹配不上时match是nil
" if match==\"nil\" \n" +
" then \n" +
" else \n" +
" return redis.call(\"expire\", KEYS[1], ARGV[2]) \n" + // 匹配上了返回的是索引位置,如果匹配上了意味着就是当前进程占有的锁,就延长时间
" end \n" +
"end")
3-13、别急,还有。锁在我手里,我挂了,这...
Redis uses by default asynchronous replication, which being low latency and high performance, is the natural replication mode for the vast majority of Redis use cases.
关注公众号:拾黑(shiheibook)了解更多
[广告]赞助链接:
四季很好,只要有你,文娱排行榜:https://www.yaopaiming.com/
让资讯触达的更精准有趣:https://www.0xu.cn/
关注网络尖刀微信公众号
随时掌握互联网精彩
随时掌握互联网精彩
赞助链接
排名
热点
搜索指数
- 1 努力开创法学会事业发展新局面 7910503
- 2 山西临汾地震 7995905
- 3 刘强东母亲夸赞章泽天买的新衣 7818260
- 4 警徽熠熠 擎旗奋进 7719676
- 5 黄子韬送出30辆车 每辆售价超12万 7644890
- 6 女子花4.5万买羽绒服 穿4天就起毛 7591117
- 7 鲁GT8586你真帅 7427190
- 8 女生确诊渐冻症一年容貌巨变 7365578
- 9 留几手回应离婚 7280300
- 10 李行亮:录完节目我们觉得很羞耻 7164943