Notify cluster formation changed implementation

task-fixes
commit b11c932814

@ -66,4 +66,9 @@ public class TestHeartBeatManagementService implements HeartBeatManagementServic
@Override public Map<Integer, ServerContext> getActiveServers() throws HeartBeatManagementException {
return null;
}
@Override
public void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException {
}
}

@ -22,6 +22,7 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.HeartBeatBeaconC
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.HeartBeatBeaconUtils;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.config.datasource.DataSourceConfig;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.InvalidConfigurationStateException;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifier;
import org.w3c.dom.Document;
import org.wso2.carbon.utils.CarbonUtils;
@ -29,8 +30,10 @@ import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.File;
import java.util.List;
@XmlRootElement(name = "HeartBeatBeaconConfig")
public class HeartBeatBeaconConfig {
@ -50,6 +53,7 @@ public class HeartBeatBeaconConfig {
private static final String SERVER_UUID_FILE_LOCATION =
CarbonUtils.getCarbonConfigDirPath() + File.separator + "server-credentials.properties";
private List<String> notifiers;
private HeartBeatBeaconConfig() {
}
@ -135,4 +139,13 @@ public class HeartBeatBeaconConfig {
}
}
@XmlElementWrapper(name = "ClusterFormationChangedNotifiers", required = true)
@XmlElement(name = "Notifier", required = true)
public List<String> getNotifiers() {
return notifiers;
}
public void setNotifiers(List<String> notifiers) {
this.notifiers = notifiers;
}
}

@ -22,6 +22,7 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.HeartBeatBeaconU
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.config.datasource.DataSourceConfig;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifierRepository;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementServiceImpl;
import org.apache.commons.logging.Log;
@ -29,6 +30,8 @@ import org.apache.commons.logging.LogFactory;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.ndatasource.core.DataSourceService;
import java.util.List;
/**
* @scr.component name="io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.heartbeatBeaconComponent"
* immediate="true"
@ -58,10 +61,23 @@ public class HeartBeatBeaconComponent {
DataSourceConfig dsConfig = HeartBeatBeaconConfig.getInstance().getDataSourceConfig();
HeartBeatBeaconDAOFactory.init(dsConfig);
ClusterFormationChangedNotifierRepository clusterFormationChangedNotifierRepository
= new ClusterFormationChangedNotifierRepository();
List<String> notifiers = HeartBeatBeaconConfig.getInstance().getNotifiers();
if (notifiers != null && notifiers.size() > 0) {
for (String notifier : notifiers) {
clusterFormationChangedNotifierRepository.addNotifier(notifier);
}
}
HeartBeatBeaconDataHolder.getInstance().setClusterFormationChangedNotifierRepository(
clusterFormationChangedNotifierRepository);
//Setting up executors to notify heart beat status */
HeartBeatExecutor.setUpNotifiers(HeartBeatBeaconUtils.getServerDetails());
}
if (log.isDebugEnabled()) {
log.debug("Heart Beat Notifier bundle has been successfully initialized");
}

