Redisson 分布式锁原理
Redisson如何解决重入问题
可重入锁:同一线程可以获取同一把锁
实现:在获取锁的时候,如果存在,判断锁标识是不是同一个线程,如果是,也可以获取锁,并使用计数器记录重入的次数

lua 脚本获取锁trylock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; # 锁存在,但不是自己的(获取锁失败),返回锁剩余有效时间(单位毫秒) return redis.call('pttl', KEYS[1]);
|
lua 脚本释放锁 unlock
释放锁也涉及如何解决锁的重试问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
|
Redisson如何解决锁重试问题
首先,学习tryLock()方法的参数

redisson中tryLock()参数解释
- 第一个参数 waitTime 指定了这个锁,那么锁就是可重试锁,超过waitTime,才返回获取锁失败
- 第二个参数 leaseTime 锁超时释放时间 不指定 30s
- 第三个参数 单位
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { return true; } else { time -= System.currentTimeMillis() - current; if (time <= 0L) { this.acquireFailed(waitTime, unit, threadId); return false; } else { current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { this.unsubscribe(subscribeFuture, threadId); }
}); } this.acquireFailed(waitTime, unit, threadId); return false; } else { try { time -= System.currentTimeMillis() - current; if (time <= 0L) { this.acquireFailed(waitTime, unit, threadId); boolean var20 = false; return var20; } else { boolean var16; do { long currentTime = System.currentTimeMillis(); ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { var16 = true; return var16; } time -= System.currentTimeMillis() - currentTime; if (time <= 0L) { this.acquireFailed(waitTime, unit, threadId); var16 = false; return var16; }
currentTime = System.currentTimeMillis(); if (ttl >= 0L && ttl < time) { ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); }
time -= System.currentTimeMillis() - currentTime; } while(time > 0L);
this.acquireFailed(waitTime, unit, threadId); var16 = false; return var16; } } finally { this.unsubscribe(subscribeFuture, threadId); } } } } }
|
小结:使用消息订阅和信号量机制(等你释放锁的时候,发送消息告诉我一声,我再去尝试),而不是无休止的重试(增加CPU负担)
Redisson如何解决超时释放
【这个其实,还是有点疑惑】
如果一个线程因为阻塞导致ttl到期释放了锁,然后其他线程获取到了锁
所以,Redisson是如何保证获取锁是因为别的线程执行完业务释放锁,而不是其他异常时获取的锁 –> 不理解
我觉得看门狗机制解决的问题
如果锁的超时释放时间设置为-1,没有设置看门狗机制,该线程阻塞了,一直不释放锁,那么其他线程就获取不到这个锁。有了看门狗机制,10s内没有续约超时释放时间,那么锁就自动释放了,其他线程就可以获得锁了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1L) { return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining == null) { this.scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private void scheduleExpirationRenewal(long threadId) { RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry(); RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); this.renewExpiration(); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| private void renewExpiration() { RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (ee != null) { Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName()); if (ent != null) { Long threadId = ent.getFirstThreadId(); if (threadId != null) { RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e); } else { if (res) { RedissonLock.this.renewExpiration(); }
} }); } } } }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); ee.setTimeout(task); } }
|
internalLockLeaseTime

1 2 3 4
| protected RFuture<Boolean> renewExpirationAsync(long threadId) { return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId)); }
|
然后,什么时候取消这个延时任务 【锁释放的时候取消】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise(); RFuture<Boolean> future = this.unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { this.cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); } else if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId); result.tryFailure(cause); } else { result.trySuccess((Object)null); } }); return result; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void cancelExpirationRenewal(Long threadId) { RedissonLock.ExpirationEntry task = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (task != null) { if (threadId != null) { task.removeThreadId(threadId); }
if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null) { timeout.cancel(); }
EXPIRATION_RENEWAL_MAP.remove(this.getEntryName()); }
} }
|
Redisson如何解决主从一致性问题
主从一致性产生原因:单节点redis,如果该redis出现问题,所有依赖于redis的都会出现问题(包括分布式锁),这时候需要使用redis集群或主从,提高redis的可用性
主节点负责redis写操作,从节点负责redis读操作,主从节点需要数据同步。
如何使用redis搭建集群还不会