Feign Method Timeout 配置

发布时间 2023-05-06 17:33:17作者: 好凶的胖子

项目背景

这个应用是微服务架构,使用Springboot+Springcloud,其中Springcloud部分使用了openfeign来实现通讯交互。

项目结构层次,我们将一个微服务暴力的拆分成两个模块:xxx-api/xxx-server,xxx-api是用来发布交互的接口,xxx-server模块是用来运行web服务。对于xxx-api模块,我们根据业务场景,将不同的api聚合在不同的接口里,如:

-DemoClient
 -getDemos()
 -reportDemos()

现在客户要求,涉及到定时任务的调用都要改成同步调用,返回给调度服务真实的处理结果。但有的任务执行时间相对长,我们针对性的设置某些api的超时等待时间?

现有实现

FeignContext是个子容器,不同的ContextId间配置隔离。默认情况下,FeignClient的Name/Value等于ContextId,你也可以自定义ContextId。所以,ContextId最低也是FeignClient级别的,不能对方法级别进行配置。

默认全局配置

feign.client.config.default.connect-timeout=10000  
feign.client.config.default.read-timeout=60000

微服务级别配置

@FeignClient(value = "project-archetype", url = "${project-archetype-server-url-prefix}")  
@Tag(name = "ArchetypeDemoClient", description = "ArchetypeDemoClient")  
public interface ArchetypeDemoClient {}

@FeignClient(value = "project-archetype1", url = "${project-archetype-server-url-prefix}")  
@Tag(name = "ArchetypeDemoClient", description = "ArchetypeDemoClient")  
public interface ArchetypeDemoClient2 {}
feign.client.config.project-archetype.connect-timeout=10000  
feign.client.config.project-archetype.read-timeout=60000

接口级别配置

@FeignClient(value = "project-archetype", url = "${project-archetype-server-url-prefix}", contextId = "project-archetype-longtime")  
@Tag(name = "ArchetypeDemoClient", description = "ArchetypeDemoClient")  
public interface ArchetypeDemoClient3 {}
feign.client.config.project-archetype-longtime.connect-timeout=10000  
feign.client.config.project-archetype-longtime.read-timeout=1800000

方法级别配置

@FeignClient(value = "project-archetype", url = "${project-archetype-server-url-prefix}")  
@Tag(name = "ArchetypeDemoClient", description = "ArchetypeDemoClient")  
public interface ArchetypeDemoClient2 {  
  
    @Operation(tags = "save", description = "A sample interface of saving data")  
    @PostMapping(value = "/save", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)  
    ResponseResult<ArchetypeDemoVO> save(Request.Options options, @RequestBody @Valid ArchetypeDemoDTO dto);
}
// 这里有灵活了,可以通过读取配置
// sample是硬编码测试
archetypeDemoClient2.save(new Request.Options(10000, 30000), dto);

根据现有的实现,我们的可选项是

  1. 使用[[#接口级别配置]],把长时间的api拆出来成独立的接口;
  2. 使用[[#方法级别配置]],增加api的参数改造调用方式;

方案上,我更倾向于后者,但我不想调用方式改变。

源码

下面FeignClient初始化的代码路径中的一小节(全部的我也没仔细看):
FeignClientsRegistrar#registerFeignClient->
FeignClientFactoryBean#getTarget->Feign#targetbuild得到ReflectiveFeignReflectiveFeign#newInstance

public <T> T newInstance(Target<T> target) {  
    // 下面这行代码给当前Feign的每个方法生成了feign.InvocationHandlerFactory$MethodHandler,它实际上就是feign.SynchronousMethodHandler,
    // 而SynchronousMethodHandler里的options就是上面配置文件预先配置的,因此每个Method都一样。
    // result.put(md.configKey(), this.factory.create(target, md, (Factory)buildTemplate, this.options, this.decoder, this.errorDecoder));
    Map<String, MethodHandler> nameToHandler = this.targetToHandlersByName.apply(target);  
	
    Map<Method, MethodHandler> methodToHandler = new LinkedHashMap();  
    List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList();  
    Method[] var5 = target.type().getMethods();  
    int var6 = var5.length;  
  
    for(int var7 = 0; var7 < var6; ++var7) {  
        Method method = var5[var7];  
        if (method.getDeclaringClass() != Object.class) {  
            if (Util.isDefault(method)) {  
                DefaultMethodHandler handler = new DefaultMethodHandler(method);  
                defaultMethodHandlers.add(handler);  
                methodToHandler.put(method, handler);  
            } else {  
                methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));  
            }  
        }  
    }  
  
    InvocationHandler handler = this.factory.create(target, methodToHandler);  
    T proxy = Proxy.newProxyInstance(target.type().getClassLoader(), new Class[]{target.type()}, handler);  
    Iterator var12 = defaultMethodHandlers.iterator();  
  
    while(var12.hasNext()) {  
        DefaultMethodHandler defaultMethodHandler = (DefaultMethodHandler)var12.next();  
        defaultMethodHandler.bindTo(proxy);  
    }  
  
    return proxy;  
}

可以看到FeignClient最终就是一个proxy,proxy里的java.lang.reflect.InvocationHandler就是feign.ReflectiveFeign$FeignInvocationHandlerfeign.ReflectiveFeign$FeignInvocationHandlerdispatch里记录了FeignClient方法的处理feign.InvocationHandlerFactory$MethodHandlerfeign.SynchronousMethodHandler

feign.SynchronousMethodHandler代码处理请求过程中有一个逻辑,如果method请求参数里设置了Option的话,可以覆盖原有配置,否则就是用预先配置的。

public Object invoke(Object[] argv) throws Throwable {  
    RequestTemplate template = this.buildTemplateFromArgs.create(argv);  
    // 从参数里筛选options,如果没有的,返回预先配置的
    Options options = this.findOptions(argv);  
    Retryer retryer = this.retryer.clone();  
  
    while(true) {  
        try {  
            return this.executeAndDecode(template, options);  
        } catch (RetryableException var9) {  
          // retry handle
        }  
    }  
}

Options findOptions(Object[] argv) {  
    if (argv != null && argv.length != 0) {  
        Stream var10000 = Stream.of(argv);  
        Options.class.getClass();  
        var10000 = var10000.filter(Options.class::isInstance);  
        Options.class.getClass();  
        return (Options)var10000.map(Options.class::cast).findFirst().orElse(this.options);  
    } else {  
        return this.options;  
    }  
}

那我能不能对feign.SynchronousMethodHandler使用包装模式增加下”责任“呢,即在invoke前,把这个Object[] argv改下?

答案:可以。

新方法级别配置

核心代码

获取全部的FeignClients

private Map<String, Object> getAllFeignClients() {  
    return SpringUtil.getApplicationContext().getBeansWithAnnotation(FeignClient.class)  
            .entrySet().stream()  
            .filter(e -> e.getValue() instanceof Proxy)  
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));  
}