@ -18,6 +18,7 @@
package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.internal;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifierRepository;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
public class HeartBeatBeaconDataHolder {
@ -27,6 +28,7 @@ public class HeartBeatBeaconDataHolder {
private static HeartBeatBeaconDataHolder thisInstance = new HeartBeatBeaconDataHolder();
private ClusterFormationChangedNotifierRepository clusterFormationChangedNotifierRepository;
private HeartBeatBeaconDataHolder() {}
public static HeartBeatBeaconDataHolder getInstance() {
@ -48,4 +50,12 @@ public class HeartBeatBeaconDataHolder {
public void setLocalServerUUID(String localServerUUID) {
this.localServerUUID = localServerUUID;
}
public ClusterFormationChangedNotifierRepository getClusterFormationChangedNotifierRepository() {
return clusterFormationChangedNotifierRepository;
}
public void setClusterFormationChangedNotifierRepository(ClusterFormationChangedNotifierRepository clusterFormationChangedNotifierRepository) {
this.clusterFormationChangedNotifierRepository = clusterFormationChangedNotifierRepository;
}
}

@ -69,6 +69,7 @@ public class HeartBeatExecutor {
try {
recordHeartBeat(designatedUUID);
electDynamicTaskExecutionCandidate(cumilativeTimeOut);
notifyClusterFormationChanged(cumilativeTimeOut);
} catch (Exception e) {
log.error("Error while executing record heart beat task. This will result in schedule operation malfunction.", e);
}
@ -98,5 +99,8 @@ public class HeartBeatExecutor {
HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().electCandidate(cumilativeTimeOut);
}
static void notifyClusterFormationChanged(int cumilativeTimeOut) throws HeartBeatManagementException {
HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().notifyClusterFormationChanged(cumilativeTimeOut);
}
}

@ -0,0 +1,25 @@
/*
* Copyright (c) 2018 - 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service;
public interface ClusterFormationChangedNotifier {
String getType();
void notifyClusterFormationChanged(int hashIndex, int activeServerCount);
}

@ -0,0 +1,64 @@
/*
* Copyright (c) 2018 - 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ClusterFormationChangedNotifierRepository {
private Map<String, ClusterFormationChangedNotifier> notifiers;
private static final Log log = LogFactory.getLog(ClusterFormationChangedNotifierRepository.class);
public ClusterFormationChangedNotifierRepository() {
this.notifiers = new ConcurrentHashMap<>();
}
public void addNotifier(ClusterFormationChangedNotifier notifier) {
notifiers.put(notifier.getType(), notifier);
}
public void addNotifier(String className) {
try {
if (!StringUtils.isEmpty(className)) {
Class<?> clz = Class.forName(className);
ClusterFormationChangedNotifier notifier = (ClusterFormationChangedNotifier) clz.newInstance();
notifiers.put(notifier.getType(), notifier);
}
} catch (ClassNotFoundException e) {
log.error("Provided ClusterFormationChangedNotifier implementation '" + className + "' cannot be found", e);
} catch (InstantiationException e) {
log.error("Error occurred while instantiating ClusterFormationChangedNotifier implementation '" +
className + "'", e);
} catch (IllegalAccessException e) {
log.error("Error occurred while adding ClusterFormationChangedNotifier implementation '" + className + "'", e);
}
}
public ClusterFormationChangedNotifier getNotifier(String type) {
return notifiers.get(type);
}
public Map<String, ClusterFormationChangedNotifier> getNotifiers() {
return notifiers;
}
}

@ -36,6 +36,7 @@ public interface HeartBeatManagementService {
boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException;
void electCandidate(int elapsedTimeInSeconds) throws HeartBeatManagementException;
void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException;
boolean updateTaskExecutionAcknowledgement(String newTask) throws HeartBeatManagementException;

@ -48,6 +48,9 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
private final HeartBeatDAO heartBeatDAO;
private static int lastActiveCount = -1;
private static int lastHashIndex = -1;
public HeartBeatManagementServiceImpl() {
this.heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO();
}
@ -254,6 +257,55 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
throw new HeartBeatManagementException(msg);
}
}
@Override
public void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException {
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
try {
HeartBeatBeaconDAOFactory.beginTransaction();
Map<String, ServerContext> servers = heartBeatDAO.getActiveServerDetails(elapsedTimeInSeconds);
HeartBeatBeaconDAOFactory.commitTransaction();
if (servers != null && !servers.isEmpty()) {
String serverUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
ServerContext serverContext = servers.get(serverUUID);
// cluster change can be identified, either by changing hash index or changing active server count
if ((lastHashIndex != serverContext.getIndex()) || (lastActiveCount != servers.size())) {
lastHashIndex = serverContext.getIndex();
lastActiveCount = servers.size();
ClusterFormationChangedNotifierRepository repository = HeartBeatBeaconDataHolder.getInstance().getClusterFormationChangedNotifierRepository();
Map<String, ClusterFormationChangedNotifier> notifiers = repository.getNotifiers();
for (String type : notifiers.keySet()) {
ClusterFormationChangedNotifier notifier = notifiers.get(type);
Runnable r = new Runnable() {
@Override
public void run() {
notifier.notifyClusterFormationChanged(lastHashIndex, lastActiveCount);
}
};
new Thread(r).start();
}
}
}
} catch (HeartBeatDAOException e) {
String msg = "Error occurred while notifyClusterFormationChanged.";
log.error(msg, e);
throw new HeartBeatManagementException(msg, e);
} catch (TransactionManagementException e) {
HeartBeatBeaconDAOFactory.rollbackTransaction();
String msg = "Error occurred while electing candidate for dynamic task execution. Issue in opening a connection to the underlying data source";
log.error(msg, e);
throw new HeartBeatManagementException(msg, e);
} finally {
HeartBeatBeaconDAOFactory.closeConnection();
}
} else {
String msg = "Heart Beat Configuration Disabled. Error while notifyClusterFormationChanged.";
log.error(msg);
throw new HeartBeatManagementException(msg);
}
}
private void electCandidate(Map<String, ServerContext> servers) throws HeartBeatDAOException {
String electedCandidate = getRandomElement(servers.keySet());

@ -66,4 +66,9 @@ public class TestHeartBeatManagementService implements HeartBeatManagementServic
@Override public Map<Integer, ServerContext> getActiveServers() throws HeartBeatManagementException {
return null;
}
@Override
public void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException {
}
}

@ -49,4 +49,7 @@
<NotifierFrequencyInSeconds>300</NotifierFrequencyInSeconds>
<TimeSkewInSeconds>5</TimeSkewInSeconds>
<ServerTimeOutIntervalInSeconds>600</ServerTimeOutIntervalInSeconds>
<ClusterFormationChangedNotifiers>
<Notifier></Notifier>
</ClusterFormationChangedNotifiers>
</HeartBeatBeaconConfig>

@ -51,11 +51,21 @@
<NotifierFrequencyInSeconds>{{heart_beat_beacon_conf.notifier_frequency_in_seconds}}</NotifierFrequencyInSeconds>
<TimeSkewInSeconds>{{heart_beat_beacon_conf.time_skew_in_seconds}}</TimeSkewInSeconds>
<ServerTimeOutIntervalInSeconds>{{heart_beat_beacon_conf.sever_timeout_interval_in_seconds}}</ServerTimeOutIntervalInSeconds>
{% if heart_beat_beacon_conf.cluster_formation_changed_configs.cluster_formation_changed_notifiers is defined %}
<ClusterFormationChangedNotifiers>
{%- for cluster_formation_changed_notifier in heart_beat_beacon_conf.cluster_formation_changed_configs.cluster_formation_changed_notifiers -%}
<Notifier>{{cluster_formation_changed_notifier}}</Notifier>
{% endfor %}
</ClusterFormationChangedNotifiers>
{% endif %}
{% else %}
<Enable>false</Enable>
<NotifierInitialDelayInSeconds>30</NotifierInitialDelayInSeconds>
<NotifierFrequencyInSeconds>300</NotifierFrequencyInSeconds>
<TimeSkewInSeconds>5</TimeSkewInSeconds>
<ServerTimeOutIntervalInSeconds>600</ServerTimeOutIntervalInSeconds>
<ClusterFormationChangedNotifiers>
<Notifier></Notifier>
</ClusterFormationChangedNotifiers>
{% endif %}
</HeartBeatBeaconConfig>

Loading…
Cancel
Save