使用DelayQueue 和 FutureTask 实现java中的缓存

使用DelayQueue、ConcurrentHashMap、FutureTask实现的缓存工具类。

DelayQueue 简介

DelayQueue是一个支持延时获取元素的无界阻塞队列。DelayQueue内部队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。

  1. 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询
    DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  2. 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从
    DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

ConcurrentHashMap和FutureTask,详见以下:

  1. http://www.jianshu.com/p/d10256f0ebea
  2. FutureTask 源码分析

缓存工具类实现

  1. 支持缓存多长时间,单位毫秒。
  2. 支持多线程并发。
    比如:有一个比较耗时的操作,此时缓冲中没有此缓存值,一个线程开始计算这个耗时操作,而再次进来线程就不需要再次进行计算,只需要等上一个线程计算完成后(使用FutureTask)返回该值即可。
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;public class CacheBean<V> {// 缓存计算的结果private final static ConcurrentMap> cache = new ConcurrentHashMap<>();// 延迟队列来判断那些缓存过期private final static DelayQueue> delayQueue = new DelayQueue<>();// 缓存时间private final int ms;static {// 定时清理过期缓存Thread t = new Thread() {@Overridepublic void run() {dameonCheckOverdueKey();}};t.setDaemon(true);t.start();}private final Computable c;/*** @param c Computable*/public CacheBean(Computable c) {this(c, 60 * 1000);}/*** @param c Computable* @param ms 缓存多少毫秒*/public CacheBean(Computable c, int ms) {this.c = c;this.ms = ms;}public V compute(final String key) throws InterruptedException {while (true) {//根据key从缓存中获取值Future f = (Future) cache.get(key);if (f == null) {Callable eval = new Callable() {public V call() {return (V) c.compute(key);}};FutureTask ft = new FutureTask<>(eval);//如果缓存中存在此可以,则返回已存在的valuef = (Future) cache.putIfAbsent(key, (Future) ft);if (f == null) {//向delayQueue中添加key,并设置该key的存活时间delayQueue.put(new DelayedItem<>(key, ms));f = ft;ft.run();}}try {return f.get();} catch (CancellationException e) {cache.remove(key, f);} catch (ExecutionException e) {e.printStackTrace();}}}/*** 检查过期的key,从cache中删除*/private static void dameonCheckOverdueKey() {DelayedItem delayedItem;while (true) {try {delayedItem = delayQueue.take();if (delayedItem != null) {cache.remove(delayedItem.getT());System.out.println(System.nanoTime() + " remove " + delayedItem.getT() + " from cache");}} catch (InterruptedException e) {e.printStackTrace();}}}}class DelayedItem implements Delayed {private T t;private long liveTime;private long removeTime;public DelayedItem(T t, long liveTime) {this.setT(t);this.liveTime = liveTime;this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis();}@Overridepublic int compareTo(Delayed o) {if (o == null)return 1;if (o == this)return 0;if (o instanceof DelayedItem) {DelayedItem tmpDelayedItem = (DelayedItem) o;if (liveTime > tmpDelayedItem.liveTime) {return 1;} else if (liveTime == tmpDelayedItem.liveTime) {return 0;} else {return -1;}}long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);return diff > 0 ? 1 : diff == 0 ? 0 : -1;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(removeTime - System.currentTimeMillis(), unit);}public T getT() {return t;}public void setT(T t) {this.t = t;}@Overridepublic int hashCode() {return t.hashCode();}@Overridepublic boolean equals(Object object) {if (object instanceof DelayedItem) {return object.hashCode() == hashCode() ? true : false;}return false;}} 

Computable 接口

public interface Computable<V> {V compute(String k);}

测试类

public class FutureTaskDemo {public static void main(String[] args) throws InterruptedException {// 子线程Thread t = new Thread(() -> {CacheBean cb = new CacheBean<>(k -> {try {System.out.println("模拟计算数据,计算时长2秒。key=" + k);TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "你好:" + k;}, 5000);try {while (true) {System.out.println("thead2:" + cb.compute("b"));TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();}});t.start();// 主线程while (true) {CacheBean cb = new CacheBean<>(k -> {try {System.out.println("模拟计算数据,计算时长2秒。key=" + k);TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "你好:" + k;}, 5000);System.out.println("thead1:" + cb.compute("b"));TimeUnit.SECONDS.sleep(1);}}
}

执行结果:
执行结果

两个线程同时访问同一个key的缓存。从执行结果发现,每次缓存失效后,同一个key只执行一次计算,而不是多个线程并发执行同一个计算然后缓存。

本人简书blog地址:http://www.jianshu.com/u/1f0067e24ff8    
点击这里快速进入简书

GIT地址:http://git.oschina.net/brucekankan/
点击这里快速进入GIT


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章