获取全部的Methodhanlders

public Optional<Map<Method, InvocationHandlerFactory.MethodHandler>> getMethodHandlers() {  
    for (Map.Entry<String, Object> entry : allFeignClients.entrySet()) {  
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(entry.getValue());  
        if (invocationHandler instanceof ReflectiveFeign.FeignInvocationHandler feignInvocationHandler) {  
            try {  
                Field field = ReflectiveFeign.FeignInvocationHandler.class.getDeclaredField("dispatch");  
                field.setAccessible(true);  
                //noinspection unchecked  
                Optional<Map<Method, InvocationHandlerFactory.MethodHandler>> methodMethodHandlerMap = Optional.ofNullable((Map<Method, InvocationHandlerFactory.MethodHandler>) field.get(feignInvocationHandler));  
                field.setAccessible(false);  
                return methodMethodHandlerMap;  
            } catch (NoSuchFieldException e) {  
                throw new InteriorException(InteriorErrorStatus.ofServerError("No dispatch field in ReflectiveFeign.FeignInvocationHandler"));  
            } catch (IllegalAccessException e) {  
                throw new InteriorException(InteriorErrorStatus.ofServerError("Get dispatch error of ReflectiveFeign.FeignInvocationHandler"));  
            }  
        }  
    }  
    return Optional.empty();  
}

替换现有的Methodhandler为包装类

private void enhanceAsTimeoutMethodHandler(Map<Method, InvocationHandlerFactory.MethodHandler> methodMethodHandlerMap) {  
    Map<Method, InvocationHandlerFactory.MethodHandler> changedMethodHandlerMap = new HashMap<>();  
  
    for (Map.Entry<Method, InvocationHandlerFactory.MethodHandler> methodHandlerEntry : methodMethodHandlerMap.entrySet()) {  
        Method method = methodHandlerEntry.getKey(); 
        // 自定义的注解,参见使用方式 
        TimeoutOptions timeoutOptions = method.getAnnotation(TimeoutOptions.class);
        // 如果有自定义的注解,且参数里不含Reqeust.Options
        // 就替换现有的MethodHandler为包装类  
        if (timeoutOptions != null && Arrays.stream(method.getParameterTypes()).noneMatch(e -> e.equals(Request.Options.class))) {  
            changedMethodHandlerMap.put(method, new TimeoutMethodHandler(method, timeoutOptions, methodHandlerEntry.getValue()));  
        }  
    }  
  
    String enhancedInfo = changedMethodHandlerMap.keySet().stream().map(Method::toGenericString)  
            .collect(Collectors.joining("\n"));  
  
    if (!StringUtils.isEmpty(enhancedInfo)) {  
        log.info("Enhanced for {} feign method timeout: {}", enhanceFor().toGenericString(), enhancedInfo);  
    }  
  
    methodMethodHandlerMap.putAll(changedMethodHandlerMap);  
}

包装类

