跳到主要内容

【Eureka】【12】EurekaClient 启动后,增量获取注册信息

CacheRefreshThread
@Singleton
public class DiscoveryClient implements EurekaClient {



private final ScheduledExecutorService scheduler;

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {



try {


scheduler = Executors.newScheduledThreadPool(3,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());

cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff

} catch (Throwable e) {


throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}

initScheduledTasks();

}
/**
 * Schedules the first run of the registry cache refresh task.
 *
 * <p>Nothing is scheduled when {@code clientConfig.shouldFetchRegistry()} is
 * false. Otherwise a {@code TimedSupervisorTask} named "cacheRefresh" that
 * wraps a {@link CacheRefreshThread} is submitted to {@code scheduler} with an
 * initial delay equal to the configured registry fetch interval.
 */
private void initScheduledTasks() {
    if (!clientConfig.shouldFetchRegistry()) {
        return;
    }

    // registry cache refresh timer
    int fetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
    int backOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();

    TimedSupervisorTask cacheRefreshSupervisor = new TimedSupervisorTask(
            "cacheRefresh",
            scheduler,
            cacheRefreshExecutor,
            fetchIntervalSeconds,
            TimeUnit.SECONDS,
            backOffBound,
            new CacheRefreshThread());

    // Only a single delayed execution is requested here.
    // NOTE(review): presumably the supervisor task re-schedules itself after
    // each run - confirm in TimedSupervisorTask.
    scheduler.schedule(cacheRefreshSupervisor, fetchIntervalSeconds, TimeUnit.SECONDS);
}

/**
 * Task body for the registry cache refresh: each run delegates to
 * {@code refreshRegistry()} on the enclosing client.
 */
class CacheRefreshThread implements Runnable {

    @Override
    public void run() {
        DiscoveryClient.this.refreshRegistry();
    }
}

/**
 * Runs one cache refresh cycle: picks up a changed remote-region setting,
 * fetches the registry, and records the outcome.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Read the configured remote regions. If they differ from the value
 *       held in {@code remoteRegionsToFetch}, swap the new value in while
 *       holding the AZ-to-region mapper's monitor, and request a full
 *       registry fetch. If they are unchanged, only refresh the mapper.</li>
 *   <li>Call {@code fetchRegistry}; on success update {@code registrySize}
 *       and {@code lastSuccessfulRegistryFetchTimestamp}.</li>
 *   <li>When debug logging is enabled, log the apps hash code of the local
 *       region and of every remote region.</li>
 * </ol>
 *
 * <p>Every {@code Throwable} is caught and logged; nothing propagates to the
 * caller.
 */
@VisibleForTesting
void refreshRegistry() {
    try {
        boolean fetchingRemoteRegions = isFetchingRemoteRegionRegistries();

        boolean regionsChanged = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String configuredRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (configuredRegions != null) {
            String knownRegions = remoteRegionsToFetch.get();
            if (configuredRegions.equals(knownRegions)) {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            } else {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (!remoteRegionsToFetch.compareAndSet(knownRegions, configuredRegions)) {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", knownRegions, configuredRegions);
                    } else {
                        String[] regionNames = configuredRegions.split(",");
                        remoteRegionsRef.set(regionNames);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(regionNames);
                        regionsChanged = true;
                    }
                }
            }
        }

        // A changed region list forces a full (non-delta) fetch.
        if (fetchRegistry(regionsChanged)) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder hashSummary = new StringBuilder()
                    .append("Local region apps hashcode: ")
                    .append(localRegionApps.get().getAppsHashCode())
                    .append(", is fetching remote regions? ")
                    .append(fetchingRemoteRegions);
            for (Map.Entry<String, Applications> regionEntry : remoteRegionVsApps.entrySet()) {
                hashSummary.append(", Remote region: ")
                        .append(regionEntry.getKey())
                        .append(" , apps hashcode: ")
                        .append(regionEntry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    hashSummary.toString());
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}

/**
 * Fetches the registry information.
 *
 * <p>
 * This method tries to get only deltas after the first fetch unless there
 * is an issue in reconciling eureka server and client registry information.
 * </p>
 *
 * <p>
 * A full fetch is performed when any of the following holds: delta is
 * disabled in the configuration, a single-VIP refresh address is configured,
 * the caller forces it, there is no local registry, the local registry is
 * empty, or its version is -1. Otherwise only the delta is requested.
 * </p>
 *
 * <p>
 * Any {@code Throwable} raised while fetching is logged and reported as a
 * {@code false} return value; it is never propagated.
 * </p>
 *
 * @param forceFullRegistryFetch Forces a full registry fetch.
 *
 * @return true if the registry was fetched
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        // Evaluate every trigger exactly once so that the decision and the
        // log lines below agree, and so that a missing local registry cannot
        // cause a NullPointerException while logging.
        boolean deltaDisabled = clientConfig.shouldDisableDelta();
        String singleVipAddress = clientConfig.getRegistryRefreshSingleVipAddress();
        boolean noLocalRegistry = (applications == null);
        boolean localRegistryEmpty = !noLocalRegistry
                && (applications.getRegisteredApplications().size() == 0);
        // Client application does not have latest library supporting delta
        boolean versionUnset = !noLocalRegistry && (applications.getVersion() == -1);

        if (deltaDisabled
                || !Strings.isNullOrEmpty(singleVipAddress)
                || forceFullRegistryFetch
                || noLocalRegistry
                || localRegistryEmpty
                || versionUnset) {
            logger.info("Disable delta property : {}", deltaDisabled);
            logger.info("Single vip registry refresh property : {}", singleVipAddress);
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", noLocalRegistry);
            logger.info("Registered Applications size is zero : {}", localRegistryEmpty);
            logger.info("Application version is -1: {}", versionUnset);
            getAndStoreFullRegistry();
        } else {
            // A usable local registry exists, so only the delta is fetched.
            getAndUpdateDelta(applications);
        }

        // Without this guard, a null local registry would throw here after a
        // successful full fetch, turning the success into a reported failure.
        // NOTE(review): after a full fetch this reference may no longer be the
        // instance getApplications() returns - confirm which instance should
        // receive the hash code.
        if (applications != null) {
            applications.setAppsHashCode(applications.getReconcileHashCode());
        }
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}

/**
 * Requests the registry delta from the server and merges it into the local
 * cache, falling back to a full fetch or a reconciliation where needed.
 *
 * <ul>
 *   <li>If the response status is not OK, or it carries no entity, the full
 *       registry is fetched instead.</li>
 *   <li>If {@code fetchRegistryGeneration} changed since this method
 *       started, the delta is discarded.</li>
 *   <li>Otherwise the delta is applied under {@code fetchRegistryUpdateLock}
 *       (acquired with {@code tryLock()}). Afterwards the locally computed
 *       reconcile hash code is compared with the one on the delta, and
 *       {@code reconcileAndLogDifference} is called when they differ or when
 *       {@code clientConfig.shouldLogDeltaDiff()} is true.</li>
 * </ul>
 *
 * @param applications the local registry, used to compute the reconcile hash
 *        code after the delta has been applied
 * @throws Throwable whatever the delta request, the merge, or the fallback
 *         fetch throws
 */
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long generationAtStart = fetchRegistryGeneration.get();

    EurekaHttpResponse<Applications> response = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    Applications delta = (response.getStatusCode() == Status.OK.getStatusCode())
            ? response.getEntity()
            : null;

    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
        return;
    }

