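Outline
1. How the client initiates service registration and sends service heartbeats
2. How the server handles the client's service registration request
3. Service registration: how high concurrency is achieved to support millions of registrations
4. In-memory registry: how concurrent read/write conflicts on the registry are handled
1. How the client initiates service registration and sends service heartbeats
(1) Why a Nacos client project registers its service automatically at startup
(2) How the Nacos client registers a service
(3) How the Nacos client sends service heartbeats
(1) Why a Nacos client project registers its service automatically at startup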
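A Nacos client is any project that depends on nacos-discovery and nacos-client. A project registers its service automatically only after it pulls in spring-cloud-starter-alibaba-nacos-discovery. The spring.factories file in that dependency lists several Configuration classes. Spring Boot scans spring.factories at startup and creates the configuration classes listed there.
In spring.factories, the class related to registration is NacosServiceRegistryAutoConfiguration, the Nacos service registration auto-configuration class.
The Nacos service registration auto-configuration class
NacosServiceRegistryAutoConfiguration is shown below. It creates three Beans.
First Bean: NacosServiceRegistry
This Bean is created with NacosDiscoveryProperties, the class that holds the values loaded from the yml configuration file.
Second Bean: NacosRegistration
This Bean is also created with NacosDiscoveryProperties.
Third Bean: NacosAutoServiceRegistration
This Bean is created with the NacosServiceRegistry and NacosRegistration Beans. It extends the abstract class AbstractAutoServiceRegistration, which implements the ApplicationListener interface, so automatic registration at startup relies on Spring's event listener mechanism. Near the end of Spring container startup, finishRefresh() runs and an event is published (as the code below shows, the listener is typed to WebServerInitializedEvent). That event triggers onApplicationEvent().
onApplicationEvent() of AbstractAutoServiceRegistration first calls bind(), which calls start(), which in turn calls register() to initiate registration. register() delegates to this.serviceRegistry.register(), which does the actual registration work.
The serviceRegistry field of AbstractAutoServiceRegistration is assigned when NacosServiceRegistryAutoConfiguration creates the third Bean, NacosAutoServiceRegistration: the first Bean, NacosServiceRegistry, is passed into its constructor.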
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
    @Bean
    public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        //Pass in NacosDiscoveryProperties as the argument
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
        //Pass in NacosDiscoveryProperties as an argument
        return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);
    }
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
        //Pass in the NacosServiceRegistry and NacosRegistration Beans as arguments
        return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
    }
}
@ConfigurationProperties("spring.cloud.nacos.discovery")
public class NacosDiscoveryProperties {
    //nacos discovery server address.
    private String serverAddr;
    //the nacos authentication username.
    private String username;
    //the nacos authentication password.
    private String password;
    //namespace, separation registry of different environments.
    private String namespace;
    //service name to registry.
    @Value("${spring.cloud.nacos.discovery.service:${spring.application.name:}}")
    private String service;
    //cluster name for nacos.
    private String clusterName = "DEFAULT";
    //group name for nacos.
    private String group = "DEFAULT_GROUP";
    //The ip address your want to register for your service instance, needn't to set it if the auto detect ip works well.
    private String ip;
    //The port your want to register for your service instance, needn't to set it if the auto detect port works well.
    private int port = -1;
    //Heart beat interval. Time unit: millisecond.
    private Integer heartBeatInterval;
    //Heart beat timeout. Time unit: millisecond.
    private Integer heartBeatTimeout;
    //If instance is ephemeral.The default value is true.
    private boolean ephemeral = true;
    ...
}
public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {
    ...
    private NacosRegistration registration;
    public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        super(serviceRegistry, autoServiceRegistrationProperties);
        this.registration = registration;
    }
    ...
}
public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
    ...
    private final ServiceRegistry<R> serviceRegistry;
    private AutoServiceRegistrationProperties properties;
    protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry, AutoServiceRegistrationProperties properties) {
        this.serviceRegistry = serviceRegistry;
        this.properties = properties;
    }
    ...
    @Override
    @SuppressWarnings("deprecation")
    public void onApplicationEvent(WebServerInitializedEvent event) {
        bind(event);
    }
    public void bind(WebServerInitializedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        if (context instanceof ConfigurableWebServerApplicationContext) {
            if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
                return;
            }
        }
        this.port.compareAndSet(0, event.getWebServer().getPort());
        this.start();
    }
    public void start() {
        if (!isEnabled()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Discovery Lifecycle disabled. Not starting");
            }
            return;
        }
        //only initialize if nonSecurePort is greater than 0 and it isn't already running
        //because of containerPortInitializer below
        if (!this.running.get()) {
            this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
            //Initiate registration
            register();
            if (shouldRegisterManagement()) {
                registerManagement();
            }
            this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
            this.running.compareAndSet(false, true);
        }
    }
    protected void register() {
        //Call register() on the NacosServiceRegistry instance passed in when NacosAutoServiceRegistration was created
        this.serviceRegistry.register(getRegistration());
    }
    ...
}
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
    private final NacosDiscoveryProperties nacosDiscoveryProperties;
    public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
    }
    @Override
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }
        NamingService namingService = namingService();
        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();
        Instance instance = getNacosInstanceFromRegistration(registration);
        try {
            //Register the current service instance with Nacos
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
        } catch (Exception e) {
            log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
            //rethrow a RuntimeException if the registration is failed.
            //issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
            rethrowRuntimeException(e);
        }
    }
    private NamingService namingService() {
        return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
    }
    ...
}
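Summary of how a Nacos client project triggers service instance registration automatically at startup: the Spring listener calls onApplicationEvent() -> bind() -> start() -> register(), and register() finally calls register() on the serviceRegistry field to perform the registration.
In more detail: the spring.factories file points to a registration-related Configuration class that defines three Beans. Creating the third Bean requires the first and second Beans as arguments. The first Bean holds the register() method that actually performs service registration, and it is assigned to the serviceRegistry field of the third Bean. The parent class of the third Bean implements Spring's listener method, so when the Spring container publishes its event during startup, the Nacos registration logic runs.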
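The trigger chain summarized above can be mimicked in plain Java without Spring. The following is a minimal sketch, not the Spring Cloud source: SimpleServiceRegistry, AutoRegistrationSketch, onWebServerReady() and AutoRegistrationDemo are hypothetical names standing in for ServiceRegistry, AbstractAutoServiceRegistration and onApplicationEvent(), and the running guard is simplified to a single compareAndSet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Spring Cloud's ServiceRegistry: it does the actual registration work.
interface SimpleServiceRegistry {
    void register(String serviceId, int port);
}

// Hypothetical stand-in for AbstractAutoServiceRegistration.
class AutoRegistrationSketch {
    private final SimpleServiceRegistry serviceRegistry;
    private final String serviceId;
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);

    AutoRegistrationSketch(SimpleServiceRegistry serviceRegistry, String serviceId) {
        this.serviceRegistry = serviceRegistry;
        this.serviceId = serviceId;
    }

    // Plays the role of onApplicationEvent(): called once the web server is up.
    void onWebServerReady(int actualPort) {
        bind(actualPort);
    }

    private void bind(int actualPort) {
        // Record the port only if none has been set yet, then start.
        port.compareAndSet(0, actualPort);
        start();
    }

    private void start() {
        // Register at most once, even if the event is delivered again.
        if (running.compareAndSet(false, true)) {
            register();
        }
    }

    private void register() {
        // Delegate to the registry that was injected through the constructor.
        serviceRegistry.register(serviceId, port.get());
    }
}

class AutoRegistrationDemo {
    // Fires the "event" twice and returns what the registry saw.
    static List<String> run() {
        List<String> calls = new ArrayList<>();
        AutoRegistrationSketch auto = new AutoRegistrationSketch(
                (id, p) -> calls.add(id + ":" + p), "order-service");
        auto.onWebServerReady(8080);
        auto.onWebServerReady(9090); // ignored: already running
        return calls;
    }
}
```

The point of the constructor injection is the same as in the three-Bean configuration: the listener class never knows how registration is done, it only forwards to whatever registry it was given.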
(2) How the Nacos client registers a service
At startup, registration is initiated through NacosServiceRegistry.register(). It calls NacosNamingService.registerInstance() to register the service instance, which calls NamingProxy.registerService() to assemble the parameters. registerService() then calls NamingProxy.reqApi(), which calls NamingProxy.callServer() to send the registration request to the Nacos server.
In NamingProxy.callServer(), NacosRestTemplate.exchangeForm() is called first to issue the HTTP request. It calls execute() on the client returned by this.requestClient(), which ends up in DefaultHttpClientRequest.execute(), where Apache's CloseableHttpClient sends the HTTP request.
Note: NacosServiceRegistry belongs to the nacos-discovery package, while NacosNamingService belongs to the nacos-client package.
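One detail of NamingProxy.reqApi() in the listing for this section is worth isolating: when a server list is configured, it picks a random starting index and tries each server in turn until one call succeeds. A minimal sketch of that failover loop follows. FailoverSketch, callWithFailover() and the Function-based caller are hypothetical names, and the real method additionally handles a single domain name with retries.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

class FailoverSketch {
    // Tries every server once, starting at a random index, and returns the first successful result.
    // 'call' is a hypothetical stand-in for callServer(): it returns the response or throws.
    static String callWithFailover(List<String> servers, Function<String, String> call) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = new Random().nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("failed after all servers " + servers + " tried", last);
    }
}
```

Starting at a random index spreads requests from many clients across the cluster, while the wrap-around loop guarantees that every server is tried exactly once before the call fails.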
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
    private final NacosDiscoveryProperties nacosDiscoveryProperties;
    public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
    }
    @Override
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }
        NamingService namingService = namingService();
        //Service name
        String serviceId = registration.getServiceId();
        //Service group
        String group = nacosDiscoveryProperties.getGroup();
        //Service instance, including IP, port and other information
        Instance instance = getNacosInstanceFromRegistration(registration);
        try {
            //Call NacosNamingService.registerInstance() to register the current service instance with Nacos
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
        } catch (Exception e) {
            log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
            rethrowRuntimeException(e);
        }
    }
    private NamingService namingService() {
        return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
    }
    private Instance getNacosInstanceFromRegistration(Registration registration) {
        Instance instance = new Instance();
        instance.setIp(registration.getHost());
        instance.setPort(registration.getPort());
        instance.setWeight(nacosDiscoveryProperties.getWeight());
        instance.setClusterName(nacosDiscoveryProperties.getClusterName());
        instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
        instance.setMetadata(registration.getMetadata());
        instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
        return instance;
    }
    ...
}
public class NacosNamingService implements NamingService {
    private BeatReactor beatReactor;
    private NamingProxy serverProxy;
    ...
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        //Get the grouped service name
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //Check whether the instance being registered is ephemeral
        if (instance.isEphemeral()) {
            //For an ephemeral instance, build the heartbeat information
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            //Add the heartbeat information
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        //Then call NamingProxy.registerService() to register the service instance
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }
    ...
}
public class NamingProxy implements Closeable {
    private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();
    ...
    //register a instance to service with specified instance properties.
    //@param serviceName name of service
    //@param groupName group of service
    //@param instance instance to register
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance);
        //Create a Map that holds the registration request parameters
        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        //The request URL assembled from the UtilAndComs constants below is: /nacos/v1/ns/instance
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    }
    public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
        return reqApi(api, params, Collections.EMPTY_MAP, method);
    }
    public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {
        return reqApi(api, params, body, getServerList(), method);
    }
    //Request api.
    public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
            throw new NacosException(NacosException.INVALID_PARAM, "no server available");
        }
        NacosException exception = new NacosException();
        if (StringUtils.isNotBlank(nacosDomain)) {
            for (int i = 0; i < maxRetry; i++) {
                try {
                    return callServer(api, params, body, nacosDomain, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                    }
                }
            }
        } else {
            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());
            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index);
                try {
                    return callServer(api, params, body, server, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", server, e);
                    }
                }
                index = (index + 1) % servers.size();
            }
        }
        NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());
        throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
    }
    //Call server.
    public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0;
        injectSecurityInfo(params);
        Header header = builderHeader();
        String url;
        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
            url = curServer + api;
        } else {
            if (!IPUtil.containsPort(curServer)) {
                curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
            }
            url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
        }
        try {
            //Call NacosRestTemplate.exchangeForm() to issue the HTTP request
            HttpRestResult<String> restResult = nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
            end = System.currentTimeMillis();
            MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end - start);
            if (restResult.ok()) {
                return restResult.getData();
            }
            if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
                return StringUtils.EMPTY;
            }
            throw new NacosException(restResult.getCode(), restResult.getMessage());
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to request", e);
            throw new NacosException(NacosException.SERVER_ERROR, e);
        }
    }
    ...
}
public class NacosRestTemplate extends AbstractNacosRestTemplate {
    private final HttpClientRequest requestClient;
    ...
    //Execute the HTTP method to the given URI template, writing the given request entity to the request, and returns the response as {@link HttpRestResult}.
    public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues, String httpMethod, Type responseType) throws Exception {
        RequestHttpEntity requestHttpEntity = new RequestHttpEntity(header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
        return execute(url, httpMethod, requestHttpEntity, responseType);
    }
    private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType) throws Exception {
        URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
        if (logger.isDebugEnabled()) {
            logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
        }
        ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
        HttpClientResponse response = null;
        try {
            response = this.requestClient().execute(uri, httpMethod, requestEntity);
            return responseHandler.handle(response);
        } finally {
            if (response != null) {
                response.close();
            }
        }
    }
    private HttpClientRequest requestClient() {
        if (CollectionUtils.isNotEmpty(interceptors)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Execute via interceptors :{}", interceptors);
            }
            return new InterceptingHttpClientRequest(requestClient, interceptors.iterator());
        }
        return requestClient;
    }
    ...
}
public class DefaultHttpClientRequest implements HttpClientRequest {
    private final CloseableHttpClient client;
    public DefaultHttpClientRequest(CloseableHttpClient client) {
        this.client = client;
    }
    @Override
    public HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity) throws Exception {
        HttpRequestBase request = build(uri, httpMethod, requestHttpEntity);
        //Execute the HTTP request through Apache's CloseableHttpClient
        CloseableHttpResponse response = client.execute(request);
        return new DefaultClientHttpResponse(response);
    }
    ...
}
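This shows that the Nacos client registers with the Nacos server over HTTP: the Nacos server exposes a service registration API that the client calls via HTTP. In the official Nacos Open API documentation, the instance registration interface is POST /nacos/v1/ns/instance.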
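To make the shape of that HTTP call concrete, here is a minimal sketch that only builds the request URL, using the parameter names that registerService() puts into its params map. RegisterRequestSketch, toQuery() and registerUrl() are hypothetical helpers, the address and service values are made up, and the real client sends more parameters (namespace, group, cluster, weight, metadata and so on).

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class RegisterRequestSketch {
    // URL-encodes one key or value as UTF-8.
    private static String enc(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Encodes the parameters as a URL query string, in insertion order.
    static String toQuery(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            joiner.add(enc(e.getKey()) + "=" + enc(e.getValue()));
        }
        return joiner.toString();
    }

    // Builds the URL for POST /nacos/v1/ns/instance; serverAddr is e.g. "127.0.0.1:8848".
    static String registerUrl(String serverAddr, String serviceName, String ip, int port) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", serviceName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("ephemeral", "true");
        return "http://" + serverAddr + "/nacos/v1/ns/instance?" + toQuery(params);
    }
}
```

For example, registerUrl("127.0.0.1:8848", "DEFAULT_GROUP@@order-service", "10.0.0.5", 8080) yields a URL whose query carries the grouped service name with "@@" percent-encoded. Any HTTP client can then send it with the POST method.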
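(3) How the Nacos client sends service heartbeats
When NacosNamingService.registerInstance() registers a service instance, and before it calls NamingProxy.registerService(), it checks whether the instance is ephemeral. If so, it builds the heartbeat information and adds it to beatReactor by calling BeatReactor.buildBeatInfo() and BeatReactor.addBeatInfo().
BeatReactor.buildBeatInfo() sets the heartbeat interval through beatInfo.setPeriod(). The default is 5 seconds.
The second-to-last line of BeatReactor.addBeatInfo() schedules a delayed task: a BeatTask wrapping the BeatInfo. The BeatTask is handed to BeatReactor's ScheduledExecutorService, and the delay comes from beatInfo.getPeriod(), which is 5 seconds by default.
BeatTask.run() calls NamingProxy.sendBeat() to send a heartbeat request to the Nacos server, which goes through NamingProxy.reqApi(). If the heartbeat response indicates that the service instance does not exist, the client re-sends the instance registration request. Regardless of the response, it wraps the BeatInfo in a new BeatTask and hands it to the ScheduledExecutorService to run after the next delay: 5 seconds by default, or the clientBeatInterval returned by the server when that value is positive.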
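The scheduling pattern described above, a one-shot delayed task that reschedules itself after every run, can be sketched in isolation. This is a simplified illustration rather than the Nacos source: HeartbeatSketch, BeatSender and HeartbeatDemo are hypothetical names, the period is fixed instead of being adjusted from the server response, and the demo uses a 20 ms period so that it finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatSketch {
    // Hypothetical stand-in for sendBeat(): returns false when the server no longer knows the instance.
    interface BeatSender {
        boolean sendBeat() throws Exception;
    }

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "beat-sender-sketch");
        t.setDaemon(true); // do not keep the JVM alive just for heartbeats
        return t;
    });
    private final BeatSender sender;
    private final Runnable reRegister;
    private final long periodMillis;
    private volatile boolean stopped;

    HeartbeatSketch(BeatSender sender, Runnable reRegister, long periodMillis) {
        this.sender = sender;
        this.reRegister = reRegister;
        this.periodMillis = periodMillis;
    }

    void start() {
        scheduleNext();
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    private void scheduleNext() {
        try {
            executor.schedule(this::beatOnce, periodMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The executor was shut down by stop(); nothing more to schedule.
        }
    }

    private void beatOnce() {
        if (stopped) {
            return;
        }
        try {
            if (!sender.sendBeat()) {
                reRegister.run(); // instance unknown to the server: register again
            }
        } catch (Exception e) {
            System.err.println("failed to send beat: " + e); // log and keep looping
        }
        scheduleNext(); // the one-shot task schedules its own successor
    }
}

class HeartbeatDemo {
    // Waits for three beats; the second beat reports "instance not found".
    // Returns {beats observed, re-registrations}.
    static int[] run() {
        CountDownLatch threeBeats = new CountDownLatch(3);
        AtomicInteger beats = new AtomicInteger();
        AtomicInteger reRegistrations = new AtomicInteger();
        HeartbeatSketch heartbeat = new HeartbeatSketch(() -> {
            int n = beats.incrementAndGet();
            threeBeats.countDown();
            return n != 2;
        }, reRegistrations::incrementAndGet, 20);
        heartbeat.start();
        try {
            threeBeats.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            heartbeat.stop();
        }
        return new int[] { beats.get(), reRegistrations.get() };
    }
}
```

Rescheduling from inside the task, rather than using scheduleAtFixedRate(), lets each run choose the delay for the next one, which is what allows the real BeatTask to adopt an interval suggested by the server.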
public class NacosNamingService implements NamingService {
    private BeatReactor beatReactor;
    private NamingProxy serverProxy;
    ...
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        //Get the grouped service name
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //Check whether the instance being registered is ephemeral
        if (instance.isEphemeral()) {
            //For an ephemeral instance, build the heartbeat information
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            //Add the heartbeat information
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        //Then call NamingProxy.registerService() to register the service instance
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }
    ...
}
public class BeatReactor implements Closeable {
    ...
    //Build new beat information.
    public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(groupedServiceName);
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        //getInstanceHeartBeatInterval() returns 5000 by default
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        return beatInfo;
    }
    ...
}
@JsonInclude(Include.NON_NULL)
public class Instance implements Serializable {
    ...
    public long getInstanceHeartBeatInterval() {
        //Constants.DEFAULT_HEART_BEAT_INTERVAL defaults to 5000
        return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, Constants.DEFAULT_HEART_BEAT_INTERVAL);
    }
    ...
}
public class BeatReactor implements Closeable {
    private final ScheduledExecutorService executorService;
    private final NamingProxy serverProxy;
    public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();
    public BeatReactor(NamingProxy serverProxy) {
        this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
    }
    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.serverProxy = serverProxy;
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }
    ...
    //Add beat information.
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        //Schedule a delayed task; the task to run is a BeatTask
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }
    ...
    class BeatTask implements Runnable {
        BeatInfo beatInfo;
        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }
        @Override
        public void run() {
            //Check whether the heartbeat should stop
            if (beatInfo.isStopped()) {
                return;
            }
            //Delay before the next run: 5s by default
            long nextTime = beatInfo.getPeriod();
            try {
                //Call NamingProxy.sendBeat() to send a heartbeat request to the Nacos server
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                //Read the status code returned by the Nacos server
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                //If code == RESOURCE_NOT_FOUND, the previously registered information has been removed by the Nacos server
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    //Reassemble the parameters and send the registration request again
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        //Call NamingProxy.registerService() to send the instance registration request to the Nacos server
                        serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
            }
            //Put beatInfo back into a delayed task (5s by default, or the interval returned by the server), so the heartbeat keeps looping
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
}
public class NamingProxy implements Closeable {
    ...
    //Send beat.
    public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
        if (NAMING_LOGGER.isDebugEnabled()) {
            NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
        }
        Map<String, String> params = new HashMap<String, String>(8);
        Map<String, String> bodyMap = new HashMap<String, String>(2);
        if (!lightBeatEnabled) {
            bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
        }
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
        params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
        params.put("ip", beatInfo.getIp());
        params.put("port", String.valueOf(beatInfo.getPort()));
        String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
        return JacksonUtils.toObj(result);
    }
    ...
}
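In short, while registering a service the client also starts a delayed heartbeat task that runs every 5 seconds by default. The task sends an HTTP request to the service instance heartbeat interface provided by Nacos. In the official Nacos Open API documentation, this is PUT /nacos/v1/ns/instance/beat.
Figure: the overall flow of client-side service registration and heartbeat sending.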