Springboot-Redisson实现延迟消息队列

Redisson是一个基于Redis的Java驻留库,旨在简化Java应用程序对Redis的操作。它提供了丰富的功能和易于使用的API,使得在Java应用中集成Redis变得更加简单和高效。Redisson的主要功能包括分布式对象、分布式锁、分布式队列、分布式调度器等,使得在分布式环境中进行协作变得更加容易。

延迟队列是Redisson提供的一个功能强大的组件之一。它允许开发者在指定的延迟时间之后自动执行任务,这对于需要在一定时间后执行操作的应用程序非常有用,如消息通知、订单处理、定时任务等。

Redisson实现延迟队列的原理基于Redis的有序集合(Sorted Set)和Redis的过期时间机制

生产者

因为RDelayedQueue并不直接存储任务。相反,它是一种装饰器或包装器,用于添加延迟功能到一个标准的Redisson队列(如RBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
@Component
public class RedissonDelayQueueProducer {
@Autowired
private RedissonClient redissonClient;

public void addDelayedTask(String topic, Object task, long delay, TimeUnit timeUnit) {
log.info("redis的延迟队列 {} (添加)的key:{},time:{},当前时间:{}", topic, task, delay, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
redissonClient.getDelayedQueue(redissonClient.getBlockingQueue(topic)).offerAsync(task, delay, timeUnit);
}

public Boolean cancelDelayedTask(String topic, Object task) {
log.info("redis的延迟队列{} (移除)的key:{},当前时间:{}", topic, task, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
// 从延迟队列中移除任务
return redissonClient.getDelayedQueue(redissonClient.getBlockingQueue(topic)).remove(task);
}
}

消费者

起一个线程循环监听延迟队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
@Service
public class RedissonDelayedQueueConsumer {
private final static String topic = "topic";
@Autowired
private RedissonClient redissonClient;

@PostConstruct
public void init() {
// 创建延迟队列
log.info("{} 开始监听", topic);
new Thread(() -> {
while (true) {
try {
RBlockingQueue<Object> aaa = redissonClient.getBlockingQueue(topic);
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(aaa);
// 从延迟队列中取出任务
Object take = aaa.take();
// 处理延迟任务,例如执行某个操作
log.info("redis的延迟队列:{},当前时间:{}", take, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
} catch (Exception e) {
log.error("redis的延迟队列抛出异常", e);
}
}
}).start();
}
}

效果展示

image-20240429115537300

redisson配置

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class RedissonConfig {

@Value("${spring.data.redis.host}")
private String redisHost;
@Value("${spring.data.redis.port}")
private String redisPort;
@Value("${spring.data.redis.password}")
private String password;

@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort);
if(StringUtils.isBlank(password)) {
config.useSingleServer().setPassword(null);
} else {
config.useSingleServer().setPassword(password);
}
return Redisson.create(config);
}
}

Springboot-Redisson实现延迟消息队列
https://cason.work/2024/04/29/Springboot-Redisson实现延迟消息队列/
作者
Cason Mo
发布于
2024年4月29日
许可协议