@Slf4j  
public class TimeoutMethodHandler implements InvocationHandlerFactory.MethodHandler {  
    @Getter  
    @Setter    
    private Request.Options requestOptions;  
    private final InvocationHandlerFactory.MethodHandler delegate;  
    @Getter  
    private final String name;  
  
    public TimeoutMethodHandler(Method method, TimeoutOptions timeoutOptions, InvocationHandlerFactory.MethodHandler origin) {  
        this.name = method.toGenericString();  
        this.requestOptions = new Request.Options(timeoutOptions.connectTimeout(), timeoutOptions.connectTimeoutUnit(),  
                timeoutOptions.readTimeout(), timeoutOptions.readTimeoutUnit(), timeoutOptions.followRedirects());  
        this.delegate = origin;  
        // Fuck the fields of requestOptions are all final.  
        // Field optionsField = origin.getClass().getDeclaredField("options");        // this.requestOptions = optionsField.get(this.delegate);        
        // 增加参数后,原有的metadata里的index都增加1
        // 否则序列化就有问题了
        try {  
            Field methodMetadataField = origin.getClass().getDeclaredField("metadata");  
            methodMetadataField.setAccessible(true);  
            MethodMetadata methodMetadata = (MethodMetadata) methodMetadataField.get(this.delegate);  
            if (methodMetadata.urlIndex() != null) {  
                methodMetadata.urlIndex(methodMetadata.urlIndex() + 1);  
            }  
            if (methodMetadata.bodyIndex() != null) {  
                methodMetadata.bodyIndex(methodMetadata.bodyIndex() + 1);  
            }  
            if (methodMetadata.headerMapIndex() != null) {  
                methodMetadata.headerMapIndex(methodMetadata.headerMapIndex() + 1);  
            }  
            if (methodMetadata.queryMapIndex() != null) {  
                methodMetadata.queryMapIndex(methodMetadata.queryMapIndex() + 1);  
            }  
            methodMetadataField.setAccessible(false);  
        } catch (NoSuchFieldException | IllegalAccessException e) {  
            e.printStackTrace();  
            throw new InteriorException(InteriorErrorStatus.ofServerError("Modify method metadata error"));  
        }  
    }  
  
    @Override  
    public Object invoke(Object[] objects) throws Throwable { 
	    // 这里就是增加的“责任”了 
        if (objects != null && objects.length != 0) {  
            List<Object> originObjects = Arrays.stream(objects).toList();  
            ArrayList<Object> newObjects = new ArrayList<>(originObjects);  
            newObjects.add(0, requestOptions);  
            return this.delegate.invoke(newObjects.toArray());  
        }  
        return this.delegate.invoke(new Object[]{requestOptions});  
    }  
  
    @Override  
    public String toString() {  
        return "[%s]->(%s,%s,%s,%s,%s)".formatted(name, requestOptions.connectTimeout(), requestOptions.connectTimeoutUnit(),  
                requestOptions.readTimeout(), requestOptions.readTimeoutUnit(), requestOptions.isFollowRedirects());  
    }  
}

使用

@FeignClient(value = "project-archetype", url = "${project-archetype-server-url-prefix}")  
@Tag(name = "ArchetypeDemoClient", description = "ArchetypeDemoClient")  
public interface ArchetypeDemoClient2 {  
  
    @Operation(tags = "save", description = "A sample interface of saving data")  
    @PostMapping(value = "/save", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)  
    @TimeoutOptions(connectTimeout = 5000, readTimeout = 120000)  
    ResponseResult<ArchetypeDemoVO> save2(@RequestBody @Valid ArchetypeDemoDTO dto);  
  
}

扩展

这里硬编码了对应的时间戳,那如果我想修改呢?重启?

答案:不需要。

既然我们包装了MethodHandler,那我们就动态修改MethodHandler里的Request.Options

细节不说了,大致上是提供一个spring actuator的endpoint来读取全部包装过的FeignClient方法和修改指定FeignClient方法。

效果如下:
查询

curl --location 'http://127.0.0.1:8888/actuator/feignMethodTimeout' \

--data ''

查询结果

[
"[public abstract com.xxx.project.common.protocol.communication.ResponseResult<com.xxx.project.archetype.api.vo.ArchetypeDemoVO> com.xxx.project.archetype.api.ArchetypeDemoClient2.save2(com.xxx.project.archetype.api.dto.ArchetypeDemoDTO)]->(5000,MILLISECONDS,120000,MILLISECONDS,true)"

]

修改

curl --location 'http://127.0.0.1:8888/actuator/feignMethodTimeout' \
--header 'Content-Type: application/json' \
--data '{
    "method":"public abstract com.xxx.project.common.protocol.communication.ResponseResult<com.xxx.project.archetype.api.vo.ArchetypeDemoVO> com.xxx.project.archetype.api.ArchetypeDemoClient2.save2(com.xxx.project.archetype.api.dto.ArchetypeDemoDTO)",
    "options":"9000,MILLISECONDS,5000,MILLISECONDS,true"
}'

修改结果

changed