redis stream 命令行客户端和java客户端使用

发布时间 2023-03-22 21:16:09作者: zhangyukun
  1. redis stream 是发布订阅机制的升级版本,或者说是 redis pub/sub 的升级版,是redis 5的新增特色,redis的 stream就是一个简单的消息队列。

  2. redis 的发布订阅极度不可靠,它不支持持久化,不管是消费者下线,还是消息积压都会导致消息丢失

    • 消息积压的默认处理方式是吧消费者踢下线,然后释放资源
    • 通过配置 client-output-buffer-limit pubsub 32mb 8mb 60 可以指定发布订阅消息缓存区大小,这行配置的意思是最大使用32mb内存,并且如果8mb 超过60秒也会吧消费者踢下线。
  3. redis stream 大有持久化,可以把它单做一个简单的mq来使用。

  4. 添加消息格式: xadd 队列名字 * fied1 value1 fied2 value2

    • 队列名不存在会默认创建这个队列
    • * 表示 消息Id自动生成,也可手动指定唯一消息Id
    • fied1 value1 成对出现 可以有多对,是消息的字段和值
    • 例子: xadd myQueue * f1 v1
  5. 给指定队列添加消费者组: xgroup create 队列名 消费者组名 0-0

    • 0-0 表示读取所有消息
    • 例子: xgroup create myQueue g1 0-0
  6. 开始消费消息: xreadgroup group 消费者组名 消费者名字 count 5 streams 队列名字 >

    • 注意后面的>不能少
    • 消费者名字 如果是不同的,那么在消息 ack 之前别的消费者是可以读到的,消费者不需要预先创建
    • count 5 表示本次读取多少条消息
    • streams 后面是队列名字
    • xreadgroup group g1 cosumer count 5 streams myQueue >
    • 如果 对面名字后面带上 id 就不需要 >
  7. 不适用消费组也可以监听消息

    • XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
    • COUNT 后面数读取的数量
    • BLOCK 后面是阻塞时间
    • STREAMS 后面数队列名字
    • id使用 0-0 表示从头开始读取
    • id使用 $表示从尾部开始读取,这时候只有阻塞的方式等待新消息进来才能读取到消息
  8. 确认消息:xack 队列名 消费者组名 消息Id

    • 例子:xack myQueue g1 1679313470900-0
  9. 不同消费者组消费的消息有独立的当前消费位置指针,没有被确认的消息会被重复消费

    • 我控制台测试没有被重复消息
  10. 查看组内已经被读取,但是没有ack的消息: xpengding 队列名 组名
    image-20230321133849816

    • 例子:xpending que1 g1
    • 第一个Id是最后一个消息Id,第二个id是当前消息偏移量。最前面的5 是剩余带消费的消息数量
    • 通过panding 消息列表,解决消费者没有ack消息就挂掉的问题,重启以后读取这个列表在吧里面的消费了就行了。
  11. 如果使用 xpengding 队列名 组名 - + count ,会列出已读未消费的详细信息,箭头分别是消息Id,消费者组名字,上传投递过去了多久,和被投递了几次。
    image-20230321142131357

  12. 消息认领,如果消费者长时间活不过来,可以这个消费者上面超过一定时长的消息转移给别的消费者

    • 语法:xclaim 队列名 消费者组名 转到的消费者组名 未确认时间毫秒数 消息Id
    • 例子: claim myQueue g1 cosumer2 60000 id
    • 设置时间是避免一个消息被重复转移,这条转移语句被执行了2次的时候。
    • 转移以后消息投递数量会增加,并且读取时间重置。
  13. 查看消息长度:xlen 队列名字

  14. 查看指定范围的消息:xrange 队列名 - +

    • -表示最小的id
    • +表示最大的id
    • -和+可以写成具体的id
  15. 删除消息: xdel 队列名 id或者直接 del 队列名 删除全部

    • 如果一个消息多次被认领依旧无法被消费,可以考虑删除,删除消息也需要ack,不然还会在 pending里面。
  16. xinfo可以查询

    • 查询队列信息:Xinfo stream 队列名
      image-20230321151504165
    • 查询消费者组信息:xinfo groups 队列名
      image-20230321151611061
    • 查询指定消费者组上消费者的信息:xinfo consumers 队列名字 消费者组名字
      image-20230321151707088
  17. java 使用 redis stream 例子

    	@ApiOperation(value = "redis流 添加消息")
    	@RequestMapping(value="stream/add", method= {RequestMethod.GET})
    	public String streamAdd() throws Exception{
    
    		Map<String,String> map = new HashMap<>();
    		map.put("name","张三");
    		map.put("age","33");
    
    		redisTemplateString.<String,String>opsForStream().add("myQueue",map);
    
    
    		return "OK";
    	}
    
    
    	@ApiOperation(value = "redis流 消费者获取消息,并且确认消息")
    	@RequestMapping(value="stream/get", method= {RequestMethod.GET})
    	public String streamGet() throws Exception{
    		Map<String,String> map = new HashMap<>();
    		map.put("name","张三");
    		map.put("age","33");
    
    
    		//添加两个消费者组
    		try{
    			redisTemplateJson.<String,String>opsForStream().createGroup("myQueue","myGroup1");
    			redisTemplateJson.<String,String>opsForStream().createGroup("myQueue","myGroup2");
    		}catch (Exception e){
    			System.out.println( "消费者组已经存在" );
    		}
    
    		//两个消费者
    		Consumer consumer1 = Consumer.from("myGroup1", "consumer1");
    		Consumer consumer2 = Consumer.from("myGroup2", "consumer2");
    
    
    		//读取消息
    		StreamOffset<Object> myQueue1 = StreamOffset.<Object>create("myQueue", ReadOffset.lastConsumed());
    		StreamOffset<Object> myQueue2 = StreamOffset.<Object>create("myQueue", ReadOffset.lastConsumed());
    		List<MapRecord<Object, String, String>> read1 = redisTemplateJson.<String, String>opsForStream().read(consumer1, myQueue1);
    		List<MapRecord<Object, String, String>> read2 = redisTemplateJson.<String, String>opsForStream().read(consumer2, myQueue2);
    
    		System.out.println("消费者组1:" + JSONObject.toJSONString( read1 ));
    		System.out.println("消费者组2:" + JSONObject.toJSONString( read2 ));
    
    
    
    		//读取penging消息
    		PendingMessagesSummary pendingMessagesSummary = redisTemplateJson.<String, String>opsForStream().pending("myQueue", consumer1.getGroup());
    		Range<String> idRange = pendingMessagesSummary.getIdRange();
    		System.out.println("没有ack的消息:" +JSONObject.toJSONString( idRange ));
    
    		//确认消息
    		redisTemplateJson.<String, String>opsForStream().acknowledge("myQueue",consumer1.getGroup(), RecordId.of( idRange.getLowerBound().getValue().get() ) );
    
    		return "OK";
    	}
    
    
    
    
    	@ApiOperation(value = "检查消费者组是否存在")
    	@RequestMapping(value="stream/checkGroup", method= {RequestMethod.GET})
    	public Boolean streamCheckGroup(String groupName) throws Exception{
    
    		StreamInfo.XInfoGroups groupInfo = redisTemplateJson.opsForStream().groups("myQueue");
    
    		Set<String> groupNames = new HashSet<>();
    		if( !groupInfo.isEmpty() ){
    			Iterator<StreamInfo.XInfoGroup> iterator = groupInfo.iterator();
    			iterator.forEachRemaining( item -> groupNames.add( item.groupName() ) );
    		}
    		System.out.println(JSONUtil.toJsonStr( groupNames ));
    		return groupNames.contains( groupName );
    	}
    
    
  18. 上面例子中获取消息的监听不是后台运行的,正常应该让它在后台监听消息

    package com.lomi.redis;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.stream.Consumer;
    import org.springframework.data.redis.connection.stream.MapRecord;
    import org.springframework.data.redis.connection.stream.ReadOffset;
    import org.springframework.data.redis.connection.stream.StreamOffset;
    import org.springframework.data.redis.stream.StreamListener;
    import org.springframework.data.redis.stream.StreamMessageListenerContainer;
    import org.springframework.data.redis.stream.Subscription;
    import org.springframework.stereotype.Component;
    
    import java.time.Duration;
    
    /**
     *  redis stream 监听消息
     * 
     * @author ZHANGYUKUN
     *
     */
    @Configuration
    @Component
    public class RedisStreamConfig {
    
    
    	@Bean
    	public Subscription subscription(RedisConnectionFactory redisConnectionFactory){
    		//监听容器配置
    		StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
    				.StreamMessageListenerContainerOptions
    				.builder()
    				.pollTimeout(Duration.ofSeconds(1))
    				.build();
    
    		//监听器实现
    		MyStreamListener streamListener = new MyStreamListener();
    
    
    		//创建监听容器
    		StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
    
    		//groupName需要提前创建
    		Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from("myGroup1", "consumer2"),
    				StreamOffset.create("myQueue", ReadOffset.lastConsumed()),
    				streamListener);
    
    
    		listenerContainer.start();
    		System.out.println("------------------------------------------stream监听启动-----------------------------------------------------------");
    
    		return subscription;
    	}
    
    
    
    
    	 class MyStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
    
    		@Override
    		public void onMessage(MapRecord<String, String, String> entries) {
    			System.out.println("message id "+entries.getId());
    			System.out.println("stream "+entries.getStream());
    			System.out.println("body "+entries.getValue());
    		}
    
    	}
    }