本节意在配置一个简单的只使用 divide 插件实现转发,进而了解整个网关全流程,管中窥豹。
使用配置
启动环境
接着上节环境接着说,使用本地 MySQL 环境,分别启动 Soul-bootstrap、Soul-admin 两个工程,此时我们可以登录 admin 界面,如下图所示:
开启 divide 插件
网关默认只启动了 divide 插件,我们可以在插件管理中看见。
添加选择器
根据 Soul 整体设计,可以知道选择器是类似 SGC 中 Route 概念,其中的条件就是 Predicate;
我们打开插件列表,进行配置 divide 插件,点击添加选择器:
关键点:
- 类型我们选择了 custom 自定义;
- 条件处选择 match 匹配规则,对应的 value 是
/bin/**
(符合 ant 规则); - 在配置这一行,强制需要填写的是 host、ip:port、weight 这三个字段;
- 对于 protocol 这个选项,默认显示是
http://
,也需要填写;
添加规则
上一步在选择器中,我们只是配置了当 URL 规则 match /bin/**
规则,使用我们这个选择器,但是更细的规则如何配置需要再 Rule 这一级别配置,否则请求不同,这里类似白名单,我们很灵活的控制暴露接口服务。
关键点:
- 在配置规则的时候,主要是对于条件的几种规则的选择;
- 本例子中使用的是
=
那么我们就必须也一起连带前缀,配置对应的 value 为:/bin/get
; - 如果想使用
match
模式,可以配置为:bin/get
; - 如果不带前缀我们可以使用
like
模式,此时配置的 value 就可以为:/get
; - 怎么样,是不是很神奇!
访问结果
日志记录
首先当我们请求 http://localhost:9195/bin/get
地址时,在日志中看到:
SoulWebHandler
明确两个主线
- 根据 SoulWebHandler 往上追溯,如何初始化,如何被调用,了解整个请求处理的发起源;
- 根据 SoulWebHandler 的 handle 方法,明确接受请求之后,网关实际请求处理;
第一主线
SoulWebHandler 初始化
@Bean("webHandler")
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
final List<SoulPlugin> soulPlugins = pluginList.stream()
.sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
return new SoulWebHandler(soulPlugins);
}
@Bean("dispatcherHandler")
public DispatcherHandler dispatcherHandler() {
return new DispatcherHandler();
}
此处依赖注入的 plugins,经过处理之后,直接初始化 SoulWebHandler。
SoulWebHandler 继承关系
public final class SoulWebHandler implements WebHandler {
....
}
此处我们可以知道 SoulWebHandler 实现 WebHandler 接口,并且配合上一步直接在 Spring 中声明自己为 webHandler
。
DefaultWebFilterChain
public class DefaultWebFilterChain implements WebFilterChain {
private final List<WebFilter> allFilters;
private final WebHandler handler;
@Nullable
private final WebFilter currentFilter;
@Nullable
private final DefaultWebFilterChain chain;
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
// 此处调用了 SoulWebHandler的 handler 方法
return Mono.defer(() ->
this.currentFilter != null && this.chain != null ?
invokeFilter(this.currentFilter, this.chain, exchange) :
this.handler.handle(exchange));
}
...
}
FilteringWebHandler
public class FilteringWebHandler extends WebHandlerDecorator {
private final DefaultWebFilterChain chain;
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
// 调用了 DefaultWebFilterChain#filter 方法
return this.chain.filter(exchange);
}
...
}
HttpWebHandlerAdapter
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
ServerWebExchange exchange = createExchange(request, response);
LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
// 此处调用了 FilteringWebHandler#handle 方法
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
....
}
ReactorHttpHandlerAdapter
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
private static final Log logger = HttpLogging.forLogName(ReactorHttpHandlerAdapter.class);
private final HttpHandler httpHandler;
public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {
Assert.notNull(httpHandler, "HttpHandler must not be null");
this.httpHandler = httpHandler;
}
@Override
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
// 调用了 HttpWebHandlerAdapter 的 handler 方法
return this.httpHandler.handle(request, response)
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
}
HttpServerHandle
final class HttpServerHandle extends HttpServerOperator implements ConnectionObserver, Function<ServerBootstrap, ServerBootstrap> {
final BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler;
HttpServerHandle(HttpServer server, BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
super(server);
this.handler = (BiFunction)Objects.requireNonNull(handler, "handler");
}
protected TcpServer tcpConfiguration() {
return this.source.tcpConfiguration().bootstrap(this);
}
public void onStateChange(Connection connection, State newState) {
if (newState == HttpServerState.REQUEST_RECEIVED) {
try {
if (log.isDebugEnabled()) {
log.debug(ReactorNetty.format(connection.channel(), "Handler is being applied: {}"), new Object[]{this.handler});
}
HttpServerOperations ops = (HttpServerOperations)connection;
// 调用 ReactorHttpHandlerAdapter 的 apply 方法
Mono.fromDirect((Publisher)this.handler.apply(ops, ops)).subscribe(ops.disposeSubscriber());
} catch (Throwable var4) {
log.error(ReactorNetty.format(connection.channel(), ""), var4);
connection.channel().close();
}
}
}
public ServerBootstrap apply(ServerBootstrap b) {
ConnectionObserver observer = BootstrapHandlers.childConnectionObserver(b);
BootstrapHandlers.childConnectionObserver(b, observer.then(this));
return b;
}
}
在往上就是对应在 Netty 的 onInboundNext,channelread 等操作,我们至此大致了解了基本 netty 网络请求到我们的 SoulWebHandler 的 handle 方法,对于第二条主线我们下次分解;