DelayQueue
DelayQueue 是 Java 并发包(java.util.concurrent)中的一个阻塞队列实现,用于存放实现了 Delayed 接口的对象。队列中的元素只有在其延迟期满后才能被取出。
DelayQueue 的特点
- 无界阻塞队列:DelayQueue 是一个无界的阻塞队列。
- 基于延迟时间排序:队列内部的元素必须实现 Delayed 接口,并根据其剩余延迟时间进行排序。
- 先进先出原则(FIFO)但受限于延迟时间:即使某个任务排在前面,如果它的延迟时间还没到,也不能被取出。
- 线程安全:适用于多线程环境下的调度场景。
使用场景
- 定时任务调度(如定时清理缓存)
- 消息队列中需要延迟消费的消息
- 限流器、令牌桶算法实现
- 延迟执行的操作(如重试机制)
核心接口和类
1. Delayed 接口
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
- getDelay() 方法返回当前元素还需要延迟多久(单位由参数决定)。
- 队列会按照这个剩余时间排序。
示例代码
下面是一个使用 DelayQueue 实现延迟消息处理的例子:
Step 1: 定义一个实现 Delayed 的类
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayedMessage implements Delayed {
private final String message;
private final long startTime; // 触发时间点(毫秒)
public DelayedMessage(String message, long delayInMilliseconds) {
this.message = message;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this.startTime < ((DelayedMessage) o).startTime) {
return -1;
}
if (this.startTime > ((DelayedMessage) o).startTime) {
return 1;
}
return 0;
}
@Override
public String toString() {
return "DelayedMessage{" +
"message='" + message + '\'' +
", startTime=" + startTime +
'}';
}
public String getMessage() {
return message;
}
}
Step 2: 使用 DelayQueue 添加和消费延迟消息
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
public class DelayQueueExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<DelayedMessage> queue = new DelayQueue<>();
// 生产者线程
Thread producer = new Thread(() -> {
try {
System.out.println("Putting messages into the queue...");
queue.put(new DelayedMessage("Message 1", 5000)); // 5秒后可用
queue.put(new DelayedMessage("Message 2", 10000)); // 10秒后可用
queue.put(new DelayedMessage("Message 3", 2000)); // 2秒后可用
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
while (!Thread.interrupted()) {
DelayedMessage msg = queue.take(); // 阻塞直到可以取出
System.out.println("Consumed: " + msg.getMessage() + " at " + System.currentTimeMillis());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
// 主线程等待一段时间后中断消费者
Thread.sleep(15000);
consumer.interrupt();
}
}
执行结果
Putting messages into the queue...
Consumed: Message 3 at 1746810244996
Consumed: Message 1 at 1746810247994
Consumed: Message 2 at 1746810253001
注意事项
- DelayQueue 内部使用了优先队列(最小堆),性能上对于频繁插入和取出操作是高效的。
- 只有当 getDelay() 返回值 ≤ 0 时,take() 才能取到该元素。
- 如果多个元素同时到期,它们的顺序取决于 compareTo() 的实现。
- 不支持 null 元素。
总结
特性 | 描述 |
类型 | 无界、阻塞、延迟队列 |
线程安全 | 是 |
排序依据 | getDelay() 和 compareTo() |
底层结构 | 最小堆(优先队列) |
典型用途 | 定时任务、缓存过期、事件延迟处理 |