使用RabbitMQ实现TTL机制

阿里云教程2个月前发布
11 0 0

我们想象这样一个场景,在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单撤销。这个怎么实现?

TTL的设置方法

TTL,Time to Live 的简称,即过期时间。RabbitMQ 可以对消息和队列两个维度来设置TTL。
任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。目前有两种方法可以设置消息的TTL。
1)通过Queue属性设置,队列中所有消息都有一样的过期时间。
2)对消息自身进行单独设置,每条消息的TTL 可以不同。
如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。一般来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。“死信”要被取出来消费,需要特殊的手段,我们后来再做介绍。

TTL的规则

1)如果不设置TTL,则表明此消息不会过期;
2)如果TTL设置为0,则表明除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃;
注意理解message-ttl 、x-expires 这两个参数的区别,有不同的含义。但是这遵循上面的默认规则。一般TTL相关的参数单位都是毫秒(ms)

TTL的具体实现

添加pom.xml文件依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

application.properties添加rabbitmq连接信息

spring.application.name=ttl
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672

启动入口类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqDemo {
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemo07.class, args);
    }
}

RabbitConfig类

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue queueTTLWaiting() {
        Map<String, Object> props = new HashMap<>();
        // 对于该队列中的消息,设置都等待10s
        props.put("x-message-ttl", 10000);
        Queue queue = new Queue("q.pay.ttl-waiting", false, false,false, props);
        return queue;
    }

    @Bean
    public Queue queueWaiting() {
        Queue queue = new Queue("q.pay.waiting", false, false,false);
        return queue;
    }

    @Bean
    public Exchange exchangeTTLWaiting() {
        DirectExchange exchange = new DirectExchange("ex.pay.ttlwaiting",false, false);
        return exchange;
    }

   /**
    * 该交换器使用的时候,需要给每个消息设置有效期
    * @return
    */
    @Bean
    public Exchange exchangeWaiting() {
        DirectExchange exchange = new
        DirectExchange("ex.pay.waiting", false, false);
        return exchange;
    }

    @Bean
    public Binding bindingTTLWaiting() {
        return BindingBuilder.bind(queueTTLWaiting()).to(exchangeTTLWaiting()).with("pay.ttlwaiting").noargs();
    }

    @Bean
    public Binding bindingWaiting() {
        return  BindingBuilder.bind(queueWaiting()).to(exchangeWaiting()).with("pay.waiting").noargs();
    }
}

PayController类实现

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

@RestController
public class PayController {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/pay/queuettl")
    public String sendMessage() {
        rabbitTemplate.convertAndSend("ex.pay.ttl-waiting",
        "pay.ttl-waiting", "发送了TTL-WAITING-MESSAGE");
        return "queue-ttl-ok";
    }

    @RequestMapping("/pay/msgttl")
    public String sendTTLMessage() throws UnsupportedEncodingException {
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("5000");
        Message message = new Message("发送了WAITINGMESSAGE".
        getBytes("utf-8"), properties);
        rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
        return "msg-ttl-ok";
    }
}

© 版权声明

相关文章

暂无评论

none
暂无评论...