Koca Cloud Gateway现有限流介绍与拓展预研

Koca Cloud Gateway现有限流介绍与拓展预研

简介

KOCA Cloud Gateway 基于 Spring Cloud Gateway、Spring Boot 2.x、Spring WebFlux 和 Reactor项目构建。在 Spring Cloud Gateway中,有着许多过滤器(Filter):这些是使用特定工厂构造的 Spring Framework GatewayFilter 实例。可以在发送下游请求之前或之后修改请求和响应。

RequestRateLimiter GatewayFilter工厂使用RateLimiter实现来确定是否允许继续当前请求。如果不是,则返回状态HTTP 429 - Too Many Requests(默认)。

该过滤器有一个可选的keyResolver参数和特定于速率限制器的参数(请参见下文)。

RateLimiter的默认实现为RedisRateLimiter,该速率控制器依赖Redis,实现了令牌桶算法,它通过两个参数来控制限流速率与阈值:

  • redis-rate-limiter.replenishRate:每秒允许用户执行的请求数,小于该值不会丢失任何请求。这是令牌桶
  • redis-rate-limiter.burstCapacity:允许用户在一秒钟内突发执行的最大请求数。这是令牌桶可以容纳的令牌数量。将此值设置为零将阻止所有请求。

keyResolver是实现KeyResolver接口的 Spring Bean。在配置中,使用 SpEL 通过名称引用 Bean。#{@myKeyResolver} 是引用名称为myKeyResolver Bean 的 SpEL 表达式。

public interface KeyResolver {
    Mono<String> resolve(ServerWebExchange exchange);
}

在Spring Cloud Gateway 中KeyResolver的默认实现是PrincipalNameKeyResolver,它从ServerWebExchange检索Principal并调用Principal.getName()。KOCA Cloud Gateway 在此基础上实现了keyResolver,能够支持多种组合策略。

默认情况下,如果KeyResolver未找到 key,则请求将被拒绝。可以使用spring.cloud.gateway.filter.request-rate-limiter.deny-empty-keytruefalse)和spring.cloud.gateway.filter.request-rate-limiter.empty-key-status-code属性来调整此行为。

当前keyResolver实现支持每个服务的速率限制策略列表,支持多个限流类型组件生成限流 Key。

