肥仔教程网

SEO 优化与 Web 开发技术学习分享平台

Java并发工具:DelayQueue

DelayQueue

DelayQueue 是 Java 并发包(java.util.concurrent)中的一个阻塞队列实现,用于存放实现了 Delayed 接口的对象。队列中的元素只有在其延迟期满后才能被取出。

DelayQueue 的特点

  1. 无界阻塞队列:DelayQueue 是一个无界的阻塞队列。
  2. 基于延迟时间排序:队列内部的元素必须实现 Delayed 接口,并根据其剩余延迟时间进行排序。
  3. 先进先出原则(FIFO)但受限于延迟时间:即使某个任务排在前面,如果它的延迟时间还没到,也不能被取出。
  4. 线程安全:适用于多线程环境下的调度场景。

使用场景

  • 定时任务调度(如定时清理缓存)
  • 消息队列中需要延迟消费的消息
  • 限流器、令牌桶算法实现
  • 延迟执行的操作(如重试机制)

核心接口和类

1. Delayed 接口

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}
  • getDelay() 方法返回当前元素还需要延迟多久(单位由参数决定)。
  • 队列会按照这个剩余时间排序。

示例代码

下面是一个使用 DelayQueue 实现延迟消息处理的例子:

Step 1: 定义一个实现 Delayed 的类

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedMessage implements Delayed {
    private final String message;
    private final long startTime; // 触发时间点(毫秒)

    public DelayedMessage(String message, long delayInMilliseconds) {
        this.message = message;
        this.startTime = System.currentTimeMillis() + delayInMilliseconds;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (this.startTime < ((DelayedMessage) o).startTime) {
            return -1;
        }
        if (this.startTime > ((DelayedMessage) o).startTime) {
            return 1;
        }
        return 0;
    }

    @Override
    public String toString() {
        return "DelayedMessage{" +
                "message='" + message + '\'' +
                ", startTime=" + startTime +
                '}';
    }

    public String getMessage() {
        return message;
    }
}

Step 2: 使用 DelayQueue 添加和消费延迟消息

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;

public class DelayQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<DelayedMessage> queue = new DelayQueue<>();

        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                System.out.println("Putting messages into the queue...");
                queue.put(new DelayedMessage("Message 1", 5000)); // 5秒后可用
                queue.put(new DelayedMessage("Message 2", 10000)); // 10秒后可用
                queue.put(new DelayedMessage("Message 3", 2000)); // 2秒后可用
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                while (!Thread.interrupted()) {
                    DelayedMessage msg = queue.take(); // 阻塞直到可以取出
                    System.out.println("Consumed: " + msg.getMessage() + " at " + System.currentTimeMillis());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

        // 主线程等待一段时间后中断消费者
        Thread.sleep(15000);
        consumer.interrupt();
    }
}

执行结果

Putting messages into the queue...
Consumed: Message 3 at 1746810244996
Consumed: Message 1 at 1746810247994
Consumed: Message 2 at 1746810253001

注意事项

  • DelayQueue 内部使用了优先队列(最小堆),性能上对于频繁插入和取出操作是高效的。
  • 只有当 getDelay() 返回值 ≤ 0 时,take() 才能取到该元素。
  • 如果多个元素同时到期,它们的顺序取决于 compareTo() 的实现。
  • 不支持 null 元素。

总结

特性

描述

类型

无界、阻塞、延迟队列

线程安全

排序依据

getDelay() 和 compareTo()

底层结构

最小堆(优先队列)

典型用途

定时任务、缓存过期、事件延迟处理

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言