Redisson是一个基于Redis的Java驻留库,旨在简化Java应用程序对Redis的操作。它提供了丰富的功能和易于使用的API,使得在Java应用中集成Redis变得更加简单和高效。Redisson的主要功能包括分布式对象、分布式锁、分布式队列、分布式调度器等,使得在分布式环境中进行协作变得更加容易。
延迟队列是Redisson提供的一个功能强大的组件之一。它允许开发者在指定的延迟时间之后自动执行任务,这对于需要在一定时间后执行操作的应用程序非常有用,如消息通知、订单处理、定时任务等。
Redisson实现延迟队列的原理基于Redis的有序集合(Sorted Set)和Redis的过期时间机制
生产者
因为RDelayedQueue
并不直接存储任务。相反,它是一种装饰器或包装器,用于添加延迟功能到一个标准的Redisson队列(如RBlockingQueue
)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Slf4j @Component public class RedissonDelayQueueProducer { @Autowired private RedissonClient redissonClient;
public void addDelayedTask(String topic, Object task, long delay, TimeUnit timeUnit) { log.info("redis的延迟队列 {} (添加)的key:{},time:{},当前时间:{}", topic, task, delay, LocalDateTimeUtil.formatNormal(LocalDateTime.now())); redissonClient.getDelayedQueue(redissonClient.getBlockingQueue(topic)).offerAsync(task, delay, timeUnit); }
public Boolean cancelDelayedTask(String topic, Object task) { log.info("redis的延迟队列{} (移除)的key:{},当前时间:{}", topic, task, LocalDateTimeUtil.formatNormal(LocalDateTime.now())); return redissonClient.getDelayedQueue(redissonClient.getBlockingQueue(topic)).remove(task); } }
|
消费者
起一个线程循环监听延迟队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Slf4j @Service public class RedissonDelayedQueueConsumer { private final static String topic = "topic"; @Autowired private RedissonClient redissonClient;
@PostConstruct public void init() { log.info("{} 开始监听", topic); new Thread(() -> { while (true) { try { RBlockingQueue<Object> aaa = redissonClient.getBlockingQueue(topic); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(aaa); Object take = aaa.take(); log.info("redis的延迟队列:{},当前时间:{}", take, LocalDateTimeUtil.formatNormal(LocalDateTime.now())); } catch (Exception e) { log.error("redis的延迟队列抛出异常", e); } } }).start(); } }
|
效果展示
redisson配置
1 2 3 4 5 6 7 8 9
| <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.17.4</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Configuration public class RedissonConfig {
@Value("${spring.data.redis.host}") private String redisHost; @Value("${spring.data.redis.port}") private String redisPort; @Value("${spring.data.redis.password}") private String password;
@Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort); if(StringUtils.isBlank(password)) { config.useSingleServer().setPassword(null); } else { config.useSingleServer().setPassword(password); } return Redisson.create(config); } }
|