如何在Springboot中使用Redis5的Stream
关于Stream
一句话概括:Redis5的新数据类型,功能就是MQ。可以生产消息,消费消息。支持群组消费,以及消息确认。
在理解了Stream
后,就可以继续往下看
SpringBoot整合
只需要整合进Redis就行。
POM.xml
springboot2默认使用lettuce
作为客户端
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
配置
spring:
redis:
database: 0
host: 192.168.1.103
port: 6379
password: "123456"
timeout: 2000
lettuce:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
消息 和 消息ID的对象
我觉得要先说一下,这两个对象。因为以下的内容,都需要跟这两个对象打交道
消息对象的创建
使用 StreamRecords
的静态方法来创建消息实例。
一个stream消息有两个内容。可以理解为:一个是key,一个是value。
key和value都可以使用自定义的对象,字节,字符串来定义
ByteRecord rawBytes(Map<byte[], byte[]> raw)
ByteBufferRecord rawBuffer(Map<ByteBuffer, ByteBuffer> raw)
StringRecord string(Map<String, String> raw)
<S, K, V> MapRecord<S, K, V> mapBacked(Map<K, V> map)
<S, V> ObjectRecord<S, V> objectBacked(V value)
RecordBuilder<?> newRecord() // 通过builder方式来创建消息
RecordId 表示消息ID
你读过上面的帖子,就会知道。一条消息的ID是唯一的。并且有2部分组成
// ----------- 读取ID属性的实例方法
// 是否是系统自动生成的
boolean shouldBeAutoGenerated();
// 获取原始的id字符串
String getValue();
// 获取序列号部分
long getSequence();
// 获取时间戳部分
long getTimestamp();
// ----------- 创建ID的静态方法
RecordId of(@Nullable String value)
RecordId of(long millisecondsTime, long sequenceNumber)
RecordId autoGenerate()
往Stream推送消息
使用RedisTemplate
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void test () {
// 创建消息记录, 以及指定stream
StringRecord stringRecord = StreamRecords.string(Collections.singletonMap("name", "KevinBlandy")).withStreamKey("mystream");
RecordId recordId = this.stringRedisTemplate.opsForStream().add(stringRecord);
// 是否是自动生成的
boolean autoGenerated = recordId.shouldBeAutoGenerated();
// id值
String value = recordId.getValue();
// 序列号部分
long sequence = recordId.getSequence();
// 时间戳部分
long timestamp = recordId.getTimestamp();
}
使用RedisConnection
@Autowired
private RedisConnectionFactory redisConnectionFactory;
public void test () {
// 创建消息记录, 以及指定stream
ByteRecord byteRecord = StreamRecords.rawBytes(Collections.singletonMap("name".getBytes(), "KevinBlandy".getBytes())).withStreamKey("mystream".getBytes());
// 获取连接
RedisConnection redisConnection = this.redisConnectionFactory.getConnection();
RecordId recordId = redisConnection.xAdd(byteRecord);
// 是否是自动生成的
boolean autoGenerated = recordId.shouldBeAutoGenerated();
// id值
String value = recordId.getValue();
// 序列号部分
long sequence = recordId.getSequence();
// 时间戳部分
long timestamp = recordId.getTimestamp();
}
从Stream消费消息
阻塞消费
StreamConsumerRunner
使用 ApplicationRnner
,在系统启动以后,初始化监听器。开始监听消费。
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;
@Component
public class StreamConsumerRunner implements ApplicationRunner, DisposableBean {
static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumerRunner.class);
@Value("${redis.stream.consumer}")
private String consumer;
@Autowired
RedisConnectionFactory redisConnectionFactory;
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
StreamMessageListener streamMessageListener;
@Autowired
StringRedisTemplate stringRedisTemplate;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;
@Override
public void run(ApplicationArguments args) throws Exception {
// 创建配置对象
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions = StreamMessageListenerContainerOptions
.builder()
// 一次性最多拉取多少条消息
.batchSize(10)
// 执行消息轮询的执行器
.executor(this.threadPoolTaskExecutor)
// 消息消费异常的handler
.errorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
// throw new RuntimeException(t);
t.printStackTrace();
}
})
// 超时时间,设置为0,表示不超时(超时后会抛出异常)
.pollTimeout(Duration.ZERO)
// 序列化器
.serializer(new StringRedisSerializer())
.build();
// 根据配置对象创建监听容器对象
StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer
.create(this.redisConnectionFactory, streamMessageListenerContainerOptions);
// 使用监听容器对象开始监听消费(使用的是手动确认方式)
streamMessageListenerContainer.receive(Consumer.from("group-1", "consumer-1"),
StreamOffset.create("mystream", ReadOffset.lastConsumed()), this.streamMessageListener);
this.streamMessageListenerContainer = streamMessageListenerContainer;
// 启动监听
this.streamMessageListenerContainer.start();
}
@Override
public void destroy() throws Exception {
this.streamMessageListenerContainer.stop();
}
}
StreamMessageListener
实现函数接口 StreamListener<K, V extends Record<K, ?>>
,来自定义消息的消费实现
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
@Component
public class StreamMessageListener implements StreamListener<String, MapRecord<String, String, String>>{
static final Logger LOGGER = LoggerFactory.getLogger(StreamMessageListener.class);
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
// 消息ID
RecordId messageId = message.getId();
// 消息的key和value
Map<String, String> body = message.getValue();
LOGGER.info("stream message。messageId={}, stream={}, body={}", messageId, message.getStream(), body);
// 通过RedisTemplate手动确认消息
this.stringRedisTemplate.opsForStream().acknowledge("mystream", message);
}
}
非阻塞消费
主要是通过StreamOperations
或者是 RedicConnection
的消费API来进行消息的随机消费
StreamOperations 中,关于读取操作的API
从RedisTemplate中获取到StreamOperations
StreamOperations<String, String, String> s = this.stringRedisTemplate.opsForStream();
StreamOperations 的读取 API
// 随机范围读取
<V> List<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<String> range)
<V> List<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<String> range, Limit limit)
// 根据消息ID或者偏移量读取
List<MapRecord<K, HK, HV>> read(StreamOffset<K>... streams)
<V> List<ObjectRecord<K, V>> read(Class<V> targetType, StreamOffset<K>... streams)
List<MapRecord<K, HK, HV>> read(StreamReadOptions readOptions, StreamOffset<K>... streams)
<V> List<ObjectRecord<K, V>> read(Class<V> targetType, StreamReadOptions readOptions, StreamOffset<K>... streams)
List<MapRecord<K, HK, HV>> read(Consumer consumer, StreamOffset<K>... streams)
<V> List<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer, StreamOffset<K>... streams)
List<MapRecord<K, HK, HV>> read(Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams)
List<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams)
// 随机逆向范围读取
List<MapRecord<K, HK, HV>> reverseRange(K key, Range<String> range)
List<MapRecord<K, HK, HV>> reverseRange(K key, Range<String> range, Limit limit)
<V> List<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Range<String> range)
<V> List<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Range<String> range, Limit limit)
// 消费者信息
XInfoConsumers consumers(K key, String group);
// 消费者信息
XInfoGroups groups(K key);
// stream信息
XInfoStream info(K key);
// 获取消费组,消费者中未确认的消息
PendingMessagesSummary pending(K key, String group);
PendingMessages pending(K key, Consumer consumer)
PendingMessages pending(K key, String group, Range<?> range, long count)
PendingMessages pending(K key, String group, Range<?> range, long count)
测试
先通过Redis控制台创建stream以及group。
127.0.0.1:6379> XADD mystream * hello world
"1583208428680-0"
127.0.0.1:6379> XGROUP CREATE mystream group-1 $
OK
启动程序后,通过控制台往stream生产消息
127.0.0.1:6379> XADD mystream * name KevinBlandy
"1583208571017-0"
程序成功的消费了这条消息
2020-03-03 12:09:34.159 INFO 9344 --- [lTaskExecutor-1] i.s.c.r.stream.StreamMessageListener : stream message。messageId=1583208571017-0, stream=mystream, body={name=KevinBlandy}
最后
对于Streram还有一些其他的操作。例如:通过RedisTemplate
来发送消息,以及查看未ACK的消息,重新消费等等。 在这里没有一一列举。其实你如果学懂了Stream,那么我觉得这些API连蒙带猜也都知道是怎么用的。水到渠成的事儿,不难。