Koca Cloud Gateway现有限流介绍与拓展预研
简介
KOCA Cloud Gateway 基于 Spring Cloud Gateway、Spring Boot 2.x、Spring WebFlux 和 Reactor项目构建。在 Spring Cloud Gateway中,有着许多过滤器(Filter):这些是使用特定工厂构造的 Spring Framework GatewayFilter 实例。可以在发送下游请求之前或之后修改请求和响应。
RequestRateLimiter
GatewayFilter
工厂使用RateLimiter
实现来确定是否允许继续当前请求。如果不是,则返回状态HTTP 429 - Too Many Requests
(默认)。
该过滤器有一个可选的keyResolver
参数和特定于速率限制器的参数(请参见下文)。
RateLimiter
的默认实现为RedisRateLimiter
,该速率控制器依赖Redis,实现了令牌桶算法,它通过两个参数来控制限流速率与阈值:
-
redis-rate-limiter.replenishRate
:每秒允许用户执行的请求数,小于该值不会丢失任何请求。这是令牌桶 -
redis-rate-limiter.burstCapacity
:允许用户在一秒钟内突发执行的最大请求数。这是令牌桶可以容纳的令牌数量。将此值设置为零将阻止所有请求。
keyResolver
是实现KeyResolver
接口的 Spring Bean。在配置中,使用 SpEL 通过名称引用 Bean。#{@myKeyResolver}
是引用名称为myKeyResolver
Bean 的 SpEL 表达式。
public interface KeyResolver {
Mono<String> resolve(ServerWebExchange exchange);
}
在Spring Cloud Gateway 中KeyResolver
的默认实现是PrincipalNameKeyResolver
,它从ServerWebExchange
检索Principal
并调用Principal.getName()
。KOCA Cloud Gateway 在此基础上实现了keyResolver
,能够支持多种组合策略。
默认情况下,如果KeyResolver
未找到 key,则请求将被拒绝。可以使用spring.cloud.gateway.filter.request-rate-limiter.deny-empty-key
(true
或false
)和spring.cloud.gateway.filter.request-rate-limiter.empty-key-status-code
属性来调整此行为。
当前keyResolver
实现支持每个服务的速率限制策略列表,支持多个限流类型组件生成限流 Key。
限流类型 | 配置关键字 | 描述 |
---|---|---|
已认证用户 | USER |
使用已认证的用户名或anonymous
|
请求的来源 | ORIGIN |
使用 HTTP 请求头的 Origin |
URL | URL |
使用请求的路径 |
HTTP 请求的方法 | HTTP_METHOD |
使用 HTTP 请求的方法 |
URL 模板 | URL_PATTERN |
使用模板匹配请求的路径 |
koca:
cloud:
gateway:
route:
prefix: /
api-doc:
aggregation-mode: FROM_API
ratelimit:
enabled: true
default-policy: # 可选,如果服务没有特定的策略存在,则使用这个
matchTypes:
- USER
- ORIGIN
- URL
- HTTP_METHOD
policy-map:
admin:
types:
- USER
- ORIGIN
- URL
- HTTP_METHOD
cloud:
types: # 每个类型的可选值
- USER=anonymous
- ORIGIN=somemachine.com
- URL=/api # URI
- URL_PATTERN=/admin/** # 对于 url_pattern,值是必须的
- ROLE=user
- HTTP_METHOD=get # 大小写不敏感
- HTTP_HEADER=customHeader
使用案例
因为RateLimiter
的默认实现为RedisRateLimiter
,该速率控制器依赖Redis实现了令牌桶算法,所以首先我们添加redis依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
添加redis连接配置:
spring:
redis:
host: 127.0.0.1
port: 6379
现在我们有一个uri为http://localhost:8082的本地服务,所有请求前缀带了/test的请求我们都将它路由到该本地服务,我们将他的路由配置配置网关,该服务有一个请求路径为/testFlow的目标接口,现在,我们需要对其接口进行限流,配置如下:
spring:
cloud:
gateway:
routes:
# 目标服务的路由配置
- id: test
uri: http://localhost:8082
predicates:
- Path=/test/**
filters:
- StripPrefix=1
- name: RequestRateLimiter
args:
redis-rate-limiter.replenishRate: 1 #每秒允许用户执行的请求数,小于该值不会丢失任何请求。这是令牌桶被填充的速率
redis-rate-limiter.burstCapacity: 2 #允许用户在一秒钟内突发执行的最大请求数。这是令牌桶可以容纳的令牌数量。将此值设置为零将阻止所有请求。
key-resolver: '#{@keyResolver}'
deny-empty-key: false #如果KeyResolver未找到 key,则请求将被允许通过,默认为true
koca:
cloud:
gateway:
route:
prefix: /
ratelimit:
enabled: true
# default-policy: # 可选,如果服务没有特定的策略存在,则使用这个
# matchTypes:
# - HTTP_HEADER=test
policy-map:
test:
matchTypes:
- URL=/testFlow #对/testFlow请求进行限流
我们向网关服务请求/test/testFlow,可以看到如下结果:
我们快速请求该接口,可以看到:
返回429状态码,接口已被限流,且网关后台打印日志如下:
当我们的目标服务注册到注册中心上时,我们的配置可以不对目标服务指定路由,具体配置如下:
spring:
cloud:
gateway:
discovery:
locator:
enabled: true #自动发现注册中心上的服务并添加到路由列表
lower-case-service-id: true
predicates:
- name: Path
args:
pattern: "'/'+serviceId+'/**'"
filters:
- name: RequestRateLimiter
args:
key-resolver: "'#{@keyResolver}'"
redis-rate-limiter.replenishRate: 1
redis-rate-limiter.burstCapacity: 2
deny-empty-key: false
- name: RewritePath #路径重写过滤器不可缺少
args:
regexp: "'/' + serviceId + '/(?<remaining>.*)'"
replacement: "'/${remaining}'"
拓展
自定义限流规则加载、自定义限流算法拓展
首先我们定义自己的RedisRateLimiter,重新实现isAllowed方法,这里复制源码稍微改造下即可, 这边只贴核心代码。
public class MyRateLimiter extends AbstractRateLimiter<MyRateLimiter.Config> implements ApplicationContextAware {
/....
.....
.../
@Override
public Mono<Response> isAllowed(String routeId, String id) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
} else {
//可以改造改代码块根据routeId自定义获取限流速率配置,这里实现为写定的
int replenishRate = 2;
int burstCapacity = 1;
int requestedTokens = 1;
//这里默认为令牌桶算法,可以替换下文代码块,
try {
List<String> keys = getKeys(id);
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", "", requestedTokens + "");
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
return flux.onErrorResume((throwable) -> {
if (this.log.isDebugEnabled()) {
this.log.debug("Error calling rate limiter lua", throwable);
}
return Flux.just(Arrays.asList(1L, -1L));
}).reduce(new ArrayList(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map((results) -> {
boolean allowed = (Long)results.get(0) == 1L;
Long tokensLeft = (Long)results.get(1);
Response response = new Response(allowed, getHeaders(replenishRate, burstCapacity, tokensLeft));
if (this.log.isDebugEnabled()) {
this.log.debug("response: " + response);
}
return response;
});
} catch (Exception var10) {
this.log.error("Error determining if user allowed from redis", var10);
return Mono.just(new Response(true, getHeaders(replenishRate, burstCapacity, -1L)));
}
}
}
public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity, Long tokensLeft) {
HashMap<String, String> headers = new HashMap<>();
headers.put(this.remainingHeader, tokensLeft.toString());
headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
return headers;
}
@Validated
public static class Config {
@Min(1L)
private int replenishRate;
@Min(0L)
private int burstCapacity = 1;
@Min(1L)
private int requestedTokens = 1;
public Config() {
}
public int getReplenishRate() {
return this.replenishRate;
}
public MyRateLimiter.Config setReplenishRate(int replenishRate) {
this.replenishRate = replenishRate;
return this;
}
public int getBurstCapacity() {
return this.burstCapacity;
}
public MyRateLimiter.Config setBurstCapacity(int burstCapacity) {
this.burstCapacity = burstCapacity;
return this;
}
public int getRequestedTokens() {
return this.requestedTokens;
}
public MyRateLimiter.Config setRequestedTokens(int requestedTokens) {
this.requestedTokens = requestedTokens;
return this;
}
public String toString() {
return (new ToStringCreator(this)).append("replenishRate", this.replenishRate).append("burstCapacity", this.burstCapacity).append("requestedTokens", this.requestedTokens).toString();
}
}
}
将路由的filter配置替换为咱们自定义的RateLimiter实现类:
spring:
cloud:
gateway:
routes:
# 目标服务的路由配置
- id: test
uri: http://localhost:8082
predicates:
- Path=/test/**
filters:
- StripPrefix=1
- name: RequestRateLimiter
args:
rate-limiter: '#{@myRateLimiter}' #自定义ratelimiter算法实现类
redis-rate-limiter.replenishRate: 1
redis-rate-limiter.burstCapacity: 2
key-resolver: '#{@keyResolver}'
deny-empty-key: false #如果KeyResolver未找到 key,则请求将被允许通过,默认为true
自定义限流返回结果
目前限流无法返回自定义数据,而是返回HTTP 429 - Too Many Requests
,而且并没有相应配置改变错误处理方式
我们需要重写默认的限流过滤工厂类,修改返回为我们想要的自定义返回,下面为部分核心代码:
@Component
public class MyRequestRateLimiterGatewayFilterFactory extends RequestRateLimiterGatewayFilterFactory {
private final RateLimiter defaultRateLimiter;
private final KeyResolver defaultKeyResolver;
public MyRequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
super(defaultRateLimiter, defaultKeyResolver);
this.defaultRateLimiter = defaultRateLimiter;
this.defaultKeyResolver = defaultKeyResolver;
}
@Override
public GatewayFilter apply(Config config) {
KeyResolver resolver = getOrDefault(config.getKeyResolver(), defaultKeyResolver);
RateLimiter<Object> limiter = getOrDefault(config.getRateLimiter(), defaultRateLimiter);
return (exchange, chain) -> resolver.resolve(exchange).flatMap(key -> {
String routeId = config.getRouteId();
if (routeId == null) {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
routeId = route.getId();
}
String finalRouteId = routeId;
return limiter.isAllowed(routeId, key).flatMap(response -> {
for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
}
if (response.isAllowed()) {
return chain.filter(exchange);
}
//处理自定义的返回结果
log.warn("已限流: {}", finalRouteId);
ServerHttpResponse httpResponse = exchange.getResponse();
//修改code为500
httpResponse.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
if (!httpResponse.getHeaders().containsKey("Content-Type")) {
httpResponse.getHeaders().add("Content-Type", "application/json");
}
//此处无法触发全局异常处理,手动返回
DataBuffer buffer = httpResponse.bufferFactory().wrap(("{\n"
+ " \"code\": \"1414\","
+ " \"message\": \"服务器限流\","
+ " \"data\": \"Server throttling\","
+ " \"success\": false"
+ "}").getBytes(StandardCharsets.UTF_8));
return httpResponse.writeWith(Mono.just(buffer));
});
});
}
private <T> T getOrDefault(T configValue, T defaultValue) {
return (configValue != null) ? configValue : defaultValue;
}
}
然后将配置文件配置的filtername配置为重写后的过滤器:
spring:
cloud:
gateway:
routes:
# 目标服务的路由配置
- id: test
uri: http://localhost:8082
predicates:
- Path=/test/**
filters:
- StripPrefix=1
- name: MyRequestRateLimiter #修改成自定义的限流过滤器
args:
redis-rate-limiter.replenishRate: 1
redis-rate-limiter.burstCapacity: 2
key-resolver: '#{@keyResolver}'
deny-empty-key: false #如果KeyResolver未找到 key,则请求将被允许通过,默认为true