diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/TestHeartBeatManagementService.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/TestHeartBeatManagementService.java index 0e5f71243c1..8d5924709ca 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/TestHeartBeatManagementService.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/TestHeartBeatManagementService.java @@ -66,4 +66,9 @@ public class TestHeartBeatManagementService implements HeartBeatManagementServic @Override public Map getActiveServers() throws HeartBeatManagementException { return null; } + + @Override + public void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException { + + } } diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/config/HeartBeatBeaconConfig.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/config/HeartBeatBeaconConfig.java index e7df3d84680..ca3cc09610a 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/config/HeartBeatBeaconConfig.java +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/config/HeartBeatBeaconConfig.java @@ -22,6 +22,7 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.HeartBeatBeaconC import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.HeartBeatBeaconUtils; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.config.datasource.DataSourceConfig; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.InvalidConfigurationStateException; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifier; import org.w3c.dom.Document; import org.wso2.carbon.utils.CarbonUtils; @@ -29,8 +30,10 @@ import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; import javax.xml.bind.annotation.XmlRootElement; import java.io.File; +import java.util.List; @XmlRootElement(name = "HeartBeatBeaconConfig") public class HeartBeatBeaconConfig { @@ -50,6 +53,7 @@ public class HeartBeatBeaconConfig { private static final String SERVER_UUID_FILE_LOCATION = CarbonUtils.getCarbonConfigDirPath() + File.separator + "server-credentials.properties"; + private List notifiers; private HeartBeatBeaconConfig() { } @@ -135,4 +139,13 @@ public class HeartBeatBeaconConfig { } } + @XmlElementWrapper(name = "ClusterFormationChangedNotifiers", required = true) + @XmlElement(name = "Notifier", required = true) + public List getNotifiers() { + return notifiers; + } + + public void setNotifiers(List notifiers) { + this.notifiers = notifiers; + } } diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java index c7e874c3f5c..a526e216fb3 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java @@ -22,6 +22,7 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.HeartBeatBeaconU import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.config.datasource.DataSourceConfig; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifierRepository; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementServiceImpl; import org.apache.commons.logging.Log; @@ -29,6 +30,8 @@ import org.apache.commons.logging.LogFactory; import org.osgi.service.component.ComponentContext; import org.wso2.carbon.ndatasource.core.DataSourceService; +import java.util.List; + /** * @scr.component name="io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.heartbeatBeaconComponent" * immediate="true" @@ -58,10 +61,23 @@ public class HeartBeatBeaconComponent { DataSourceConfig dsConfig = HeartBeatBeaconConfig.getInstance().getDataSourceConfig(); HeartBeatBeaconDAOFactory.init(dsConfig); + ClusterFormationChangedNotifierRepository clusterFormationChangedNotifierRepository + = new ClusterFormationChangedNotifierRepository(); + List notifiers = HeartBeatBeaconConfig.getInstance().getNotifiers(); + if (notifiers != null && notifiers.size() > 0) { + for (String notifier : notifiers) { + clusterFormationChangedNotifierRepository.addNotifier(notifier); + } + } + HeartBeatBeaconDataHolder.getInstance().setClusterFormationChangedNotifierRepository( + clusterFormationChangedNotifierRepository); + //Setting up executors to notify heart beat status */ HeartBeatExecutor.setUpNotifiers(HeartBeatBeaconUtils.getServerDetails()); } + + if (log.isDebugEnabled()) { log.debug("Heart Beat Notifier bundle has been successfully initialized"); } diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconDataHolder.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconDataHolder.java index bde95e33c47..a3119d6064a 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconDataHolder.java +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconDataHolder.java @@ -18,6 +18,7 @@ package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.internal; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifierRepository; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; public class HeartBeatBeaconDataHolder { @@ -27,6 +28,7 @@ public class HeartBeatBeaconDataHolder { private static HeartBeatBeaconDataHolder thisInstance = new HeartBeatBeaconDataHolder(); + private ClusterFormationChangedNotifierRepository clusterFormationChangedNotifierRepository; private HeartBeatBeaconDataHolder() {} public static HeartBeatBeaconDataHolder getInstance() { @@ -48,4 +50,12 @@ public class HeartBeatBeaconDataHolder { public void setLocalServerUUID(String localServerUUID) { this.localServerUUID = localServerUUID; } + + public ClusterFormationChangedNotifierRepository getClusterFormationChangedNotifierRepository() { + return clusterFormationChangedNotifierRepository; + } + + public void setClusterFormationChangedNotifierRepository(ClusterFormationChangedNotifierRepository clusterFormationChangedNotifierRepository) { + this.clusterFormationChangedNotifierRepository = clusterFormationChangedNotifierRepository; + } } diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java index 81be3a999f2..bd19a185a8d 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java @@ -69,6 +69,7 @@ public class HeartBeatExecutor { try { recordHeartBeat(designatedUUID); electDynamicTaskExecutionCandidate(cumilativeTimeOut); + notifyClusterFormationChanged(cumilativeTimeOut); } catch (Exception e) { log.error("Error while executing record heart beat task. This will result in schedule operation malfunction.", e); } @@ -98,5 +99,8 @@ public class HeartBeatExecutor { HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().electCandidate(cumilativeTimeOut); } + static void notifyClusterFormationChanged(int cumilativeTimeOut) throws HeartBeatManagementException { + HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().notifyClusterFormationChanged(cumilativeTimeOut); + } } diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifier.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifier.java new file mode 100644 index 00000000000..ecd53d98efa --- /dev/null +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifier.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2018 - 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved. + * + * Entgra (Pvt) Ltd. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service; + +public interface ClusterFormationChangedNotifier { + + String getType(); + + void notifyClusterFormationChanged(int hashIndex, int activeServerCount); +} diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifierRepository.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifierRepository.java new file mode 100644 index 00000000000..bb0c7726ded --- /dev/null +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifierRepository.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2018 - 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved. + * + * Entgra (Pvt) Ltd. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class ClusterFormationChangedNotifierRepository { + + private Map notifiers; + private static final Log log = LogFactory.getLog(ClusterFormationChangedNotifierRepository.class); + + public ClusterFormationChangedNotifierRepository() { + this.notifiers = new ConcurrentHashMap<>(); + } + + public void addNotifier(ClusterFormationChangedNotifier notifier) { + notifiers.put(notifier.getType(), notifier); + } + + public void addNotifier(String className) { + try { + if (!StringUtils.isEmpty(className)) { + Class clz = Class.forName(className); + ClusterFormationChangedNotifier notifier = (ClusterFormationChangedNotifier) clz.newInstance(); + notifiers.put(notifier.getType(), notifier); + } + } catch (ClassNotFoundException e) { + log.error("Provided ClusterFormationChangedNotifier implementation '" + className + "' cannot be found", e); + } catch (InstantiationException e) { + log.error("Error occurred while instantiating ClusterFormationChangedNotifier implementation '" + + className + "'", e); + } catch (IllegalAccessException e) { + log.error("Error occurred while adding ClusterFormationChangedNotifier implementation '" + className + "'", e); + } + } + + public ClusterFormationChangedNotifier getNotifier(String type) { + return notifiers.get(type); + } + + public Map getNotifiers() { + return notifiers; + } +} diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java index 50a42d5d598..f8c6afcc568 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java @@ -36,6 +36,7 @@ public interface HeartBeatManagementService { boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException; void electCandidate(int elapsedTimeInSeconds) throws HeartBeatManagementException; + void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException; boolean updateTaskExecutionAcknowledgement(String newTask) throws HeartBeatManagementException; diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java index f39eda3c78f..7a31a6ebccb 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java @@ -48,6 +48,9 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic private final HeartBeatDAO heartBeatDAO; + private static int lastActiveCount = -1; + private static int lastHashIndex = -1; + public HeartBeatManagementServiceImpl() { this.heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO(); } @@ -254,6 +257,55 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic throw new HeartBeatManagementException(msg); } } + @Override + public void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException { + if (HeartBeatBeaconConfig.getInstance().isEnabled()) { + try { + HeartBeatBeaconDAOFactory.beginTransaction(); + Map servers = heartBeatDAO.getActiveServerDetails(elapsedTimeInSeconds); + HeartBeatBeaconDAOFactory.commitTransaction(); + if (servers != null && !servers.isEmpty()) { + String serverUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID(); + ServerContext serverContext = servers.get(serverUUID); + + // cluster change can be identified, either by changing hash index or changing active server count + if ((lastHashIndex != serverContext.getIndex()) || (lastActiveCount != servers.size())) { + lastHashIndex = serverContext.getIndex(); + lastActiveCount = servers.size(); + + ClusterFormationChangedNotifierRepository repository = HeartBeatBeaconDataHolder.getInstance().getClusterFormationChangedNotifierRepository(); + Map notifiers = repository.getNotifiers(); + for (String type : notifiers.keySet()) { + ClusterFormationChangedNotifier notifier = notifiers.get(type); + Runnable r = new Runnable() { + @Override + public void run() { + notifier.notifyClusterFormationChanged(lastHashIndex, lastActiveCount); + } + }; + new Thread(r).start(); + } + } + } + } catch (HeartBeatDAOException e) { + String msg = "Error occurred while notifyClusterFormationChanged."; + log.error(msg, e); + throw new HeartBeatManagementException(msg, e); + } catch (TransactionManagementException e) { + HeartBeatBeaconDAOFactory.rollbackTransaction(); + String msg = "Error occurred while electing candidate for dynamic task execution. Issue in opening a connection to the underlying data source"; + log.error(msg, e); + throw new HeartBeatManagementException(msg, e); + } finally { + HeartBeatBeaconDAOFactory.closeConnection(); + } + } else { + String msg = "Heart Beat Configuration Disabled. Error while notifyClusterFormationChanged."; + log.error(msg); + throw new HeartBeatManagementException(msg); + } + } + private void electCandidate(Map servers) throws HeartBeatDAOException { String electedCandidate = getRandomElement(servers.keySet()); diff --git a/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/test/java/io/entgra/device/mgt/core/policy/mgt/core/mock/TestHeartBeatManagementService.java b/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/test/java/io/entgra/device/mgt/core/policy/mgt/core/mock/TestHeartBeatManagementService.java index 3272091675e..2254ea57eef 100644 --- a/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/test/java/io/entgra/device/mgt/core/policy/mgt/core/mock/TestHeartBeatManagementService.java +++ b/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/test/java/io/entgra/device/mgt/core/policy/mgt/core/mock/TestHeartBeatManagementService.java @@ -66,4 +66,9 @@ public class TestHeartBeatManagementService implements HeartBeatManagementServic @Override public Map getActiveServers() throws HeartBeatManagementException { return null; } + + @Override + public void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException { + + } } diff --git a/features/heartbeat-management/io.entgra.device.mgt.core.server.heart.beat.feature/src/main/resources/conf/heart-beat-config.xml b/features/heartbeat-management/io.entgra.device.mgt.core.server.heart.beat.feature/src/main/resources/conf/heart-beat-config.xml index a434001e725..afdc9b39714 100644 --- a/features/heartbeat-management/io.entgra.device.mgt.core.server.heart.beat.feature/src/main/resources/conf/heart-beat-config.xml +++ b/features/heartbeat-management/io.entgra.device.mgt.core.server.heart.beat.feature/src/main/resources/conf/heart-beat-config.xml @@ -49,4 +49,7 @@ 300 5 600 + + + diff --git a/features/heartbeat-management/io.entgra.device.mgt.core.server.heart.beat.feature/src/main/resources/conf_templates/templates/repository/conf/heart-beat-config.xml.j2 b/features/heartbeat-management/io.entgra.device.mgt.core.server.heart.beat.feature/src/main/resources/conf_templates/templates/repository/conf/heart-beat-config.xml.j2 index f128b56c199..a294c876e36 100644 --- a/features/heartbeat-management/io.entgra.device.mgt.core.server.heart.beat.feature/src/main/resources/conf_templates/templates/repository/conf/heart-beat-config.xml.j2 +++ b/features/heartbeat-management/io.entgra.device.mgt.core.server.heart.beat.feature/src/main/resources/conf_templates/templates/repository/conf/heart-beat-config.xml.j2 @@ -51,11 +51,21 @@ {{heart_beat_beacon_conf.notifier_frequency_in_seconds}} {{heart_beat_beacon_conf.time_skew_in_seconds}} {{heart_beat_beacon_conf.sever_timeout_interval_in_seconds}} + {% if heart_beat_beacon_conf.cluster_formation_changed_configs.cluster_formation_changed_notifiers is defined %} + + {%- for cluster_formation_changed_notifier in heart_beat_beacon_conf.cluster_formation_changed_configs.cluster_formation_changed_notifiers -%} + {{cluster_formation_changed_notifier}} + {% endfor %} + + {% endif %} {% else %} false 30 300 5 600 + + + {% endif %}