    // Advancing the generation fails if it moved while the delta was in flight.
    if (!fetchRegistryGeneration.compareAndSet(generationAtStart, generationAtStart + 1)) {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        return;
    }

    logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());

    // Remains empty when the lock is not obtained, in which case the
    // comparison below is made against the empty string.
    String localHashCode = "";
    if (!fetchRegistryUpdateLock.tryLock()) {
        logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
    } else {
        try {
            updateDelta(delta);
            localHashCode = getReconcileHashCode(applications);
        } finally {
            fetchRegistryUpdateLock.unlock();
        }
    }

    // There is a diff in number of instances for some reason
    boolean hashMismatch = !localHashCode.equals(delta.getAppsHashCode());
    if (hashMismatch || clientConfig.shouldLogDeltaDiff()) {
        reconcileAndLogDifference(delta, localHashCode); // this makes a remoteCall
    }
}

/**
 * Applies every instance change contained in {@code delta} to the cached
 * registries.
 *
 * <p>For each instance, the target registry is the local one returned by
 * {@code getApplications()}, unless the instance's region is not local. In
 * that case the target is the entry for that region in
 * {@code remoteRegionVsApps}, created on first use. ADDED and MODIFIED
 * instances are added to their application; DELETED instances are removed
 * from it. For all three action types the delta's application is first
 * registered in the target when no application of that name exists there.
 * Instances with any other action type are only counted.
 *
 * <p>Finally the delta's version is copied to the local registry and to all
 * remote-region registries, and their instances are shuffled.
 *
 * @param delta the changes received from the server
 */
private void updateDelta(Applications delta) {
    int processedInstances = 0;
    for (Application deltaApp : delta.getRegisteredApplications()) {
        for (InstanceInfo info : deltaApp.getInstances()) {
            Applications target = getApplications();
            String region = instanceRegionChecker.getInstanceRegion(info);
            if (!instanceRegionChecker.isLocalRegion(region)) {
                target = remoteRegionVsApps.get(region);
                if (target == null) {
                    target = new Applications();
                    remoteRegionVsApps.put(region, target);
                }
            }

            processedInstances++;

            ActionType action = info.getActionType();
            boolean upsert = ActionType.ADDED.equals(action) || ActionType.MODIFIED.equals(action);
            boolean removal = !upsert && ActionType.DELETED.equals(action);
            if (!upsert && !removal) {
                // Unrecognised action type: counted above, otherwise ignored.
                continue;
            }

            // Shared by all three action types: make sure the target knows the
            // application before the instance is touched.
            String appName = info.getAppName();
            if (target.getRegisteredApplications(appName) == null) {
                target.addApplication(deltaApp);
            }

            if (removal) {
                logger.debug("Deleted instance {} to the existing apps ", info.getId());
                target.getRegisteredApplications(appName).removeInstance(info);
            } else {
                if (ActionType.ADDED.equals(action)) {
                    logger.debug("Added instance {} to the existing apps in region {}", info.getId(), region);
                } else {
                    logger.debug("Modified instance {} to the existing apps ", info.getId());
                }
                target.getRegisteredApplications(appName).addInstance(info);
            }
        }
    }
    logger.debug("The total number of instances fetched by the delta processor : {}", processedInstances);

    getApplications().setVersion(delta.getVersion());
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

    for (Applications remoteRegistry : remoteRegionVsApps.values()) {
        remoteRegistry.setVersion(delta.getVersion());
        remoteRegistry.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    }
}

}