|
|
|
@ -50,6 +50,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
|
|
|
|
|
private static int lastActiveCount = -1;
|
|
|
|
|
private static int lastHashIndex = -1;
|
|
|
|
|
private static volatile boolean isQualified = false;
|
|
|
|
|
|
|
|
|
|
public HeartBeatManagementServiceImpl() {
|
|
|
|
|
this.heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO();
|
|
|
|
@ -58,17 +59,17 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException {
|
|
|
|
|
int hashIndex = -1;
|
|
|
|
|
ServerContext localServerCtx = null;
|
|
|
|
|
int hashIndex;
|
|
|
|
|
ServerContext localServerCtx;
|
|
|
|
|
ServerCtxInfo serverCtxInfo = null;
|
|
|
|
|
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
|
|
|
|
|
try {
|
|
|
|
|
HeartBeatBeaconDAOFactory.openConnection();
|
|
|
|
|
int timeOutIntervalInSeconds = HeartBeatBeaconConfig.getInstance().getServerTimeOutIntervalInSeconds();
|
|
|
|
|
int timeSkew = HeartBeatBeaconConfig.getInstance().getTimeSkew();
|
|
|
|
|
int cumilativeTimeOut = timeOutIntervalInSeconds + timeSkew;
|
|
|
|
|
int cumulativeTimeOut = timeOutIntervalInSeconds + timeSkew;
|
|
|
|
|
String localServerUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
|
|
|
|
|
Map<String, ServerContext> serverCtxMap = heartBeatDAO.getActiveServerDetails(cumilativeTimeOut);
|
|
|
|
|
Map<String, ServerContext> serverCtxMap = heartBeatDAO.getActiveServerDetails(cumulativeTimeOut);
|
|
|
|
|
if (!serverCtxMap.isEmpty()) {
|
|
|
|
|
localServerCtx = serverCtxMap.get(localServerUUID);
|
|
|
|
|
if (localServerCtx != null) {
|
|
|
|
@ -97,7 +98,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean isTaskPartitioningEnabled() throws HeartBeatManagementException {
|
|
|
|
|
boolean enabled = false;
|
|
|
|
|
boolean enabled;
|
|
|
|
|
if (HeartBeatBeaconConfig.getInstance() != null) {
|
|
|
|
|
enabled = HeartBeatBeaconConfig.getInstance().isEnabled();
|
|
|
|
|
} else {
|
|
|
|
@ -111,7 +112,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String updateServerContext(ServerContext ctx) throws HeartBeatManagementException {
|
|
|
|
|
String uuid = null;
|
|
|
|
|
String uuid;
|
|
|
|
|
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
|
|
|
|
|
try {
|
|
|
|
|
HeartBeatBeaconDAOFactory.beginTransaction();
|
|
|
|
@ -121,12 +122,13 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
HeartBeatBeaconDAOFactory.commitTransaction();
|
|
|
|
|
}
|
|
|
|
|
} catch (HeartBeatDAOException e) {
|
|
|
|
|
String msg = "Error Occured while retrieving server context.";
|
|
|
|
|
String msg = "Error Occurred while retrieving server context.";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatManagementException(msg, e);
|
|
|
|
|
} catch (TransactionManagementException e) {
|
|
|
|
|
HeartBeatBeaconDAOFactory.rollbackTransaction();
|
|
|
|
|
String msg = "Error occurred while updating server context. Issue in opening a connection to the underlying data source";
|
|
|
|
|
String msg = "Error occurred while updating server context. Issue in opening a connection to the " +
|
|
|
|
|
"underlying data source";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatManagementException(msg, e);
|
|
|
|
|
} finally {
|
|
|
|
@ -141,35 +143,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean isQualifiedToExecuteTask() throws HeartBeatManagementException {
|
|
|
|
|
boolean isQualified = false;
|
|
|
|
|
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
|
|
|
|
|
try {
|
|
|
|
|
String localServerUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
|
|
|
|
|
HeartBeatBeaconDAOFactory.openConnection();
|
|
|
|
|
ElectedCandidate candidate = heartBeatDAO.retrieveCandidate();
|
|
|
|
|
if (candidate != null && candidate.getServerUUID().equalsIgnoreCase(localServerUUID)) {
|
|
|
|
|
isQualified = true;
|
|
|
|
|
if (log.isDebugEnabled()) {
|
|
|
|
|
log.debug("Node : " + localServerUUID + " is qualified to execute randomly assigned task.");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (HeartBeatDAOException e) {
|
|
|
|
|
String msg = "Error occurred while checking if server is qualified to execute randomly designated task.";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatManagementException(msg, e);
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
String msg = "Error occurred while opening a connection to the underlying data source";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatManagementException(msg, e);
|
|
|
|
|
} finally {
|
|
|
|
|
HeartBeatBeaconDAOFactory.closeConnection();
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
String msg = "Heart Beat Configuration Disabled. Error occurred while checking if server is qualified to execute randomly designated task.";
|
|
|
|
|
log.error(msg);
|
|
|
|
|
throw new HeartBeatManagementException(msg);
|
|
|
|
|
}
|
|
|
|
|
public boolean isQualifiedToExecuteTask() {
|
|
|
|
|
return isQualified;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -229,7 +203,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
if (presentCandidate != null) {
|
|
|
|
|
//if candidate is older than stipulated elapsed-time, purge and re-elect
|
|
|
|
|
if (presentCandidate.getTimeOfElection().before(new Timestamp(System.currentTimeMillis()
|
|
|
|
|
- TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds)))) {
|
|
|
|
|
- TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds)))) {
|
|
|
|
|
heartBeatDAO.purgeCandidates();
|
|
|
|
|
electCandidate(servers);
|
|
|
|
|
}
|
|
|
|
@ -240,13 +214,24 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
}
|
|
|
|
|
HeartBeatBeaconDAOFactory.commitTransaction();
|
|
|
|
|
}
|
|
|
|
|
ElectedCandidate candidate = heartBeatDAO.retrieveCandidate();
|
|
|
|
|
String localServerUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
|
|
|
|
|
if (candidate != null && candidate.getServerUUID().equalsIgnoreCase(localServerUUID)) {
|
|
|
|
|
isQualified = true;
|
|
|
|
|
if (log.isDebugEnabled()) {
|
|
|
|
|
log.debug("Node : " + localServerUUID + " is qualified to execute randomly assigned task.");
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
isQualified = false;
|
|
|
|
|
}
|
|
|
|
|
} catch (HeartBeatDAOException e) {
|
|
|
|
|
String msg = "Error occurred while electing candidate for dynamic task execution.";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatManagementException(msg, e);
|
|
|
|
|
} catch (TransactionManagementException e) {
|
|
|
|
|
HeartBeatBeaconDAOFactory.rollbackTransaction();
|
|
|
|
|
String msg = "Error occurred while electing candidate for dynamic task execution. Issue in opening a connection to the underlying data source";
|
|
|
|
|
String msg = "Error occurred while electing candidate for dynamic task execution. " +
|
|
|
|
|
"Issue in opening a connection to the underlying data source";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatManagementException(msg, e);
|
|
|
|
|
} finally {
|
|
|
|
@ -258,6 +243,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
throw new HeartBeatManagementException(msg);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException {
|
|
|
|
|
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
|
|
|
|
@ -278,7 +264,8 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
lastHashIndex = serverContext.getIndex();
|
|
|
|
|
lastActiveCount = servers.size();
|
|
|
|
|
|
|
|
|
|
ClusterFormationChangedNotifierRepository repository = HeartBeatBeaconDataHolder.getInstance().getClusterFormationChangedNotifierRepository();
|
|
|
|
|
ClusterFormationChangedNotifierRepository repository = HeartBeatBeaconDataHolder.getInstance()
|
|
|
|
|
.getClusterFormationChangedNotifierRepository();
|
|
|
|
|
Map<String, ClusterFormationChangedNotifier> notifiers = repository.getNotifiers();
|
|
|
|
|
for (String type : notifiers.keySet()) {
|
|
|
|
|
ClusterFormationChangedNotifier notifier = notifiers.get(type);
|
|
|
|
@ -372,8 +359,8 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
HeartBeatBeaconDAOFactory.openConnection();
|
|
|
|
|
int timeOutIntervalInSeconds = HeartBeatBeaconConfig.getInstance().getServerTimeOutIntervalInSeconds();
|
|
|
|
|
int timeSkew = HeartBeatBeaconConfig.getInstance().getTimeSkew();
|
|
|
|
|
int cumilativeTimeOut = timeOutIntervalInSeconds + timeSkew;
|
|
|
|
|
Map<String, ServerContext> serverCtxMap = heartBeatDAO.getActiveServerDetails(cumilativeTimeOut);
|
|
|
|
|
int cumulativeTimeOut = timeOutIntervalInSeconds + timeSkew;
|
|
|
|
|
Map<String, ServerContext> serverCtxMap = heartBeatDAO.getActiveServerDetails(cumulativeTimeOut);
|
|
|
|
|
for (String uuid : serverCtxMap.keySet()) {
|
|
|
|
|
ServerContext serverContext = serverCtxMap.get(uuid);
|
|
|
|
|
activeServers.put(serverContext.getIndex(), serverContext);
|
|
|
|
@ -396,4 +383,5 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
|
|
|
|
}
|
|
|
|
|
return activeServers;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|