imrpoved heart beat to handle cluster formation changed

try-it-mail-fix
Amalka Subasinghe 1 year ago
parent 1d7700ab5c
commit 9ecdd486f8

@ -70,6 +70,7 @@
!io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.internal, !io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.internal,
io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.* io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.*
</Export-Package> </Export-Package>
<DynamicImport-Package>*</DynamicImport-Package>
</instructions> </instructions>
</configuration> </configuration>
</plugin> </plugin>

@ -28,6 +28,7 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBea
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.osgi.service.component.ComponentContext; import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.core.ServerStartupObserver;
import org.wso2.carbon.ndatasource.core.DataSourceService; import org.wso2.carbon.ndatasource.core.DataSourceService;
import java.util.List; import java.util.List;
@ -73,7 +74,9 @@ public class HeartBeatBeaconComponent {
clusterFormationChangedNotifierRepository); clusterFormationChangedNotifierRepository);
//Setting up executors to notify heart beat status */ //Setting up executors to notify heart beat status */
HeartBeatExecutor.setUpNotifiers(HeartBeatBeaconUtils.getServerDetails()); HeartBeatExecutor heartBeatExecutor = new HeartBeatExecutor();
componentContext.getBundleContext().registerService(
ServerStartupObserver.class.getName(), heartBeatExecutor, null);
} }

@ -26,13 +26,16 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.dto.ServerContex
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.core.ServerStartupObserver;
import java.io.IOException; import java.io.IOException;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class HeartBeatExecutor { public class HeartBeatExecutor implements ServerStartupObserver {
private static Log log = LogFactory.getLog(HeartBeatExecutor.class); private static Log log = LogFactory.getLog(HeartBeatExecutor.class);
private static final int DEFAULT__NOTIFIER_INTERVAL = 5; private static final int DEFAULT__NOTIFIER_INTERVAL = 5;
@ -43,6 +46,20 @@ public class HeartBeatExecutor {
CONFIG = HeartBeatBeaconConfig.getInstance(); CONFIG = HeartBeatBeaconConfig.getInstance();
} }
@Override
public void completingServerStartup() {
}
@Override
public void completedServerStartup() {
try {
setUpNotifiers(HeartBeatBeaconUtils.getServerDetails());
} catch (HeartBeatBeaconConfigurationException | UnknownHostException | SocketException e) {
throw new RuntimeException(e);
}
}
static void setUpNotifiers(ServerContext ctx) throws HeartBeatBeaconConfigurationException { static void setUpNotifiers(ServerContext ctx) throws HeartBeatBeaconConfigurationException {
ScheduledExecutorService executor = ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(); Executors.newSingleThreadScheduledExecutor();

@ -15,9 +15,8 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.entgra.device.mgt.core.device.mgt.core.push.notification.mgt; package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifier;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -41,7 +40,7 @@ public class ClusterFormationChangedNotifierRepository {
public void addNotifier(String className) { public void addNotifier(String className) {
try { try {
if (!StringUtils.isEmpty(className)) { if (!StringUtils.isEmpty(className)) {
Class<?> clz = ClassLoader.getSystemClassLoader()forName(className); Class<?> clz = Class.forName(className);
ClusterFormationChangedNotifier notifier = (ClusterFormationChangedNotifier) clz.newInstance(); ClusterFormationChangedNotifier notifier = (ClusterFormationChangedNotifier) clz.newInstance();
notifiers.put(notifier.getType(), notifier); notifiers.put(notifier.getType(), notifier);
} }

@ -235,6 +235,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
} }
} else { } else {
//first time execution, elect if not present //first time execution, elect if not present
heartBeatDAO.purgeCandidates();
electCandidate(servers); electCandidate(servers);
} }
HeartBeatBeaconDAOFactory.commitTransaction(); HeartBeatBeaconDAOFactory.commitTransaction();
@ -268,6 +269,10 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
String serverUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID(); String serverUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
ServerContext serverContext = servers.get(serverUUID); ServerContext serverContext = servers.get(serverUUID);
if (log.isDebugEnabled()) {
log.debug("HashIndex (previous, current) : " + lastHashIndex + ", " + serverContext.getIndex());
log.debug("ActiveServerCount (previous, current) : " + lastActiveCount + ", " + servers.size());
}
// cluster change can be identified, either by changing hash index or changing active server count // cluster change can be identified, either by changing hash index or changing active server count
if ((lastHashIndex != serverContext.getIndex()) || (lastActiveCount != servers.size())) { if ((lastHashIndex != serverContext.getIndex()) || (lastActiveCount != servers.size())) {
lastHashIndex = serverContext.getIndex(); lastHashIndex = serverContext.getIndex();
@ -280,6 +285,9 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
Runnable r = new Runnable() { Runnable r = new Runnable() {
@Override @Override
public void run() { public void run() {
if (log.isDebugEnabled()) {
log.debug("notify cluster formation changed : " + notifier.getType());
}
notifier.notifyClusterFormationChanged(lastHashIndex, lastActiveCount); notifier.notifyClusterFormationChanged(lastHashIndex, lastActiveCount);
} }
}; };

Loading…
Cancel
Save