博主,我遇到一个问题,我创建的key过一段时间之后,突然不见了,还是测试环境,吓死了。
key不见了,就那么几个情况
- 过期了
- 你自己不小心删除了
- 内存满了,执行了驱逐策略(默认是给客户端返回异常,不会驱逐Key)
没搞明白这个batchSize,是啥作用。我这边也是一条一条取。你解决批量拉取消息了吗?
我理解你的问题其实不是与redis断开连接后如何重连的问题,因为无论是lettuce还是jedis都能够在与redis断开后尝试重连;你的问题是不是重连后这个注册消费没有办法继续消费了? 如果是这个问题,那么是有办法的。这个注册消费的核心是StreamPollTask中的doLoop,可以阅读以下这个代码,在这个方法中,遇到exception时会通过cancelSubscriptionOnError.test(e)来检测是否需要cancel本次注册消费行为。cancelSubscriptionOnError其实是Predicate<Throwable>,springboot默认的规则是返回ture,也就是无论发生什么exception都会停止当前的loop,当与redis断开连接时会发生io exception,所以你得注册消费循环就停掉了。这就是根本原因。因此你想要在redis重连后还能继续消费就需要设置这个cancelSubscriptionOnError的规则,比如规则这样:
private static final Predicate<Throwable> neverCancelSub = t -> false;
然后在注册消费的时候这样设置:
return streamMessageListenerContainer.register(StreamMessageListenerContainer.StreamReadRequest
.builder(StreamOffset.create(stream, ReadOffset.lastConsumed()))
.cancelOnError(ConsumeDispense.neverCancelSub) // 发生异常时不cancel
.consumer(Consumer.from(group, consumer))
.autoAcknowledge(false) // 手动确认ack
.build(),
new StreamMessageListener(messageListener));
学习了。 
ByteRecord byteRecord = StreamRecords.rawBytes(Collections.singletonMap(“name”.getBytes(), “myVal”.getBytes())).withStreamKey(“mystream”.getBytes());这里能不能传递List或者Map类型的消息?
?没明白什么意思。你可以看看这个类 org.springframework.data.redis.connection.stream.StreamRecords 提供的一些方法。
我的意思是,如果我想发送一个Map<List>类型的消息到stream,然后我监听的时候该如何取到这条信息?
Redis的Stream,值是由key/value,组成的,类似于Map。
如果如果你的数据类型是Map/List,那么可以把它序列化成json数据,再作为map的值,进行传递。
value : [{"ke1": "v1"}]
厉害的厉害的,我费了老大劲,刚刚才想出来,你几分钟就ok了.谢谢大佬!
你好 请问,案例中用StreamMessageListener消费"mystream"流中的消息时,没有配置给StreamMessageListener配置一些对应的config属性它是怎么监听到对应"mystream"并消费的,我看其它例子都是给监听器注册配置了对应的流和组才实现的监听。
下面代码有啊,指定当前的消费组,消费者。已经要消费的stream名称。
streamMessageListenerContainer.receive(Consumer.from("group-1", "consumer-1"),
StreamOffset.create("mystream", ReadOffset.lastConsumed()), this.streamMessageListener);
没仔细看 QAQ ,对了 我使用jedis3.3作为SpringBoot客户端时启动报异常是因为jedis不支持stream吗?
nested exception is java.lang.UnsupportedOperationException: Streams not supported using Jedis!
对,日志写的很清楚。
你好 你找得到ConsumeDispense这个类吗?
cancelOnError(ConsumeDispense.neverCancelSub)
没有,这个估计是别人自定义的。
消息被别的消费者消费掉了,如何让所有消费者都同时消费到消息:
我两个虚拟机,启动项目后,监听消息有时虚拟机1收到消息,有时虚拟机二收到消息,注册监听器时,设置相同的消费者名字或不同的消费者名字,都会出现这样的问题。
多定义一个消费组就行了 一个消费组放一个消费者 这样两个消费者都能消费到同一条消息 你可以用ip区分它们俩
谢谢回复,我后面也确实是这样操作的,只不过我没有用IP区分,而是保存了一个随机字符串作为组名,以后每次消费消息,都按照这一个组名进行消费。
2qq