限流类型 配置关键字 描述
已认证用户 USER 使用已认证的用户名或anonymous
请求的来源 ORIGIN 使用 HTTP 请求头的 Origin
URL URL 使用请求的路径
HTTP 请求的方法 HTTP_METHOD 使用 HTTP 请求的方法
URL 模板 URL_PATTERN 使用模板匹配请求的路径
koca:
  cloud:
    gateway:
      route:
        prefix: /
      api-doc:
        aggregation-mode: FROM_API
      ratelimit:
        enabled: true
        default-policy: # 可选,如果服务没有特定的策略存在,则使用这个
          matchTypes:
            - USER
            - ORIGIN
            - URL
            - HTTP_METHOD
        policy-map:
          admin:
            types:
              - USER
              - ORIGIN
              - URL
              - HTTP_METHOD
          cloud:
            types: # 每个类型的可选值
              - USER=anonymous
              - ORIGIN=somemachine.com
              - URL=/api # URI
              - URL_PATTERN=/admin/** # 对于 url_pattern,值是必须的
              - ROLE=user
              - HTTP_METHOD=get # 大小写不敏感
              - HTTP_HEADER=customHeader

使用案例

因为RateLimiter的默认实现为RedisRateLimiter,该速率控制器依赖Redis实现了令牌桶算法,所以首先我们添加redis依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

添加redis连接配置:

spring:
  redis:
    host: 127.0.0.1
    port: 6379

现在我们有一个uri为http://localhost:8082的本地服务,所有请求前缀带了/test的请求我们都将它路由到该本地服务,我们将他的路由配置配置网关,该服务有一个请求路径为/testFlow的目标接口,现在,我们需要对其接口进行限流,配置如下:

spring:
  cloud:
    gateway:
      routes:
        # 目标服务的路由配置
        - id: test
          uri: http://localhost:8082
          predicates:
            - Path=/test/**
          filters:
            - StripPrefix=1
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 1 #每秒允许用户执行的请求数,小于该值不会丢失任何请求。这是令牌桶被填充的速率
                redis-rate-limiter.burstCapacity: 2 #允许用户在一秒钟内突发执行的最大请求数。这是令牌桶可以容纳的令牌数量。将此值设置为零将阻止所有请求。
                key-resolver: '#{@keyResolver}'
                deny-empty-key: false #如果KeyResolver未找到 key,则请求将被允许通过,默认为true
                
koca:
  cloud:
    gateway:
      route:
        prefix: /
      ratelimit:
        enabled: true
#        default-policy: # 可选,如果服务没有特定的策略存在,则使用这个
#          matchTypes:
#            - HTTP_HEADER=test
        policy-map:
          test:
            matchTypes:
              - URL=/testFlow  #对/testFlow请求进行限流

我们向网关服务请求/test/testFlow,可以看到如下结果:

我们快速请求该接口,可以看到:

返回429状态码,接口已被限流,且网关后台打印日志如下:

当我们的目标服务注册到注册中心上时,我们的配置可以不对目标服务指定路由,具体配置如下:

spring:
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true  #自动发现注册中心上的服务并添加到路由列表
          lower-case-service-id: true
          predicates:
            - name: Path
              args:
                pattern: "'/'+serviceId+'/**'"
          filters:
            - name: RequestRateLimiter
              args:
                key-resolver: "'#{@keyResolver}'"
                redis-rate-limiter.replenishRate: 1
                redis-rate-limiter.burstCapacity: 2
                deny-empty-key: false
            - name: RewritePath   #路径重写过滤器不可缺少
              args:
                regexp: "'/' + serviceId + '/(?<remaining>.*)'"
                replacement: "'/${remaining}'"

拓展

自定义限流规则加载、自定义限流算法拓展

首先我们定义自己的RedisRateLimiter,重新实现isAllowed方法,这里复制源码稍微改造下即可, 这边只贴核心代码。

public class MyRateLimiter extends AbstractRateLimiter<MyRateLimiter.Config> implements ApplicationContextAware {
    
    /....
    .....
    .../


    @Override
    public Mono<Response> isAllowed(String routeId, String id) {
        if (!this.initialized.get()) {
            throw new IllegalStateException("RedisRateLimiter is not initialized");
        } else {

            //可以改造改代码块根据routeId自定义获取限流速率配置,这里实现为写定的
            int replenishRate = 2;
            int burstCapacity = 1;
            int requestedTokens = 1;

            //这里默认为令牌桶算法,可以替换下文代码块,
            try {
                List<String> keys = getKeys(id);
                List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", "", requestedTokens + "");
                Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
                return flux.onErrorResume((throwable) -> {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Error calling rate limiter lua", throwable);
                    }

                    return Flux.just(Arrays.asList(1L, -1L));
                }).reduce(new ArrayList(), (longs, l) -> {
                    longs.addAll(l);
                    return longs;
                }).map((results) -> {
                    boolean allowed = (Long)results.get(0) == 1L;
                    Long tokensLeft = (Long)results.get(1);
                    Response response = new Response(allowed, getHeaders(replenishRate, burstCapacity, tokensLeft));
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("response: " + response);
                    }

                    return response;
                });
            } catch (Exception var10) {
                this.log.error("Error determining if user allowed from redis", var10);
                return Mono.just(new Response(true, getHeaders(replenishRate, burstCapacity, -1L)));
            }
        }
    }

    public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity, Long tokensLeft) {
        HashMap<String, String> headers = new HashMap<>();
        headers.put(this.remainingHeader, tokensLeft.toString());
        headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
        headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
        return headers;
    }

    @Validated
    public static class Config {
        @Min(1L)
        private int replenishRate;
        @Min(0L)
        private int burstCapacity = 1;
        @Min(1L)
        private int requestedTokens = 1;

        public Config() {
        }

        public int getReplenishRate() {
            return this.replenishRate;
        }

        public MyRateLimiter.Config setReplenishRate(int replenishRate) {
            this.replenishRate = replenishRate;
            return this;
        }

        public int getBurstCapacity() {
            return this.burstCapacity;
        }

        public MyRateLimiter.Config setBurstCapacity(int burstCapacity) {
            this.burstCapacity = burstCapacity;
            return this;
        }

        public int getRequestedTokens() {
            return this.requestedTokens;
        }

        public MyRateLimiter.Config setRequestedTokens(int requestedTokens) {
            this.requestedTokens = requestedTokens;
            return this;
        }

        public String toString() {
            return (new ToStringCreator(this)).append("replenishRate", this.replenishRate).append("burstCapacity", this.burstCapacity).append("requestedTokens", this.requestedTokens).toString();
        }
    }
}

将路由的filter配置替换为咱们自定义的RateLimiter实现类:

spring:
  cloud:
    gateway:
      routes:
        # 目标服务的路由配置
        - id: test
          uri: http://localhost:8082
          predicates:
            - Path=/test/**
          filters:
            - StripPrefix=1
            - name: RequestRateLimiter
              args:
                rate-limiter: '#{@myRateLimiter}' #自定义ratelimiter算法实现类
                redis-rate-limiter.replenishRate: 1 
                redis-rate-limiter.burstCapacity: 2 
                key-resolver: '#{@keyResolver}'
                deny-empty-key: false #如果KeyResolver未找到 key,则请求将被允许通过,默认为true
                

自定义限流返回结果

目前限流无法返回自定义数据,而是返回HTTP 429 - Too Many Requests,而且并没有相应配置改变错误处理方式

我们需要重写默认的限流过滤工厂类,修改返回为我们想要的自定义返回,下面为部分核心代码:

@Component
public class MyRequestRateLimiterGatewayFilterFactory extends RequestRateLimiterGatewayFilterFactory {

  private final RateLimiter defaultRateLimiter;

  private final KeyResolver defaultKeyResolver;

  public MyRequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
    super(defaultRateLimiter, defaultKeyResolver);
    this.defaultRateLimiter = defaultRateLimiter;
    this.defaultKeyResolver = defaultKeyResolver;
  }

  @Override
  public GatewayFilter apply(Config config) {
    KeyResolver resolver = getOrDefault(config.getKeyResolver(), defaultKeyResolver);
    RateLimiter<Object> limiter = getOrDefault(config.getRateLimiter(), defaultRateLimiter);
    return (exchange, chain) -> resolver.resolve(exchange).flatMap(key -> {
      String routeId = config.getRouteId();
      if (routeId == null) {
        Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
        routeId = route.getId();
      }
      String finalRouteId = routeId;
      return limiter.isAllowed(routeId, key).flatMap(response -> {
        for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
          exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
        }
        if (response.isAllowed()) {
          return chain.filter(exchange);
        }
        
         //处理自定义的返回结果
        log.warn("已限流: {}", finalRouteId);
        ServerHttpResponse httpResponse = exchange.getResponse();
        //修改code为500
        httpResponse.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
        if (!httpResponse.getHeaders().containsKey("Content-Type")) {
          httpResponse.getHeaders().add("Content-Type", "application/json");
        }
        //此处无法触发全局异常处理,手动返回
        DataBuffer buffer = httpResponse.bufferFactory().wrap(("{\n"
            + "  \"code\": \"1414\","
            + "  \"message\": \"服务器限流\","
            + "  \"data\": \"Server throttling\","
            + "  \"success\": false"
            + "}").getBytes(StandardCharsets.UTF_8));
        return httpResponse.writeWith(Mono.just(buffer));
      });
    });
  }

  private <T> T getOrDefault(T configValue, T defaultValue) {
    return (configValue != null) ? configValue : defaultValue;
  }
}

然后将配置文件配置的filtername配置为重写后的过滤器:

spring:
  cloud:
    gateway:
      routes:
        # 目标服务的路由配置
        - id: test
          uri: http://localhost:8082
          predicates:
            - Path=/test/**
          filters:
            - StripPrefix=1
            - name: MyRequestRateLimiter #修改成自定义的限流过滤器
              args:
                redis-rate-limiter.replenishRate: 1 
                redis-rate-limiter.burstCapacity: 2 
                key-resolver: '#{@keyResolver}'
                deny-empty-key: false #如果KeyResolver未找到 key,则请求将被允许通过,默认为true
1 个赞

:+1: