redis实现分布式延迟队列

主要内容:

1. 使用redis实现分布式延迟队列(redis2.9 版本),用redis锁

2. 使用zookeeper分布式锁优化延迟队列读取

3. 使用延迟队列强制释放过期的zookeeper锁

 

用到的依赖:

spring框架

 

redis: 

redis.clients:jedis:2.9.0

 

zookeeper&curator: 

org.apache.zookeeper:zookeeper:3.4.6

org.apache.curator:curator-framework:2.12.0

org.apache.curator:curator-recipes:2.12.0

 

一. 使用redis实现分布式延迟队列

使用redis的zset功能作为延迟队列,使用时间戳unix timestamp作为score,取的时候取超过当前时间的score。

redis为2.9版本,我翻破了参考文档也没有找到一个可以原子性删除+取回zset某范围内一定数量的值的方法(头很痛),所以只能用锁来控制这个zset的访问,如果有什么好办法请告诉我。

因为可能有很多值在同一时间被取出,如果一次取出zset某时间段内所有值会造成单台机器压力过大,需要限制每次取出的最大量。

下面是一个延迟队列的例子,时间单位全部是毫秒。

import com.study.javaweb.test1.utils.JsonUtils;
import com.study.javaweb.test1.utils.StringUtils;
import com.study.javaweb.test1.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Tuple;import java.util.HashMap;
import java.util.Map;
import java.util.Set;/*** @Author: longjuanfeng* @Date: 8/13/18* @Description:*/
@Component
@Slf4j
public class RedisDelayQueue {@Autowiredprivate Jedis jedis;private final String delayQueuePrefix = "d_q_p:";private final String delayQueueLock = "d_q_l:";public void push(String name, Map dataList) {if (CollectionUtils.isEmpty(dataList)) {return;}String key = delayQueuePrefix + name;Pipeline pipeline = jedis.pipelined();dataList.forEach((data, delay) -> {if (data == null || delay == null) {return;}//JsonUtils把类转化为json string存储在redispipeline.zadd(key, System.currentTimeMillis() + delay, JsonUtils.toJson(data));});pipeline.sync();}public  Map poll(String name, Long timeout, Integer count, Class clazz) {String queueLock = delayQueueLock + name;Long waitTime = 0L;while (waitTime < timeout) {String result = jedis.set(queueLock, "1", "NX", "EX", 2000);if (StringUtils.isEmpty(result)) {TimeUtils.sleep(50L);waitTime += 50;} else {break;}}if (waitTime >= timeout) {return null;}try {String key = delayQueuePrefix + name;Set uidTimeSet = jedis.zrangeByScoreWithScores(key, 0, System.currentTimeMillis(), 0, count);Map dataList = new HashMap<>();if (!CollectionUtils.isEmpty(uidTimeSet)) {jedis.zrem(key, uidTimeSet.stream().map(Tuple::getElement).toArray(String[]::new));//把string还原为类结构uidTimeSet.forEach(tuple ->dataList.put(JsonUtils.fromJson(tuple.getElement(), clazz), new Double(tuple.getScore()).longValue()));}return dataList;} finally {jedis.del(queueLock);}}
}

 

可以通过取出的值的score判断现在队列的延迟情况,手动抛错,防止因消费速度不足而引起的队列堆积

 

二. 使用zookeeper分布式锁优化延迟队列读取

上例使用了redis分布锁来控制redis zset的访问。

使用redis锁的问题是:

1. zset空闲时间长,消费速度慢,可能造成数据堆积

2. 机器数量多的情况下,大量获取锁的请求增加了redis的压力

3. 负载不均衡,一些机器可能连续抢到锁

可以使用zookeeper锁优化

先封装一个curator锁工具类。支持单线程、多线程、分布式加锁

/*** @Author: longjuanfeng* @Date: 8/27/18* @Description:*/
@Component
@Slf4j
public class DistributedLock {private final Map lockMap = new ConcurrentHashMap<>();private final String rootPath = "longjuanfeng_test1";private CuratorFramework client;private volatile Boolean needShutDown = false;@PostConstructpublic void initialize() {this.client = CuratorFrameworkFactory.builder().connectString("localhost:2181").retryPolicy(new ExponentialBackoffRetry(1000, 3)).connectionTimeoutMs(2000).sessionTimeoutMs(2000).namespace(rootPath).build();this.client.start();}public Boolean lockOnce(String key) {Boolean result = lockOnce(key, 1000L);log.info("lockOnce key : {}, result : {}", key, result);return result;}public Boolean lockOnce(String key, Long waitTime) {if (waitTime < 0 || StringUtils.isEmpty(key)) {throw new IllegalArgumentException();}Long hasWait = 0L;InterProcessMutex lock = new InterProcessMutex(this.client, "/" + key);while (lockMap.putIfAbsent(key, lock) != null) {if (hasWait > waitTime) {return false;}TimeUtils.sleep(20L);hasWait += 20;}Boolean result = false;try {Long remainTime = waitTime - hasWait < 1 ? 1 : waitTime - hasWait;result = lock.acquire(remainTime, TimeUnit.MILLISECONDS);if (result) {;} else {lockMap.remove(key);}return result;} catch (Exception e) {if (!result) {lockMap.remove(key);}return false;}}public void unlock(String key) {InterProcessMutex lock = lockMap.get(key);try {if (lock != null) {lock.release();lockMap.remove(key, lock);log.info("unlock key : {}", key);}} catch (Exception e) {log.error("lock expired. key : {}, thread : {}", key, Thread.currentThread().getName());}}@PreDestroypublic void destroy() {needShutDown = true;if (!CollectionUtils.isEmpty(this.lockMap)) {this.lockMap.forEach((key, lock) -> {try {log.info("try unlock {}", key);lock.release();} catch (Exception e) {log.error("", e);}});}this.client.close();}
}

用zookeeper锁zset:

@Component
@Slf4j
public class RedisDelayQueue {@Autowiredprivate Jedis jedis;@Autowiredprivate DistributedLock lock;private final String delayQueuePrefix = "d_q_p:";private final String delayQueueLock = "d_q_l:";public void push(String name, Map dataList) {if (CollectionUtils.isEmpty(dataList)) {return;}String key = delayQueuePrefix + name;Pipeline pipeline = jedis.pipelined();dataList.forEach((data, delay) -> {if (data == null || delay == null) {return;}//JsonUtils把类转化为json string存储在redispipeline.zadd(key, System.currentTimeMillis() + delay, JsonUtils.toJson(data));});pipeline.sync();}public  Map poll(String name, Long timeout, Integer count, Class clazz) {String queueLock = delayQueueLock + name;if (!lock.lockOnce(queueLock, timeout)) {return null;}try {String key = delayQueuePrefix + name;Set uidTimeSet = jedis.zrangeByScoreWithScores(key, 0, System.currentTimeMillis(), 0, count);Map dataList = new HashMap<>();if (!CollectionUtils.isEmpty(uidTimeSet)) {jedis.zrem(key, uidTimeSet.stream().map(Tuple::getElement).toArray(String[]::new));//把string还原为类结构uidTimeSet.forEach(tuple ->dataList.put(JsonUtils.fromJson(tuple.getElement(), clazz), new Double(tuple.getScore()).longValue()));}return dataList;} finally {lock.unlock(queueLock);}}
}

 

三. 使用延迟队列强制释放过期的zookeeper锁

因为使用了一个bean连接zookeeper,所以如果锁了某个队列之后不调用unlock的话,这个锁在程序退出之前都是不会解锁的,这在实际操作上有一定风险,某些情况下,如果忘记unlock或者某些步骤导致线程卡死,可能造成资源锁死。所以每次上锁操作的时候需要预估程序执行时间,设置锁过期

使用一个本地延迟队列存放使用过的锁和过期时间,跑一个循环任务清理过期的锁。如果程序意外退出zookeeper会自动将这个程序申请的锁全部解锁。

增加了强制过期的锁工具类:

@Component
@Slf4j
public class DistributedLock {private final Map lockMap = new ConcurrentHashMap<>();private final String rootPath = "longjuanfeng_test1";private CuratorFramework client;private volatile Boolean needShutDown = false;private final DelayQueue locksDelayQueue = new DelayQueue<>();@Getterpublic static class DelayLock implements Delayed {private String key;private InterProcessMutex lock;private Long expireTime;public DelayLock(String key, InterProcessMutex lock, Long lockTime) {this.key = key;this.lock = lock;this.expireTime = System.currentTimeMillis() + lockTime;}@Overridepublic long getDelay(TimeUnit unit) {Long delay = expireTime - System.currentTimeMillis();return unit.convert(delay, TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed cmp) {return Long.valueOf(this.getDelay(TimeUnit.MILLISECONDS) - cmp.getDelay(TimeUnit.MILLISECONDS)).intValue();}}@PostConstructpublic void initialize() {this.client = CuratorFrameworkFactory.builder().connectString("localhost:2181").retryPolicy(new ExponentialBackoffRetry(1000, 3)).connectionTimeoutMs(2000).sessionTimeoutMs(2000).namespace(rootPath).build();this.client.start();}public Boolean lockOnce(String key) {Boolean result = lockOnce(key, 1000L, 2000L);log.info("lockOnce key : {}, result : {}", key, result);return result;}public Boolean lockOnce(String key, Long waitTime, Long lockTime) {if (waitTime < 0 || StringUtils.isEmpty(key)) {throw new IllegalArgumentException();}Long hasWait = 0L;InterProcessMutex lock = new InterProcessMutex(this.client, "/" + key);while (lockMap.putIfAbsent(key, lock) != null) {if (hasWait > waitTime) {return false;}TimeUtils.sleep(20L);hasWait += 20;}Boolean result = false;try {Long remainTime = waitTime - hasWait < 1 ? 1 : waitTime - hasWait;result = lock.acquire(remainTime, TimeUnit.MILLISECONDS);if (result) {locksDelayQueue.add(new DelayLock(key, lock, lockTime));} else {lockMap.remove(key);}return result;} catch (Exception e) {if (!result) {lockMap.remove(key);}return false;}}public void unlock(String key) {InterProcessMutex lock = lockMap.get(key);try {if (lock != null) {lock.release();lockMap.remove(key, lock);log.info("unlock key : {}", key);}} catch (Exception e) {log.error("lock expired. key : {}, thread : {}", key, Thread.currentThread().getName());}}@PreDestroypublic void destroy() {needShutDown = true;if (!CollectionUtils.isEmpty(this.lockMap)) {this.lockMap.forEach((key, lock) -> {try {log.info("try unlock {}", key);lock.release();} catch (Exception e) {log.error("", e);}});}this.client.close();}@Scheduled(initialDelay = 2000L, fixedDelay = Long.MAX_VALUE)public void releaseTimeoutLock() {while (!needShutDown) {try {DelayLock delayLock = locksDelayQueue.poll();if (delayLock == null) {TimeUtils.sleep(100L);continue;}InterProcessMutex lock = lockMap.get(delayLock.getKey());if (lock == delayLock.getLock()) {log.info("force release lock {}", delayLock.getKey());forceReleaseLock(lock);lockMap.remove(delayLock.getKey(), delayLock.getLock());}} catch (Exception e) {log.error("releaseTimeoutLock error :", e);}}}private void forceReleaseLock(InterProcessMutex lock) {try {Class classz = InterProcessMutex.class;Field field = classz.getDeclaredField("threadData");field.setAccessible(true);ConcurrentMap map = (ConcurrentMap) field.get(lock);map.put(Thread.currentThread(), map.entrySet().stream().map(entry -> entry.getValue()).findFirst().get());lock.release();} catch (Exception e) {log.error("forceReleaseLock error", e);}}
}

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部