I am using spring-data-redis with LettuceConnectionFactory as the connection factory. The configuration is as follows:
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
        PlanStreamMessageListener planStreamMessageListener) {
    // Build the container options
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>>
            streamMessageListenerContainerOptions =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder()
                    // Maximum number of messages fetched per poll
                    .batchSize(1)
                    // Handler for exceptions raised while consuming messages
                    .errorHandler(e -> {
                        log.error(e.getMessage(), e);
                    })
                    // Poll timeout: how long each read blocks waiting for new messages (10 seconds here)
                    .pollTimeout(Duration.ofSeconds(10))
                    // Serializer
                    .serializer(new StringRedisSerializer())
                    .build();
    // Create the listener container from the options
    StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
            StreamMessageListenerContainer
                    .create(lettuceConnectionFactory, streamMessageListenerContainerOptions);
    // Start consuming through the container (manual acknowledgment)
    streamMessageListenerContainer.receive(
            Consumer.from(redisAppProperties.getGroupId(), redisAppProperties.getConsumerId()),
            StreamOffset.create(redisAppProperties.getReceiveStreamKey(), ReadOffset.lastConsumed()),
            planStreamMessageListener);
    return streamMessageListenerContainer;
}
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
    event
            .getApplicationContext()
            // Roughly like returning a closure in JS: a functional style that mainly makes
            // chaining convenient; it does not defer initialization here
            .getBeanProvider(StreamMessageListenerContainer.class)
            .ifAvailable(Lifecycle::start);
}
Messages are consumed through a listener, and exceptions are already caught inside the listener method.
In use, the following problem occurred:
2020-12-18 09:38:38.408 ERROR 236524 --- [SimpleAsyncTaskExecutor-1] c.b.boot.business.config.RedisConfig : Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out after 1 minute(s)
org.springframework.dao.QueryTimeoutException: Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out after 1 minute(s)
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:70) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:273) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.convertLettuceAccessException(LettuceStreamCommands.java:712) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:602) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.connection.DefaultedRedisConnection.xReadGroup(DefaultedRedisConnection.java:591) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.core.DefaultStreamOperations$4.inRedis(DefaultStreamOperations.java:310) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:376) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:371) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.core.DefaultStreamOperations.read(DefaultStreamOperations.java:305) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$3(DefaultStreamMessageListenerContainer.java:236) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:138) [spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:123) [spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_231]
Caused by: io.lettuce.core.RedisCommandTimeoutException: Command timed out after 1 minute(s)
at io.lettuce.core.ExceptionFactory.createTimeoutException(ExceptionFactory.java:51) ~[lettuce-core-5.3.5.RELEASE.jar!/:5.3.5.RELEASE]
at io.lettuce.core.LettuceFutures.awaitOrCancel(LettuceFutures.java:119) ~[lettuce-core-5.3.5.RELEASE.jar!/:5.3.5.RELEASE]
at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75) ~[lettuce-core-5.3.5.RELEASE.jar!/:5.3.5.RELEASE]
at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:79) ~[lettuce-core-5.3.5.RELEASE.jar!/:5.3.5.RELEASE]
at com.sun.proxy.$Proxy208.xreadgroup(Unknown Source) ~[na:na]
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:600) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
... 12 common frames omitted
2020-12-18 10:01:39.805 ERROR 236524 --- [http-nio-8085-exec-5] o.a.c.c.C.[.[.[.[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [/business] threw exception [Request processing failed; nested exception is org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: java.io.IOException: An existing connection was forcibly closed by the remote host.] with root cause
java.io.IOException: An existing connection was forcibly closed by the remote host.
at sun.nio.ch.SocketDispatcher.read0(Native Method) ~[na:1.8.0_231]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[na:1.8.0_231]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_231]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_231]
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253) ~[netty-buffer-4.1.54.Final.jar!/:4.1.54.Final]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1134) ~[netty-buffer-4.1.54.Final.jar!/:4.1.54.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.54.Final.jar!/:4.1.54.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.54.Final.jar!/:4.1.54.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.54.Final.jar!/:4.1.54.Final]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_231]
2020-12-18 10:01:39.944 INFO 236524 --- [lettuce-eventExecutorLoop-1-2] i.l.core.protocol.ConnectionWatchdog : Reconnecting, last destination was /xxx.xx.xx.xxx:6379
2020-12-18 10:01:40.017 INFO 236524 --- [lettuce-nioEventLoop-4-3] i.l.core.protocol.ReconnectionHandler : Reconnected to xxx.xx.xx.xxx:6379
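Clearly the network dropped at some point. However, even after the Redis connection was re-established, the message listener never received another message from the stream. This is frustrating: I cannot guarantee that the network will never drop, yet a single interruption is enough to stop Redis stream consumption for good. Has anyone run into this problem, and is there a solution?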
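To make the symptom concrete, here is a small self-contained model of what I suspect is happening. It is only my guess, not spring-data-redis source code: I am assuming the container's poll loop cancels the subscription the first time a read throws (for example the command timeout in the log above), so later messages are never read even though the connection itself recovers. The class `PollLoopModel`, the method `poll`, and the `cancelOnError` parameter are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical model of a polling subscription; NOT spring-data-redis code.
final class PollLoopModel {

    /**
     * Runs the reads in order and returns the messages that were delivered.
     * Each Supplier stands for one poll: it either returns a message or throws.
     * If a read throws and cancelOnError accepts the exception, the loop stops for good.
     */
    static List<String> poll(List<Supplier<String>> reads, Predicate<Throwable> cancelOnError) {
        List<String> delivered = new ArrayList<>();
        boolean active = true;
        for (Supplier<String> read : reads) {
            if (!active) {
                break; // subscription was cancelled, nothing is read any more
            }
            try {
                delivered.add(read.get());
            } catch (RuntimeException e) {
                // The error is seen here (like the errorHandler logging it),
                // then the predicate decides whether polling continues.
                if (cancelOnError.test(e)) {
                    active = false;
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Supplier<String>> reads = new ArrayList<>();
        reads.add(() -> "m1");
        reads.add(() -> { throw new IllegalStateException("command timed out"); });
        reads.add(() -> "m2"); // arrives after the connection is back

        System.out.println(poll(reads, e -> true));  // prints [m1]
        System.out.println(poll(reads, e -> false)); // prints [m1, m2]
    }
}
```

If this model matches the real container, registering the listener through a `StreamReadRequest` built with `cancelOnError(t -> false)` and passed to `register(...)`, instead of calling `receive(...)`, might keep the loop polling after a timeout. I have not verified this against my setup.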