From cf3d50edf92879143fe185a0facd9c4782fb289f Mon Sep 17 00:00:00 2001 From: "amalka.subasinghe" Date: Thu, 24 Aug 2023 16:28:28 +0530 Subject: [PATCH 1/5] added new method getAllocatedOperationMappingsByStatus --- .../core/operation/mgt/dao/OperationDAO.java | 4 ++ .../mgt/dao/impl/GenericOperationDAOImpl.java | 48 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/mgt/dao/OperationDAO.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/mgt/dao/OperationDAO.java index 9d3de1494e..2b5658d85f 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/mgt/dao/OperationDAO.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/mgt/dao/OperationDAO.java @@ -105,6 +105,10 @@ public interface OperationDAO { Map> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushNotificationStatus pushNotificationStatus, int limit) throws OperationManagementDAOException; + Map> getAllocatedOperationMappingsByStatus(Operation.Status opStatus, + Operation.PushNotificationStatus pushNotificationStatus, int limit, int activeServerCount, int serverIndex) + throws OperationManagementDAOException; + List getActivities(List deviceTypes, String operationCode, long updatedSince, String operationStatus) throws OperationManagementDAOException; diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java index 046556459a..a864da7b45 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java @@ -1970,6 +1970,54 @@ public class GenericOperationDAOImpl implements OperationDAO { return operationMappingsTenantMap; } + @Override + public Map> getAllocatedOperationMappingsByStatus(Operation.Status opStatus, + Operation.PushNotificationStatus pushNotificationStatus, int limit, int activeServerCount, int serverIndex) + throws OperationManagementDAOException { + PreparedStatement stmt = null; + ResultSet rs = null; + Connection conn; + OperationMapping operationMapping; + Map> operationMappingsTenantMap = new HashMap<>(); + try { + conn = OperationManagementDAOFactory.getConnection(); + String sql = "SELECT op.ENROLMENT_ID, op.OPERATION_ID, d.DEVICE_IDENTIFICATION, dt.NAME as DEVICE_TYPE, " + + "d.TENANT_ID FROM DM_DEVICE d, DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = ?" + + " AND op.PUSH_NOTIFICATION_STATUS = ? AND d.DEVICE_TYPE_ID = dt.ID AND d.ID=op.ENROLMENT_ID AND MOD(d.ID, ?) = ? ORDER" + + " BY op.OPERATION_ID LIMIT ?"; + stmt = conn.prepareStatement(sql); + stmt.setString(1, opStatus.toString()); + stmt.setString(2, pushNotificationStatus.toString()); + stmt.setInt(3, activeServerCount); + stmt.setInt(4, serverIndex); + stmt.setInt(5, limit); + rs = stmt.executeQuery(); + while (rs.next()) { + int tenantID = rs.getInt("TENANT_ID"); + List operationMappings = operationMappingsTenantMap.get(tenantID); + if (operationMappings == null) { + operationMappings = new LinkedList<>(); + operationMappingsTenantMap.put(tenantID, operationMappings); + } + operationMapping = new OperationMapping(); + operationMapping.setOperationId(rs.getInt("OPERATION_ID")); + DeviceIdentifier deviceIdentifier = new DeviceIdentifier(); + deviceIdentifier.setId(rs.getString("DEVICE_IDENTIFICATION")); + deviceIdentifier.setType(rs.getString("DEVICE_TYPE")); + operationMapping.setDeviceIdentifier(deviceIdentifier); + operationMapping.setEnrollmentId(rs.getInt("ENROLMENT_ID")); + operationMapping.setTenantId(tenantID); + operationMappings.add(operationMapping); + } + } catch (SQLException e) { + throw new OperationManagementDAOException("SQL error while getting operation mappings from database. " + + e.getMessage(), e); + } finally { + OperationManagementDAOUtil.cleanupResources(stmt, rs); + } + return operationMappingsTenantMap; + } + public List getActivities(List deviceTypes, String operationCode, long updatedSince, String operationStatus) throws OperationManagementDAOException { From 349fce4e6321c2651456e87a11d1c8c4e3bc1fe6 Mon Sep 17 00:00:00 2001 From: "amalka.subasinghe" Date: Thu, 24 Aug 2023 16:29:22 +0530 Subject: [PATCH 2/5] improved push notofication task to get allocated operations when heart beat enabled --- .../task/PushNotificationSchedulerTask.java | 27 ++++++++++++++++--- .../PushNotificationSchedulerTaskTest.java | 20 +++++++++----- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java index 2e859bcead..c4c9df0e56 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java @@ -17,6 +17,9 @@ */ package io.entgra.device.mgt.core.device.mgt.core.push.notification.mgt.task; +import io.entgra.device.mgt.core.device.mgt.common.ServerCtxInfo; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.context.PrivilegedCarbonContext; @@ -63,9 +66,27 @@ public class PushNotificationSchedulerTask implements Runnable { try { //Get next available operation list per device batch OperationManagementDAOFactory.openConnection(); - operationMappingsTenantMap = operationDAO.getOperationMappingsByStatus(Operation.Status - .PENDING, Operation.PushNotificationStatus.SCHEDULED, DeviceConfigurationManager.getInstance() - .getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize()); + try { + if (DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled()) { + ServerCtxInfo serverCtxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo(); + if (serverCtxInfo != null) { + operationMappingsTenantMap = operationDAO.getAllocatedOperationMappingsByStatus(Operation.Status + .PENDING, Operation.PushNotificationStatus.SCHEDULED, DeviceConfigurationManager.getInstance() + .getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize(), + serverCtxInfo.getActiveServerCount(), serverCtxInfo.getLocalServerHashIdx()); + } else { + if (log.isDebugEnabled()) { + log.debug("Active server information not recorded yet."); + } + } + } else { + operationMappingsTenantMap = operationDAO.getOperationMappingsByStatus(Operation.Status + .PENDING, Operation.PushNotificationStatus.SCHEDULED, DeviceConfigurationManager.getInstance() + .getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize()); + } + } catch (HeartBeatManagementException e) { + throw new RuntimeException(e); + } } catch (OperationManagementDAOException e) { log.error("Unable to retrieve scheduled pending operations for task.", e); } finally { diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTaskTest.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTaskTest.java index 7d92b9faa1..df04c2fb88 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTaskTest.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTaskTest.java @@ -17,12 +17,6 @@ */ package io.entgra.device.mgt.core.device.mgt.core.push.notification.mgt.task; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException; import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.OperationManagementException; import io.entgra.device.mgt.core.device.mgt.core.common.BaseDeviceManagementTest; @@ -37,6 +31,14 @@ import io.entgra.device.mgt.core.device.mgt.core.operation.mgt.dao.OperationMana import io.entgra.device.mgt.core.device.mgt.core.operation.mgt.dao.OperationManagementDAOFactory; import io.entgra.device.mgt.core.device.mgt.core.service.DeviceManagementProviderService; import io.entgra.device.mgt.core.device.mgt.core.service.DeviceManagementProviderServiceImpl; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.sql.SQLException; @@ -52,13 +54,16 @@ public class PushNotificationSchedulerTaskTest extends BaseDeviceManagementTest private PushNotificationSchedulerTask pushNotificationSchedulerTask; private OperationDAO operationDAO; + private HeartBeatManagementService heartBeatManagementService; @BeforeClass public void init() throws DeviceManagementException, RegistryException { DeviceConfigurationManager.getInstance().initConfig(); log.info("Initializing Push Notification Scheduler Test Class"); DeviceManagementServiceComponent.notifyStartupListeners(); this.deviceMgtProviderService = Mockito.mock(DeviceManagementProviderServiceImpl.class, Mockito.CALLS_REAL_METHODS); + this.heartBeatManagementService = Mockito.mock(HeartBeatManagementService.class, Mockito.CALLS_REAL_METHODS); DeviceManagementDataHolder.getInstance().setDeviceManagementProvider(this.deviceMgtProviderService); + DeviceManagementDataHolder.getInstance().setHeartBeatService(this.heartBeatManagementService); this.operationDAO = OperationManagementDAOFactory.getOperationDAO(); this.pushNotificationSchedulerTask = new PushNotificationSchedulerTask(); } @@ -69,6 +74,7 @@ public class PushNotificationSchedulerTaskTest extends BaseDeviceManagementTest OperationManagementDAOException { try { log.info("Attempting to execute push notification task scheduler"); + Mockito.when(this.heartBeatManagementService.isTaskPartitioningEnabled()).thenReturn(false); Mockito.doReturn(new TestNotificationStrategy()).when(this.deviceMgtProviderService) .getNotificationStrategyByDeviceType(Mockito.anyString()); Mockito.doReturn(new io.entgra.device.mgt.core.device.mgt.common.operation.mgt.Operation()) @@ -81,6 +87,8 @@ public class PushNotificationSchedulerTaskTest extends BaseDeviceManagementTest .getPushNotificationConfiguration().getSchedulerBatchSize()); Assert.assertEquals(operationMappingsTenantMap.size(), 0); log.info("Push notification task execution complete."); + } catch (HeartBeatManagementException e) { + throw new RuntimeException(e); } finally { OperationManagementDAOFactory.closeConnection(); } From 21606bc5cd0468d60b1358d0832f3d39ec0cb95d Mon Sep 17 00:00:00 2001 From: "amalka.subasinghe" Date: Thu, 24 Aug 2023 16:30:24 +0530 Subject: [PATCH 3/5] operation timeout task updated to execute only in elected node --- .../task/impl/OperationTimeoutTask.java | 85 +++++++++++-------- ...perationTimeoutTaskManagerServiceImpl.java | 6 +- .../repository/conf/cdm-config.xml.j2 | 2 +- 3 files changed, 54 insertions(+), 39 deletions(-) diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java index 3d336140e3..157795aa5a 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java @@ -18,6 +18,7 @@ package io.entgra.device.mgt.core.device.mgt.core.operation.timeout.task.impl; import com.google.gson.Gson; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException; @@ -36,7 +37,6 @@ import java.util.List; public class OperationTimeoutTask extends DynamicPartitionedScheduleTask { private static final Log log = LogFactory.getLog(OperationTimeoutTask.class); - @Override protected void setup() { @@ -44,45 +44,60 @@ public class OperationTimeoutTask extends DynamicPartitionedScheduleTask { @Override protected void executeDynamicTask() { - String operationTimeoutTaskConfigStr = getProperty( - OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG); - Gson gson = new Gson(); - OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class); - try { - long timeMillis = System.currentTimeMillis() - operationTimeoutConfig.getTimeout() * 60 * 1000; - List deviceTypes = new ArrayList<>(); - if (operationTimeoutConfig.getDeviceTypes().size() == 1 && - "ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) { - try { - List deviceTypeList = DeviceManagementDataHolder.getInstance() - .getDeviceManagementProvider().getDeviceTypes(); - for (DeviceType deviceType : deviceTypeList) { - deviceTypes.add(deviceType.getName()); + if (isQualifiedToExecuteTask()) { // this task will run only in one node when the deployment has multiple nodes + String operationTimeoutTaskConfigStr = getProperty( + OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG); + Gson gson = new Gson(); + OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class); + try { + long timeMillis = System.currentTimeMillis() - (long) operationTimeoutConfig.getTimeout(); + List deviceTypes = new ArrayList<>(); + if (operationTimeoutConfig.getDeviceTypes().size() == 1 && + "ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) { + try { + List deviceTypeList = DeviceManagementDataHolder.getInstance() + .getDeviceManagementProvider().getDeviceTypes(); + for (DeviceType deviceType : deviceTypeList) { + deviceTypes.add(deviceType.getName()); + } + } catch (DeviceManagementException e) { + log.error("Error occurred while reading device types", e); } - } catch (DeviceManagementException e) { - log.error("Error occurred while reading device types", e); + } else { + deviceTypes = operationTimeoutConfig.getDeviceTypes(); } - } else { - deviceTypes = operationTimeoutConfig.getDeviceTypes(); - } - List activities = DeviceManagementDataHolder.getInstance().getOperationManager() - .getActivities(deviceTypes, operationTimeoutConfig.getCode(), timeMillis, - operationTimeoutConfig.getInitialStatus()); - for (Activity activity : activities) { - for (ActivityStatus activityStatus : activity.getActivityStatus()) { - String operationId = activity.getActivityId().replace("ACTIVITY_", ""); - Operation operation = DeviceManagementDataHolder.getInstance().getOperationManager() - .getOperation(Integer.parseInt(operationId)); - operation.setStatus(Operation.Status.valueOf(operationTimeoutConfig.getNextStatus())); - DeviceManagementDataHolder.getInstance().getOperationManager() - .updateOperation(activityStatus.getDeviceIdentifier(), operation); + List activities = DeviceManagementDataHolder.getInstance().getOperationManager() + .getActivities(deviceTypes, operationTimeoutConfig.getCode(), timeMillis, + operationTimeoutConfig.getInitialStatus()); + for (Activity activity : activities) { + for (ActivityStatus activityStatus : activity.getActivityStatus()) { + String operationId = activity.getActivityId().replace("ACTIVITY_", ""); + Operation operation = DeviceManagementDataHolder.getInstance().getOperationManager() + .getOperation(Integer.parseInt(operationId)); + operation.setStatus(Operation.Status.valueOf(operationTimeoutConfig.getNextStatus())); + DeviceManagementDataHolder.getInstance().getOperationManager() + .updateOperation(activityStatus.getDeviceIdentifier(), operation); + } } - } - } catch (OperationManagementException e) { - String msg = "Error occurred while retrieving operations."; - log.error(msg, e); + } catch (OperationManagementException e) { + String msg = "Error occurred while retrieving operations."; + log.error(msg, e); + } } + } + private boolean isQualifiedToExecuteTask() { + if (isDynamicTaskEligible()) { + try { + return DeviceManagementDataHolder.getInstance().getHeartBeatService().isQualifiedToExecuteTask(); + } catch (HeartBeatManagementException e) { + log.error("Error while checking is qualified to execute task", e); + } + } else { + return true; + } + return false; + } } diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTaskManagerServiceImpl.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTaskManagerServiceImpl.java index cffc4937b3..e18507df3e 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTaskManagerServiceImpl.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTaskManagerServiceImpl.java @@ -57,14 +57,14 @@ public class OperationTimeoutTaskManagerServiceImpl implements OperationTimeoutT log.debug("Operation timeout task is started for the device type(s) : " + config.getDeviceTypes() + ", operation code : " + config.getInitialStatus()); log.debug( - "Operation timeout task is at frequency of : " + config.getTimeout() + " minutes"); + "Operation timeout task is at frequency of : " + config.getTimeout() + " milliseconds"); } TaskManager taskManager = taskService.getTaskManager(OPERATION_TIMEOUT_TASK); TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); //Convert to milli seconds - triggerInfo.setIntervalMillis(config.getTimeout() * 60 * 1000); + triggerInfo.setIntervalMillis(config.getTimeout()); triggerInfo.setRepeatCount(-1); Gson gson = new Gson(); @@ -125,7 +125,7 @@ public class OperationTimeoutTaskManagerServiceImpl implements OperationTimeoutT if (taskManager.isTaskScheduled(taskName)) { taskManager.deleteTask(taskName); TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); - triggerInfo.setIntervalMillis(config.getTimeout() * 60 * 1000); + triggerInfo.setIntervalMillis(config.getTimeout()); triggerInfo.setRepeatCount(-1); Map properties = new HashMap<>(); diff --git a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/conf_templates/templates/repository/conf/cdm-config.xml.j2 b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/conf_templates/templates/repository/conf/cdm-config.xml.j2 index 2152b1814f..2fe1494e09 100644 --- a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/conf_templates/templates/repository/conf/cdm-config.xml.j2 +++ b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/conf_templates/templates/repository/conf/cdm-config.xml.j2 @@ -353,7 +353,7 @@ - + {% if device_mgt_conf.operation_timeout_conf is defined %} From 1d7700ab5c8dd0bd318e6ac7afa16a12f728a7a0 Mon Sep 17 00:00:00 2001 From: "amalka.subasinghe" Date: Thu, 24 Aug 2023 16:31:17 +0530 Subject: [PATCH 4/5] imrpoved heart beat to handle cluster formation changed --- .../service/ClusterFormationChangedNotifierRepository.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifierRepository.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifierRepository.java index bb0c7726de..9c711366ae 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifierRepository.java +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifierRepository.java @@ -15,8 +15,9 @@ * specific language governing permissions and limitations * under the License. */ -package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service; +package io.entgra.device.mgt.core.device.mgt.core.push.notification.mgt; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifier; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,7 +41,7 @@ public class ClusterFormationChangedNotifierRepository { public void addNotifier(String className) { try { if (!StringUtils.isEmpty(className)) { - Class clz = Class.forName(className); + Class clz = ClassLoader.getSystemClassLoader()forName(className); ClusterFormationChangedNotifier notifier = (ClusterFormationChangedNotifier) clz.newInstance(); notifiers.put(notifier.getType(), notifier); } From 9ecdd486f860ea154f664e6e159bb6e3ac7b490b Mon Sep 17 00:00:00 2001 From: "amalka.subasinghe" Date: Thu, 24 Aug 2023 16:31:42 +0530 Subject: [PATCH 5/5] imrpoved heart beat to handle cluster formation changed --- .../pom.xml | 1 + .../internal/HeartBeatBeaconComponent.java | 5 ++++- .../beacon/internal/HeartBeatExecutor.java | 19 ++++++++++++++++++- ...terFormationChangedNotifierRepository.java | 5 ++--- .../HeartBeatManagementServiceImpl.java | 8 ++++++++ 5 files changed, 33 insertions(+), 5 deletions(-) diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/pom.xml b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/pom.xml index 83e9788b9d..6cdcf4027f 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/pom.xml +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/pom.xml @@ -70,6 +70,7 @@ !io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.internal, io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.* + * diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java index a526e216fb..356f5d4996 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java @@ -28,6 +28,7 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBea import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.osgi.service.component.ComponentContext; +import org.wso2.carbon.core.ServerStartupObserver; import org.wso2.carbon.ndatasource.core.DataSourceService; import java.util.List; @@ -73,7 +74,9 @@ public class HeartBeatBeaconComponent { clusterFormationChangedNotifierRepository); //Setting up executors to notify heart beat status */ - HeartBeatExecutor.setUpNotifiers(HeartBeatBeaconUtils.getServerDetails()); + HeartBeatExecutor heartBeatExecutor = new HeartBeatExecutor(); + componentContext.getBundleContext().registerService( + ServerStartupObserver.class.getName(), heartBeatExecutor, null); } diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java index bd19a185a8..bcadf4afe7 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java @@ -26,13 +26,16 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.dto.ServerContex import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.core.ServerStartupObserver; import java.io.IOException; +import java.net.SocketException; +import java.net.UnknownHostException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -public class HeartBeatExecutor { +public class HeartBeatExecutor implements ServerStartupObserver { private static Log log = LogFactory.getLog(HeartBeatExecutor.class); private static final int DEFAULT__NOTIFIER_INTERVAL = 5; @@ -43,6 +46,20 @@ public class HeartBeatExecutor { CONFIG = HeartBeatBeaconConfig.getInstance(); } + @Override + public void completingServerStartup() { + + } + + @Override + public void completedServerStartup() { + try { + setUpNotifiers(HeartBeatBeaconUtils.getServerDetails()); + } catch (HeartBeatBeaconConfigurationException | UnknownHostException | SocketException e) { + throw new RuntimeException(e); + } + } + static void setUpNotifiers(ServerContext ctx) throws HeartBeatBeaconConfigurationException { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifierRepository.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifierRepository.java index 9c711366ae..bb0c7726de 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifierRepository.java +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/ClusterFormationChangedNotifierRepository.java @@ -15,9 +15,8 @@ * specific language governing permissions and limitations * under the License. */ -package io.entgra.device.mgt.core.device.mgt.core.push.notification.mgt; +package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service; -import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifier; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,7 +40,7 @@ public class ClusterFormationChangedNotifierRepository { public void addNotifier(String className) { try { if (!StringUtils.isEmpty(className)) { - Class clz = ClassLoader.getSystemClassLoader()forName(className); + Class clz = Class.forName(className); ClusterFormationChangedNotifier notifier = (ClusterFormationChangedNotifier) clz.newInstance(); notifiers.put(notifier.getType(), notifier); } diff --git a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java index 7a31a6ebcc..6c737c0b2f 100644 --- a/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java +++ b/components/heartbeat-management/io.entgra.device.mgt.core.server.bootup.heartbeat.beacon/src/main/java/io/entgra/device/mgt/core/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java @@ -235,6 +235,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic } } else { //first time execution, elect if not present + heartBeatDAO.purgeCandidates(); electCandidate(servers); } HeartBeatBeaconDAOFactory.commitTransaction(); @@ -268,6 +269,10 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic String serverUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID(); ServerContext serverContext = servers.get(serverUUID); + if (log.isDebugEnabled()) { + log.debug("HashIndex (previous, current) : " + lastHashIndex + ", " + serverContext.getIndex()); + log.debug("ActiveServerCount (previous, current) : " + lastActiveCount + ", " + servers.size()); + } // cluster change can be identified, either by changing hash index or changing active server count if ((lastHashIndex != serverContext.getIndex()) || (lastActiveCount != servers.size())) { lastHashIndex = serverContext.getIndex(); @@ -280,6 +285,9 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic Runnable r = new Runnable() { @Override public void run() { + if (log.isDebugEnabled()) { + log.debug("notify cluster formation changed : " + notifier.getType()); + } notifier.notifyClusterFormationChanged(lastHashIndex, lastActiveCount); } };