问题:
socket-io服务注册到nacos, 出现多个socket-io实例。此时前端发送请求,nginx请求gateway地址,gateway根据url寻找注册到nacos上的服务名,服务名有多个实例,nacos默认的轮询策略会出现服务连接失败。因为有的socket-io服务里面没有需要的namespace。所以报错。并且出现连接成功后中间断开请求。因为是上一次的请求没有断开导致超时然后出现的蝴蝶效应。
解决方案:
1. 重写LoadRalancer策略
代码如下, 请供参考:
pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
<version>4.0.3</version>
</dependency>
实现代码
import com.alibaba.cloud.commons.lang.StringUtils;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.loadbalancer.NacosLoadBalancer;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.core.Balancer;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.*;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
// 自定义负载均衡实现需要实现 ReactorServiceInstanceLoadBalancer 接口 以及重写choose方法
public class NacosSocketCustomLoadBalancer extends NacosLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private final String SERVICE_id = "socket-io-service";
// 注入当前服务的nacos的配置信息
private final NacosDiscoveryProperties nacosDiscoveryProperties;
// loadbalancer 提供的访问当前服务的名称
final String serviceId;
// loadbalancer 提供的访问的服务列表
ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
Request request;
public NacosSocketCustomLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, NacosDiscoveryProperties nacosDiscoveryProperties) {
super(serviceInstanceListSupplierProvider, serviceId, nacosDiscoveryProperties);
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
}
/**
* 服务器调用负载均衡时调的放啊
* 此处代码内容与 RandomLoadBalancer 一致
*/
public Mono<Response<ServiceInstance>> choose(Request request) {
this.request = request;
System.out.println("choose.request:" + request);
ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
if (SERVICE_id.equals(this.serviceId)) {
Flux<List<ServiceInstance>> listFlux = supplier.get(request);
return listFlux.next().map((serviceInstances) -> this.processInstanceResponse(supplier, serviceInstances));
}
// 不是我们需要的服务ID走默认的Nacos轮询策略
return super.choose(request);
}
/**
* 对负载均衡的服务进行筛选的方法
* 此处代码内容与 RandomLoadBalancer 一致
*/
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) {
System.out.println("processInstanceResponse.serviceInstances:" + serviceInstances);
Response<ServiceInstance> serviceInstanceResponse = this.getInstanceResponse(serviceInstances);
System.out.println("serviceInstanceResponse ==> " + serviceInstanceResponse);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
/**
* 对负载均衡的服务进行筛选的方法
* 自定义
* 此处的 instances 实例列表 只会提供健康的实例 所以不需要担心如果实例无法访问的情况
*/
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
return new EmptyResponse();
}
// 获取当前服务所在的集群名称
String currentServiceName = nacosDiscoveryProperties.getService();
System.out.println("currentServiceName:" + currentServiceName);
// 过滤在同一集群下注册的服务 根据集群名称筛选的集合
List<ServiceInstance> sameClusterNameInstList = instances.stream().filter(i -> StringUtils.equals(i.getMetadata().get("nacos.service"), currentServiceName)).collect(Collectors.toList());
ServiceInstance sameClusterNameInst;
if (sameClusterNameInstList.isEmpty()) {
// 如果为空,则根据权重直接过滤所有服务列表
sameClusterNameInst = getHostByRandomWeight(instances);
} else {
// 如果不为空,则根据权重直接过滤所在集群下的服务列表
sameClusterNameInst = getHostByRandomWeight(sameClusterNameInstList);
}
return new DefaultResponse(sameClusterNameInst);
}
private ServiceInstance getHostByRandomWeight(List<ServiceInstance> sameClusterNameInstList) {
List<Instance> list = new ArrayList<>();
Map<String, ServiceInstance> dataMap = new HashMap<>();
// 此处将 ServiceInstance 转化为 Instance 是为了接下来调用nacos中的权重算法,由于入参不同,所以需要转换,此处建议打断电进行参数调试,以下是我目前为止所用到的参数,转化为map是为了最终方便获取取值到的服务对象
sameClusterNameInstList.forEach(i -> {
Instance ins = new Instance();
Map<String, String> metadata = i.getMetadata();
ins.setInstanceId(metadata.get("nacos.instanceId"));
ins.setWeight(new BigDecimal(metadata.get("nacos.weight")).doubleValue());
ins.setClusterName(metadata.get("nacos.cluster"));
ins.setEphemeral(Boolean.parseBoolean(metadata.get("nacos.ephemeral")));
ins.setHealthy(Boolean.parseBoolean(metadata.get("nacos.healthy")));
ins.setPort(i.getPort());
ins.setIp(i.getHost());
ins.setServiceName(i.getServiceId());
ins.setMetadata(metadata);
list.add(ins);
// key为服务ID,值为服务对象
dataMap.put(metadata.get("nacos.instanceId"), i);
});
// 调用自定义选择socket实例的算法
System.out.println("main list:{ }" + list);
Instance hostByRandomWeightCopy = ExtendBalancer.getHostBySocketResponse(list, this.request);
// 根据最终ID获取需要返回的实例对象
return dataMap.get(hostByRandomWeightCopy.getInstanceId());
}
}
class ExtendBalancer extends Balancer {
public static Instance getHostBySocketResponse(List<Instance> hosts, Request request) {
DefaultRequest defaultRequest = (DefaultRequest) request;
Instance availableInstances = getAvailableInstances(hosts, defaultRequest);
if (availableInstances != null) {
return availableInstances;
}
// 如果全部未响应,使用默认算法
return getHostByRandomWeight(hosts);
}
public static Instance getAvailableInstances(List<Instance> hosts, DefaultRequest defaultRequest) {
RequestDataContext context = (RequestDataContext) defaultRequest.getContext();
RequestData clientRequest = context.getClientRequest();
HttpHeaders headers = clientRequest.getHeaders();
Instance instance = new Instance();
String rawUrl = clientRequest.getUrl().toString();
// 创建HttpClient
HttpClient httpClient = HttpClient.newBuilder()
.build();
// 构建HttpRequest
Map<String, String> headerMap = new HashMap<>();
headers.forEach((k, v) -> {
if (k.equals("Host")) {
return;
}
headerMap.put(k, v.get(0));
});
// 网关url转为服务url
String targetUrl = replaceServerUrl(rawUrl);
for (Instance host : hosts) {
try {
String hostName = host.getIp() + ":" + host.getPort();
headerMap.put("Host", hostName);
String url = targetUrl.formatted(hostName);
HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(new URI(url));
headerMap.forEach(builder::header);
HttpRequest request = builder.GET()
.timeout(Duration.ofMillis(100))
.build();
// 发送请求,只需要保证是这个机器,不需要实际连接,以免还要写端口逻辑。
// 对应的服务会出现错误日志,received a frame that is not masked as expected。无需理会
httpClient.send(request, HttpResponse.BodyHandlers.ofString());
} catch (HttpConnectTimeoutException | InterruptedException | URISyntaxException e) {
System.err.println(e.getMessage());
} catch (IOException e) {
instance = host;
}
}
return instance;
}
/**
* 网关的URL转为服务的URL
* @param rawUrl 网关的URL
* @return 获取到的服务的URL
*/
private static String replaceServerUrl(String rawUrl) {
// 匹配IP地址和端口的正则表达式
String regex = "http://(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}):(\\d+)/.*";
// 创建Pattern对象
Pattern pattern = Pattern.compile(regex);
// 创建Matcher对象
Matcher matcher = pattern.matcher(rawUrl);
// 进行匹配和替换
if (matcher.matches()) {
String ipAddress = matcher.group(1);
String port = matcher.group(2);
String replacedString = rawUrl.replace(ipAddress + ":" + port, "%s");
// 输出替换后的字符串
System.out.println("Replaced String: " + replacedString);
return replacedString;
}
return "";
}
static {
// jdk 屏蔽了部分请求头,需要手动开启
System.setProperty("jdk.httpclient.allowRestrictedHeaders", "connection,content-length,expect,host,upgrade");
}
}
配置类注册
@Configuration
@LoadBalancerClients(defaultConfiguration = SpringBeanConfig.class)
public class SpringBeanConfig {
@Bean
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
}
@Bean
public ReactorLoadBalancer<ServiceInstance> nacosLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory,
NacosDiscoveryProperties nacosDiscoveryProperties) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new NacosSocketCustomLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name,
ServiceInstanceListSupplier.class),
name, nacosDiscoveryProperties);
}
}
socket-io注册到nacos文章:
https://www.cnblogs.com/xxsdnol/p/17903542.html