From 9d39197844cee6ef673c44fa44794b1f337a6718 Mon Sep 17 00:00:00 2001 From: Ace Date: Thu, 26 Nov 2020 19:16:49 +0530 Subject: [PATCH] Adding improvements to dynamic task allocation --- .../core/dao/impl/AbstractDeviceDAOImpl.java | 4 +- .../operation/mgt/OperationManagerImpl.java | 3 + .../mgt/dao/OperationMappingDAO.java | 33 +++++++++ .../mgt/dao/impl/OperationMappingDAOImpl.java | 74 +++++++++++++++++++ .../task/impl/DeviceStatusMonitoringTask.java | 47 ++++++++---- .../task/impl/DeviceDetailsRetrieverTask.java | 1 + .../impl/DynamicPartitionedScheduleTask.java | 22 +++++- .../core/TestHeartBeatManagementService.java | 24 ++++++ .../mgt/core/task/DeviceTaskManagerTest.java | 5 ++ .../beacon/HeartBeatBeaconUtils.java | 34 +++++++++ .../beacon/config/HeartBeatBeaconConfig.java | 7 ++ .../heartbeat/beacon/dao/HeartBeatDAO.java | 3 +- .../dao/impl/GenericHeartBeatDAOImpl.java | 34 ++++++++- .../internal/HeartBeatBeaconComponent.java | 4 +- ...ernalUtils.java => HeartBeatExecutor.java} | 22 ++++-- .../service/HeartBeatManagementService.java | 2 +- .../HeartBeatManagementServiceImpl.java | 26 ++++--- .../mgt/common/PolicyAdministratorPoint.java | 1 + .../mgt/core/cache/PolicyCacheManager.java | 1 + .../cache/impl/PolicyCacheManagerImpl.java | 1 + .../mgt/core/dao/impl/ProfileDAOImpl.java | 1 + .../mgt/core/enforcement/DelegationTask.java | 27 ++++--- .../impl/PolicyAdministratorPointImpl.java | 1 + .../policy/mgt/core/mgt/PolicyManager.java | 1 + .../mgt/core/mgt/impl/PolicyManagerImpl.java | 1 + .../mgt/core/mgt/impl/ProfileManagerImpl.java | 1 + .../policy/mgt/core/task/MonitoringTask.java | 5 +- .../core/PolicyManagerServiceImplTest.java | 6 +- .../mock/TestHeartBeatManagementService.java | 24 ++++++ 29 files changed, 361 insertions(+), 54 deletions(-) create mode 100644 components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/TestHeartBeatManagementService.java rename components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/{HeartBeatInternalUtils.java => HeartBeatExecutor.java} (80%) create mode 100644 components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/mock/TestHeartBeatManagementService.java diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/AbstractDeviceDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/AbstractDeviceDAOImpl.java index 43889beb7d..bff424c208 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/AbstractDeviceDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/AbstractDeviceDAOImpl.java @@ -831,10 +831,10 @@ public abstract class AbstractDeviceDAOImpl implements DeviceDAO { " WHERE DEVICE_TYPE_ID = t.ID" + " AND t.NAME = ?" + " AND t.ID = d.DEVICE_TYPE_ID" + - " AND d.TENANT_ID = ?) d1" + + " AND d.TENANT_ID = ?) d1 " + "WHERE d1.ID = e.DEVICE_ID" + " AND TENANT_ID = ?" + - " AND MOD(d1.ID, ?) = ?" + + " AND MOD(d1.ID, ?) = ? " + "ORDER BY e.DATE_OF_LAST_UPDATE DESC"; stmt = conn.prepareStatement(sql); diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java index a6db88ea5f..e26b06292f 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java @@ -342,6 +342,9 @@ public class OperationManagerImpl implements OperationManager { Map enrolments = new HashMap<>(); for (Device device : devices) { enrolments.put(device.getEnrolmentInfo().getId(), device); + if(log.isDebugEnabled()){ + log.info("Adding operation for device Id : " + device.getDeviceIdentifier()); + } } if (operationDto.getControl() == org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.Control.NO_REPEAT) { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationMappingDAO.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationMappingDAO.java index 4b57829274..0c604fa560 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationMappingDAO.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationMappingDAO.java @@ -55,6 +55,24 @@ public interface OperationMappingDAO { long maxDuration, int deviceTypeId) throws OperationManagementDAOException; + + /** + * This method returns first pending/repeated operation available for each active enrolment of given device-type + * in a task partitioned execution scenario + * where the operation was created after the given timestamp. + * + * @param minDuration + * @param maxDuration + * @param deviceTypeId + * @param activeServerCount + * @param serverHashIndex + * @return + */ + List getFirstPendingOperationMappingsForActiveEnrolments(long minDuration, + long maxDuration, int deviceTypeId, + int activeServerCount, int serverHashIndex) + throws OperationManagementDAOException; + /** * This method returns the timestamp of last completed Operation for each active enrolment of given device-type * where the operation was completed after the given timestamp. @@ -67,4 +85,19 @@ public interface OperationMappingDAO { Map getLastConnectedTimeForActiveEnrolments(long timeStamp, int deviceTypeId) throws OperationManagementDAOException; + + /** + * This method returns the timestamp of last completed Operation for each active enrolment of given device-type + * in a task partitioned execution scenario + * where the operation was completed after the given timestamp. + * + * @param timeStamp + * @param deviceTypeId + * @param activeServerCount + * @param serverHashIndex + * @return + */ + Map getLastConnectedTimeForActiveEnrolments(long timeStamp, int deviceTypeId, int activeServerCount, int serverHashIndex) + throws OperationManagementDAOException; + } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/OperationMappingDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/OperationMappingDAOImpl.java index b57d313bdf..10742b5c28 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/OperationMappingDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/OperationMappingDAOImpl.java @@ -227,6 +227,46 @@ public class OperationMappingDAOImpl implements OperationMappingDAO { return enrolmentOperationMappingList; } + @Override + public List getFirstPendingOperationMappingsForActiveEnrolments(long minDuration, + long maxDuration, int deviceTypeId, + int activeServerCount, int serverHashIndex) throws OperationManagementDAOException { + PreparedStatement stmt = null; + ResultSet rs = null; + List enrolmentOperationMappingList; + try { + Connection conn = OperationManagementDAOFactory.getConnection(); + //We are specifically looking for operation mappings in 'Pending' & 'Repeated' states. Further we want + //devices to be active at that moment. Hence filtering by 'ACTIVE' & 'UNREACHABLE' device states. + String sql = "SELECT ENROLMENT_ID, D.DEVICE_IDENTIFICATION AS DEVICE_IDENTIFIER, MIN(CREATED_TIMESTAMP) " + + "AS CREATED_TIMESTAMP, E.STATUS AS ENROLMENT_STATUS, E.TENANT_ID FROM " + + "DM_ENROLMENT_OP_MAPPING OP INNER JOIN DM_ENROLMENT E ON OP.ENROLMENT_ID = E.ID INNER JOIN " + + "DM_DEVICE D ON E.DEVICE_ID = D.ID WHERE " + + "OP.STATUS IN ('"+ Operation.Status.PENDING.name() + "','" + Operation.Status.REPEATED.name() + "') " + + "AND OP.CREATED_TIMESTAMP BETWEEN ? AND ? AND E.STATUS IN ('" + EnrolmentInfo.Status.ACTIVE.name() + + "','" + EnrolmentInfo.Status.UNREACHABLE.name() + "') AND D.DEVICE_TYPE_ID = ? AND MOD(D.ID, ?) = ? GROUP BY ENROLMENT_ID," + + " D.DEVICE_IDENTIFICATION, E.STATUS, E.TENANT_ID"; + stmt = conn.prepareStatement(sql); + stmt.setLong(1, maxDuration); + stmt.setLong(2, minDuration); + stmt.setInt(3, deviceTypeId); + stmt.setInt(4, activeServerCount); + stmt.setInt(5, serverHashIndex); + rs = stmt.executeQuery(); + enrolmentOperationMappingList = new ArrayList<>(); + while (rs.next()) { + OperationEnrolmentMapping enrolmentOperationMapping = this.getEnrolmentOpMapping(rs); + enrolmentOperationMappingList.add(enrolmentOperationMapping); + } + } catch (SQLException e) { + throw new OperationManagementDAOException("Error occurred while fetching pending operation mappings for " + + "active devices of type '" + deviceTypeId + "'", e); + } finally { + OperationManagementDAOUtil.cleanupResources(stmt, rs); + } + return enrolmentOperationMappingList; + } + @Override public Map getLastConnectedTimeForActiveEnrolments(long timeStamp, int deviceTypeId) throws OperationManagementDAOException { PreparedStatement stmt = null; @@ -259,6 +299,40 @@ public class OperationMappingDAOImpl implements OperationMappingDAO { return lastConnectedTimeMap; } + @Override + public Map getLastConnectedTimeForActiveEnrolments(long timeStamp, int deviceTypeId, int activeServerCount, int serverHashIndex) throws OperationManagementDAOException { + PreparedStatement stmt = null; + ResultSet rs = null; + Map lastConnectedTimeMap = null; + try { + Connection conn = OperationManagementDAOFactory.getConnection(); + //We are specifically looking for operation mappings in 'Pending' & 'Repeated' states. Further we want + //devices to be active at that moment. Hence filtering by 'ACTIVE' & 'UNREACHABLE' device states. + String sql = "SELECT OP.ENROLMENT_ID AS EID, MAX(OP.UPDATED_TIMESTAMP) AS LAST_CONNECTED_TIME FROM " + + "DM_ENROLMENT_OP_MAPPING OP INNER JOIN DM_ENROLMENT E ON OP.ENROLMENT_ID = E.ID INNER JOIN " + + "DM_DEVICE D ON E.DEVICE_ID = D.ID WHERE " + + "OP.STATUS = '" + Operation.Status.COMPLETED.name() + "'" + + "AND OP.UPDATED_TIMESTAMP >= ? AND E.STATUS IN ('" + EnrolmentInfo.Status.ACTIVE.name() + + "','" + EnrolmentInfo.Status.UNREACHABLE.name() + "') AND D.DEVICE_TYPE_ID = ? AND MOD(D.ID, ?) = ? GROUP BY ENROLMENT_ID"; + stmt = conn.prepareStatement(sql); + stmt.setLong(1, timeStamp); + stmt.setInt(2, deviceTypeId); + stmt.setInt(3, activeServerCount); + stmt.setInt(4, serverHashIndex); + rs = stmt.executeQuery(); + lastConnectedTimeMap = new HashMap<>(); + while (rs.next()) { + lastConnectedTimeMap.put(rs.getInt("EID"), rs.getLong("LAST_CONNECTED_TIME")); + } + } catch (SQLException e) { + throw new OperationManagementDAOException("Error occurred while fetching last connected time for " + + "active devices of type '" + deviceTypeId + "'", e); + } finally { + OperationManagementDAOUtil.cleanupResources(stmt, rs); + } + return lastConnectedTimeMap; + } + private OperationEnrolmentMapping getEnrolmentOpMapping(ResultSet rs) throws SQLException { OperationEnrolmentMapping enrolmentOperationMapping = new OperationEnrolmentMapping(); enrolmentOperationMapping.setEnrolmentId(rs.getInt("ENROLMENT_ID")); diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java index 1bffb03eb9..9aa430ee61 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.wso2.carbon.device.mgt.common.Device; import org.wso2.carbon.device.mgt.common.DeviceIdentifier; import org.wso2.carbon.device.mgt.common.DeviceStatusTaskPluginConfig; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.EnrolmentInfo; import org.wso2.carbon.device.mgt.common.exceptions.TransactionManagementException; import org.wso2.carbon.device.mgt.core.cache.impl.DeviceCacheManagerImpl; @@ -33,6 +34,7 @@ import org.wso2.carbon.device.mgt.core.operation.mgt.OperationEnrolmentMapping; import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationManagementDAOException; import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationManagementDAOFactory; import org.wso2.carbon.device.mgt.core.status.task.DeviceStatusTaskException; +import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask; import org.wso2.carbon.ntask.core.Task; import java.sql.SQLException; @@ -44,7 +46,7 @@ import java.util.Map; * This implements the Task service which monitors the device activity periodically & update the device-status if * necessary. */ -public class DeviceStatusMonitoringTask implements Task { +public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask { private static final Log log = LogFactory.getLog(DeviceStatusMonitoringTask.class); private String deviceType; @@ -61,7 +63,7 @@ public class DeviceStatusMonitoringTask implements Task { } @Override - public void init() { + protected void setup() { } @@ -73,10 +75,11 @@ public class DeviceStatusMonitoringTask implements Task { EnrolmentInfo enrolmentInfo; DeviceIdentifier deviceIdentifier; Device device; + super.refreshContext(); try { - operationEnrolmentMappings = this.getOperationEnrolmentMappings(); + operationEnrolmentMappings = this.getOperationEnrolmentMappings(super.getTaskContext()); if (operationEnrolmentMappings.size() > 0) { - lastActivities = this.getLastDeviceActivities(); + lastActivities = this.getLastDeviceActivities(super.getTaskContext()); } } catch (DeviceStatusTaskException e) { log.error("Error occurred while fetching OperationEnrolment mappings of deviceType '" + deviceType + "'", e); @@ -104,6 +107,9 @@ public class DeviceStatusMonitoringTask implements Task { DeviceCacheManagerImpl.getInstance().addDeviceToCache(deviceIdentifier, device, mapping.getTenantId()); } enrolmentInfoTobeUpdated.add(enrolmentInfo); + if(log.isDebugEnabled()){ + log.debug("Enrollment Information updated for device ID : " + device.getDeviceIdentifier()); + } } } @@ -163,13 +169,21 @@ public class DeviceStatusMonitoringTask implements Task { return updateStatus; } - private List getOperationEnrolmentMappings() throws DeviceStatusTaskException { + private List getOperationEnrolmentMappings(DynamicTaskContext ctx) throws DeviceStatusTaskException { List operationEnrolmentMappings; try { OperationManagementDAOFactory.openConnection(); - operationEnrolmentMappings = OperationManagementDAOFactory. - getOperationMappingDAO().getFirstPendingOperationMappingsForActiveEnrolments(this.getMinTimeWindow(), - this.getMaxTimeWindow(), this.deviceTypeId); + if(ctx != null && ctx.isPartitioningEnabled()){ + operationEnrolmentMappings = OperationManagementDAOFactory. + getOperationMappingDAO().getFirstPendingOperationMappingsForActiveEnrolments(this.getMinTimeWindow(), + this.getMaxTimeWindow(), this.deviceTypeId, + ctx.getActiveServerCount(), ctx.getServerHashIndex()); + } else { + operationEnrolmentMappings = OperationManagementDAOFactory. + getOperationMappingDAO().getFirstPendingOperationMappingsForActiveEnrolments(this.getMinTimeWindow(), + this.getMaxTimeWindow(), this.deviceTypeId); + } + } catch (SQLException e) { throw new DeviceStatusTaskException("Error occurred while getting Enrolment operation mappings for " + "determining device status of deviceType '" + deviceType + "'", e); @@ -182,13 +196,20 @@ public class DeviceStatusMonitoringTask implements Task { return operationEnrolmentMappings; } - private Map getLastDeviceActivities() throws DeviceStatusTaskException { + private Map getLastDeviceActivities(DynamicTaskContext ctx) throws DeviceStatusTaskException { Map lastActivities; try { OperationManagementDAOFactory.openConnection(); - lastActivities = OperationManagementDAOFactory. - getOperationMappingDAO().getLastConnectedTimeForActiveEnrolments(this.getMaxTimeWindow(), - this.deviceTypeId); + if(ctx != null && ctx.isPartitioningEnabled()) { + lastActivities = OperationManagementDAOFactory. + getOperationMappingDAO().getLastConnectedTimeForActiveEnrolments(this.getMaxTimeWindow(), + this.deviceTypeId, + ctx.getActiveServerCount(), ctx.getServerHashIndex()); + } else { + lastActivities = OperationManagementDAOFactory. + getOperationMappingDAO().getLastConnectedTimeForActiveEnrolments(this.getMaxTimeWindow(), + this.deviceTypeId); + } } catch (SQLException e) { throw new DeviceStatusTaskException("Error occurred while getting last activities for " + "determining device status of deviceType '" + deviceType + "'", e); @@ -200,4 +221,4 @@ public class DeviceStatusMonitoringTask implements Task { } return lastActivities; } -} \ No newline at end of file +} diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java index b8da8e6124..2c4425ff22 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java @@ -66,6 +66,7 @@ public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask { @Override public void execute() { + super.refreshContext(); deviceManagementProviderService = DeviceManagementDataHolder.getInstance() .getDeviceManagementProvider(); OperationMonitoringTaskConfig operationMonitoringTaskConfig = deviceManagementProviderService diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java index 8568164571..d124874dbe 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java @@ -39,8 +39,7 @@ public abstract class DynamicPartitionedScheduleTask implements Task { ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo(); if(ctxInfo!=null){ taskContext = new DynamicTaskContext(); - taskContext.setActiveServerCount(ctxInfo.getActiveServerCount()); - taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx()); + updateContext(ctxInfo); if(ctxInfo.getActiveServerCount() > 0){ taskContext.setPartitioningEnabled(true); @@ -60,6 +59,25 @@ public abstract class DynamicPartitionedScheduleTask implements Task { setup(); } + public DynamicTaskContext refreshContext(){ + try { + ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo(); + if(ctxInfo != null) { + updateContext(ctxInfo); + } else { + log.info("Dynamic Task Context not present. Tasks will run on regular worker/manager mode."); + } + } catch (HeartBeatManagementException e) { + log.error("Error refreshing Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.", e); + } + return taskContext; + } + + private void updateContext(ServerCtxInfo ctxInfo) { + taskContext.setActiveServerCount(ctxInfo.getActiveServerCount()); + taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx()); + } + protected abstract void setup(); public static DynamicTaskContext getTaskContext() { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/TestHeartBeatManagementService.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/TestHeartBeatManagementService.java new file mode 100644 index 0000000000..1d1efa10ec --- /dev/null +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/TestHeartBeatManagementService.java @@ -0,0 +1,24 @@ +package org.wso2.carbon.device.mgt.core; + +import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent; +import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; +import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; +import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; +import org.wso2.carbon.device.mgt.common.ServerCtxInfo; + +public class TestHeartBeatManagementService implements HeartBeatManagementService { + @Override + public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException { + return null; + } + + @Override + public String updateServerContext(ServerContext ctx) throws HeartBeatManagementException { + return null; + } + + @Override + public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException { + return false; + } +} diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManagerTest.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManagerTest.java index 44554527bc..358d2fbf3c 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManagerTest.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManagerTest.java @@ -17,6 +17,7 @@ */ package org.wso2.carbon.device.mgt.core.task; +import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.powermock.api.mockito.PowerMockito; @@ -32,6 +33,7 @@ import org.wso2.carbon.device.mgt.common.operation.mgt.OperationManagementExcept import org.wso2.carbon.device.mgt.common.operation.mgt.OperationManager; import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService; import org.wso2.carbon.device.mgt.core.TestDeviceManagementService; +import org.wso2.carbon.device.mgt.core.TestHeartBeatManagementService; import org.wso2.carbon.device.mgt.core.TestUtils; import org.wso2.carbon.device.mgt.core.authorization.DeviceAccessAuthorizationServiceImpl; import org.wso2.carbon.device.mgt.core.common.BaseDeviceManagementTest; @@ -83,6 +85,9 @@ public class DeviceTaskManagerTest extends BaseDeviceManagementTest { DeviceManagementDataHolder.getInstance().setDeviceTaskManagerService(null); DeviceManagementService deviceManagementService = new TestDeviceManagementService( TestDataHolder.TEST_DEVICE_TYPE, MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); + HeartBeatManagementService heartBeatManagementService = new TestHeartBeatManagementService(); + DeviceManagementDataHolder.getInstance() + .setHeartBeatService(heartBeatManagementService); this.operationManager = PowerMockito.spy( new OperationManagerImpl(TestDataHolder.TEST_DEVICE_TYPE, deviceManagementService)); try { diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/HeartBeatBeaconUtils.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/HeartBeatBeaconUtils.java index 9c5b91a93d..570c6c56c9 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/HeartBeatBeaconUtils.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/HeartBeatBeaconUtils.java @@ -18,6 +18,7 @@ package io.entgra.server.bootup.heartbeat.beacon; +import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig; import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,10 +30,16 @@ import javax.xml.XMLConstants; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; import java.util.Hashtable; +import java.util.Properties; public class HeartBeatBeaconUtils { @@ -83,6 +90,33 @@ public class HeartBeatBeaconUtils { return ctx; } + public static void saveUUID(String text) throws IOException { + File serverDetails = new File(HeartBeatBeaconConfig.getInstance().getServerUUIDFileLocation()); + serverDetails.createNewFile(); // if file already exists will do nothing + FileOutputStream fos = new FileOutputStream(serverDetails, false); + Properties prop = new Properties(); + prop.setProperty("server.uuid", text); + prop.store(fos, null); + fos.close(); + } + + public static String readUUID() { + InputStream input = null; + String uuid = null; + try { + input = new FileInputStream(HeartBeatBeaconConfig.getInstance().getServerUUIDFileLocation()); + Properties props = new Properties(); + props.load(input); + uuid = props.getProperty("server.uuid"); + input.close(); + } catch (FileNotFoundException e) { + log.info("File : server-credentials.properties does not exist, new UUID will be generated for server."); + } catch (IOException e) { + log.error("Error accessing server-credentials.properties to locate server.uuid.", e); + } + return uuid; + } + } diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/config/HeartBeatBeaconConfig.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/config/HeartBeatBeaconConfig.java index e2bb9e2fec..e427dffc86 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/config/HeartBeatBeaconConfig.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/config/HeartBeatBeaconConfig.java @@ -47,6 +47,9 @@ public class HeartBeatBeaconConfig { private static final String HEART_BEAT_NOTIFIER_CONFIG_PATH = CarbonUtils.getCarbonConfigDirPath() + File.separator + "heart-beat-config.xml"; + private static final String SERVER_UUID_FILE_LOCATION = + CarbonUtils.getCarbonConfigDirPath() + File.separator + "server-credentials.properties"; + private HeartBeatBeaconConfig() { } @@ -112,6 +115,10 @@ public class HeartBeatBeaconConfig { this.enabled = enabled; } + public String getServerUUIDFileLocation(){ + return SERVER_UUID_FILE_LOCATION; + } + public static void init() throws HeartBeatBeaconConfigurationException { try { File emailSenderConfig = new File(HEART_BEAT_NOTIFIER_CONFIG_PATH); diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/HeartBeatDAO.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/HeartBeatDAO.java index 9a731ed8b1..4948f95792 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/HeartBeatDAO.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/HeartBeatDAO.java @@ -22,7 +22,6 @@ import io.entgra.server.bootup.heartbeat.beacon.dao.exception.HeartBeatDAOExcept import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent; import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; -import java.util.List; import java.util.Map; /** @@ -34,6 +33,8 @@ public interface HeartBeatDAO { boolean recordHeatBeat(HeartBeatEvent event) throws HeartBeatDAOException; + boolean checkUUIDValidity(String uuid) throws HeartBeatDAOException; + String retrieveExistingServerCtx(ServerContext ctx) throws HeartBeatDAOException; Map getActiveServerDetails(int elapsedTimeInSeconds) throws HeartBeatDAOException; diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/impl/GenericHeartBeatDAOImpl.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/impl/GenericHeartBeatDAOImpl.java index bdc7d6a52f..c6aaf04872 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/impl/GenericHeartBeatDAOImpl.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/impl/GenericHeartBeatDAOImpl.java @@ -30,11 +30,11 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.ArrayList; +import java.sql.Timestamp; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; /** * This class represents implementation of HeartBeatDAO @@ -89,6 +89,30 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO { } } + @Override + public boolean checkUUIDValidity(String uuid) throws HeartBeatDAOException { + PreparedStatement stmt = null; + ResultSet resultSet = null; + boolean result = false; + try { + Connection conn = HeartBeatBeaconDAOFactory.getConnection(); + String sql = "SELECT ID FROM SERVER_HEART_BEAT_EVENTS WHERE UUID = ?"; + stmt = conn.prepareStatement(sql); + stmt.setString(1, uuid); + + resultSet = stmt.executeQuery(); + if(resultSet.next()){ + result = true; + } + } catch (SQLException e) { + throw new HeartBeatDAOException("Error occurred checking existense of UUID" + uuid + + " amongst heartbeat meta info ", e); + } finally { + HeartBeatBeaconDAOUtil.cleanupResources(stmt, resultSet); + } + return result; + } + @Override public String retrieveExistingServerCtx(ServerContext ctx) throws HeartBeatDAOException { PreparedStatement stmt = null; @@ -116,7 +140,8 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO { } @Override - public Map getActiveServerDetails(int elapsedTimeInSeconds) throws HeartBeatDAOException { + public Map getActiveServerDetails(int elapsedTimeInSeconds) + throws HeartBeatDAOException { PreparedStatement stmt = null; ResultSet resultSet = null; Map ctxList = new HashMap<>(); @@ -124,10 +149,11 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO { Connection conn = HeartBeatBeaconDAOFactory.getConnection(); String sql = "SELECT (@row_number:=@row_number + 1) AS IDX, UUID, HOST_NAME, SERVER_PORT from " + "SERVER_HEART_BEAT_EVENTS, (SELECT @row_number:=-1) AS TEMP " + - "WHERE LAST_UPDATED_TIMESTAMP > DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? SECOND) " + + "WHERE LAST_UPDATED_TIMESTAMP > ? " + "ORDER BY UUID"; stmt = conn.prepareStatement(sql); stmt.setInt(1, elapsedTimeInSeconds); + stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds))); resultSet = stmt.executeQuery(); while (resultSet.next()) { ctxList.put(resultSet.getString("UUID"), HeartBeatBeaconDAOUtil.populateContext(resultSet)); diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java index 3435c16601..5d83a0acd3 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java @@ -18,8 +18,8 @@ package io.entgra.server.bootup.heartbeat.beacon.internal; -import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig; import io.entgra.server.bootup.heartbeat.beacon.HeartBeatBeaconUtils; +import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig; import io.entgra.server.bootup.heartbeat.beacon.config.datasource.DataSourceConfig; import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory; import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; @@ -59,7 +59,7 @@ public class HeartBeatBeaconComponent { HeartBeatBeaconDAOFactory.init(dsConfig); //Setting up executors to notify heart beat status */ - HeartBeatInternalUtils.setUpNotifiers(HeartBeatBeaconUtils.getServerDetails()); + HeartBeatExecutor.setUpNotifiers(HeartBeatBeaconUtils.getServerDetails()); } if (log.isDebugEnabled()) { diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatInternalUtils.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java similarity index 80% rename from components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatInternalUtils.java rename to components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java index c407348b4f..f215288852 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatInternalUtils.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java @@ -18,22 +18,23 @@ package io.entgra.server.bootup.heartbeat.beacon.internal; -import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig; import io.entgra.server.bootup.heartbeat.beacon.HeartBeatBeaconConfigurationException; -import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory; +import io.entgra.server.bootup.heartbeat.beacon.HeartBeatBeaconUtils; +import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig; import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent; import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.IOException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -public class HeartBeatInternalUtils { +public class HeartBeatExecutor { - private static Log log = LogFactory.getLog(HeartBeatInternalUtils.class); + private static Log log = LogFactory.getLog(HeartBeatExecutor.class); private static final int DEFAULT__NOTIFIER_INTERVAL = 5; private static final int DEFAULT_NOTIFIER_DELAY = 5; private static HeartBeatBeaconConfig CONFIG; @@ -51,12 +52,17 @@ public class HeartBeatInternalUtils { } try { - String uuid = HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().updateServerContext(ctx); - HeartBeatBeaconDataHolder.getInstance().setLocalServerUUID(uuid); + String uuid = HeartBeatBeaconUtils.readUUID(); + if(uuid == null){ + uuid = HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().updateServerContext(ctx); + HeartBeatBeaconUtils.saveUUID(uuid); + } + final String designatedUUID = uuid; + HeartBeatBeaconDataHolder.getInstance().setLocalServerUUID(designatedUUID); Runnable periodicTask = new Runnable() { public void run() { try { - recordHeartBeat(uuid); + recordHeartBeat(designatedUUID); } catch (HeartBeatManagementException e) { log.error("Error while executing record heart beat task. This will result in schedule operation malfunction.", e); } @@ -68,6 +74,8 @@ public class HeartBeatInternalUtils { TimeUnit.SECONDS); } catch (HeartBeatManagementException e) { throw new HeartBeatBeaconConfigurationException("Error occured while updating initial server context.", e); + } catch (IOException e) { + throw new HeartBeatBeaconConfigurationException("Error while persisting UUID of server.", e); } } diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java index 0c0b04d6c8..90644cdd05 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java @@ -19,8 +19,8 @@ package io.entgra.server.bootup.heartbeat.beacon.service; import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent; -import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; +import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import org.wso2.carbon.device.mgt.common.ServerCtxInfo; public interface HeartBeatManagementService { diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java index 362f97879e..adc245e5ff 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java @@ -23,8 +23,8 @@ import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory; import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatDAO; import io.entgra.server.bootup.heartbeat.beacon.dao.exception.HeartBeatDAOException; import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent; -import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; +import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import io.entgra.server.bootup.heartbeat.beacon.internal.HeartBeatBeaconDataHolder; import org.wso2.carbon.device.mgt.common.ServerCtxInfo; import org.wso2.carbon.device.mgt.common.exceptions.TransactionManagementException; @@ -34,17 +34,21 @@ import java.util.Map; public class HeartBeatManagementServiceImpl implements HeartBeatManagementService { + private final HeartBeatDAO heartBeatDAO; + + public HeartBeatManagementServiceImpl(){ + this.heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO(); + } + + @Override public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException { - HeartBeatDAO heartBeatDAO; int hashIndex = -1; ServerContext localServerCtx = null; ServerCtxInfo serverCtxInfo = null; if(HeartBeatBeaconConfig.getInstance().isEnabled()) { try { HeartBeatBeaconDAOFactory.openConnection(); - heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO(); - int timeOutIntervalInSeconds = HeartBeatBeaconConfig.getInstance().getServerTimeOutIntervalInSeconds(); int timeSkew = HeartBeatBeaconConfig.getInstance().getTimeSkew(); int cumilativeTimeOut = timeOutIntervalInSeconds + timeSkew; @@ -75,13 +79,10 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic @Override public String updateServerContext(ServerContext ctx) throws HeartBeatManagementException { - HeartBeatDAO heartBeatDAO; String uuid = null; if(HeartBeatBeaconConfig.getInstance().isEnabled()) { try { HeartBeatBeaconDAOFactory.beginTransaction(); - heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO(); - uuid = heartBeatDAO.retrieveExistingServerCtx(ctx); if (uuid == null) { uuid = heartBeatDAO.recordServerCtx(ctx); @@ -107,14 +108,17 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic @Override public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException { - HeartBeatDAO heartBeatDAO; boolean operationSuccess = false; if (HeartBeatBeaconConfig.getInstance().isEnabled()) { try { HeartBeatBeaconDAOFactory.beginTransaction(); - heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO(); - operationSuccess = heartBeatDAO.recordHeatBeat(event); - HeartBeatBeaconDAOFactory.commitTransaction(); + if(heartBeatDAO.checkUUIDValidity(event.getServerUUID())){ + operationSuccess = heartBeatDAO.recordHeatBeat(event); + HeartBeatBeaconDAOFactory.commitTransaction(); + } else { + String msg = "Server UUID Does not exist, heartbeat not recorded."; + throw new HeartBeatManagementException(msg); + } } catch (HeartBeatDAOException e) { String msg = "Error occurred while recording heart beat."; throw new HeartBeatManagementException(msg, e); diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.common/src/main/java/org/wso2/carbon/policy/mgt/common/PolicyAdministratorPoint.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.common/src/main/java/org/wso2/carbon/policy/mgt/common/PolicyAdministratorPoint.java index e9bdbdc014..db5ab41c07 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.common/src/main/java/org/wso2/carbon/policy/mgt/common/PolicyAdministratorPoint.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.common/src/main/java/org/wso2/carbon/policy/mgt/common/PolicyAdministratorPoint.java @@ -18,6 +18,7 @@ package org.wso2.carbon.policy.mgt.common; import org.wso2.carbon.device.mgt.common.DeviceIdentifier; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.policy.mgt.Policy; import org.wso2.carbon.device.mgt.common.policy.mgt.Profile; diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/cache/PolicyCacheManager.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/cache/PolicyCacheManager.java index ce71b7a213..17b5669f3a 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/cache/PolicyCacheManager.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/cache/PolicyCacheManager.java @@ -19,6 +19,7 @@ package org.wso2.carbon.policy.mgt.core.cache; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.policy.mgt.Policy; import org.wso2.carbon.policy.mgt.common.PolicyManagementException; diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/cache/impl/PolicyCacheManagerImpl.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/cache/impl/PolicyCacheManagerImpl.java index 51c0dc4e81..46e01793a9 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/cache/impl/PolicyCacheManagerImpl.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/cache/impl/PolicyCacheManagerImpl.java @@ -21,6 +21,7 @@ package org.wso2.carbon.policy.mgt.core.cache.impl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.policy.mgt.Policy; import org.wso2.carbon.policy.mgt.common.PolicyManagementException; import org.wso2.carbon.policy.mgt.core.cache.PolicyCacheManager; diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/dao/impl/ProfileDAOImpl.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/dao/impl/ProfileDAOImpl.java index 1483e6ebc7..66f7528d1b 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/dao/impl/ProfileDAOImpl.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/dao/impl/ProfileDAOImpl.java @@ -21,6 +21,7 @@ package org.wso2.carbon.policy.mgt.core.dao.impl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.context.PrivilegedCarbonContext; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.policy.mgt.Profile; import org.wso2.carbon.policy.mgt.core.dao.PolicyManagementDAOFactory; import org.wso2.carbon.policy.mgt.core.dao.ProfileDAO; diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/enforcement/DelegationTask.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/enforcement/DelegationTask.java index 2cb4a29573..bb070f792c 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/enforcement/DelegationTask.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/enforcement/DelegationTask.java @@ -26,7 +26,7 @@ import org.wso2.carbon.device.mgt.common.EnrolmentInfo; import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager; import org.wso2.carbon.device.mgt.core.config.policy.PolicyConfiguration; import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService; -import org.wso2.carbon.ntask.core.Task; +import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask; import org.wso2.carbon.policy.mgt.common.PolicyManagementException; import org.wso2.carbon.policy.mgt.core.cache.impl.PolicyCacheManagerImpl; import org.wso2.carbon.policy.mgt.core.internal.PolicyManagementDataHolder; @@ -38,7 +38,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -public class DelegationTask implements Task { +public class DelegationTask extends DynamicPartitionedScheduleTask { private static final Log log = LogFactory.getLog(DelegationTask.class); private PolicyConfiguration policyConfiguration = DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getPolicyConfiguration(); @@ -48,14 +48,9 @@ public class DelegationTask implements Task { } - @Override - public void init() { - - } - @Override public void execute() { - + super.refreshContext(); try { PolicyManager policyManager = new PolicyManagerImpl(); UpdatedPolicyDeviceListBean updatedPolicyDeviceList = policyManager.applyChangesMadeToPolicies(); @@ -75,7 +70,13 @@ public class DelegationTask implements Task { try { devices = new ArrayList<>(); toBeNotified = new ArrayList<>(); - devices.addAll(service.getAllDevices(deviceType, false)); + if(super.getTaskContext() != null && super.getTaskContext().isPartitioningEnabled()) { + devices.addAll(service.getAllocatedDevices(deviceType, + super.getTaskContext().getActiveServerCount(), + super.getTaskContext().getServerHashIndex())); + } else { + devices.addAll(service.getAllDevices(deviceType, false)); + } //HashMap deviceIdPolicy = policyManager.getAppliedPolicyIdsDeviceIds(); for (Device device : devices) { // if (deviceIdPolicy.containsKey(device.getId())) { @@ -84,6 +85,9 @@ public class DelegationTask implements Task { toBeNotified.add(device); } // } + if(log.isDebugEnabled()){ + log.debug("Adding policy operation to device : " + device.getDeviceIdentifier()); + } } if (!toBeNotified.isEmpty()) { PolicyEnforcementDelegator enforcementDelegator = new PolicyEnforcementDelegatorImpl @@ -102,4 +106,9 @@ public class DelegationTask implements Task { log.error("Error occurred while getting the policies applied to devices.", e); } } + + @Override + protected void setup() { + + } } diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/impl/PolicyAdministratorPointImpl.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/impl/PolicyAdministratorPointImpl.java index c555b27b97..8a1546c20f 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/impl/PolicyAdministratorPointImpl.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/impl/PolicyAdministratorPointImpl.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.device.mgt.common.DeviceIdentifier; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.policy.mgt.Policy; import org.wso2.carbon.device.mgt.common.policy.mgt.Profile; import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager; diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/PolicyManager.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/PolicyManager.java index 886e0a5715..a31fe3d50a 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/PolicyManager.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/PolicyManager.java @@ -19,6 +19,7 @@ package org.wso2.carbon.policy.mgt.core.mgt; import org.wso2.carbon.device.mgt.common.Device; import org.wso2.carbon.device.mgt.common.DeviceIdentifier; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.policy.mgt.Policy; import org.wso2.carbon.policy.mgt.common.PolicyManagementException; import org.wso2.carbon.policy.mgt.core.mgt.bean.UpdatedPolicyDeviceListBean; diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/impl/PolicyManagerImpl.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/impl/PolicyManagerImpl.java index e39f061a65..a85828e2f5 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/impl/PolicyManagerImpl.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/impl/PolicyManagerImpl.java @@ -39,6 +39,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.device.mgt.common.Device; import org.wso2.carbon.device.mgt.common.DeviceIdentifier; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.exceptions.DeviceManagementException; import org.wso2.carbon.device.mgt.common.exceptions.InvalidDeviceException; import org.wso2.carbon.device.mgt.common.group.mgt.DeviceGroup; diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/impl/ProfileManagerImpl.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/impl/ProfileManagerImpl.java index 160d512ee5..30c01cc1fb 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/impl/ProfileManagerImpl.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/mgt/impl/ProfileManagerImpl.java @@ -20,6 +20,7 @@ package org.wso2.carbon.policy.mgt.core.mgt.impl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.policy.mgt.Profile; import org.wso2.carbon.device.mgt.common.policy.mgt.ProfileFeature; import org.wso2.carbon.device.mgt.core.dao.DeviceManagementDAOFactory; diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java index 2129f78286..574f93f6e6 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java @@ -96,7 +96,7 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask { } private void executeTask() { - + super.refreshContext(); MonitoringManager monitoringManager = PolicyManagementDataHolder.getInstance().getMonitoringManager(); List deviceTypes = new ArrayList<>(); List configDeviceTypes = new ArrayList<>(); @@ -142,6 +142,9 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask { status.equals(EnrolmentInfo.Status.UNREACHABLE)) { notifiableDevices.add(device); } + if(log.isDebugEnabled()){ + log.debug("Adding monitoring operation to device : " + device.getDeviceIdentifier()); + } } if (log.isDebugEnabled()) { log.debug("Following '" + deviceType + "' devices selected to send the notification " + diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/PolicyManagerServiceImplTest.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/PolicyManagerServiceImplTest.java index d372eb63d8..310003e829 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/PolicyManagerServiceImplTest.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/PolicyManagerServiceImplTest.java @@ -17,6 +17,7 @@ */ package org.wso2.carbon.policy.mgt.core; +import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.testng.Assert; @@ -51,6 +52,7 @@ import org.wso2.carbon.policy.mgt.core.enforcement.DelegationTask; import org.wso2.carbon.policy.mgt.core.internal.PolicyManagementDataHolder; import org.wso2.carbon.policy.mgt.core.mgt.MonitoringManager; import org.wso2.carbon.policy.mgt.core.mgt.impl.MonitoringManagerImpl; +import org.wso2.carbon.policy.mgt.core.mock.TestHeartBeatManagementService; import org.wso2.carbon.policy.mgt.core.mock.TypeXDeviceManagementService; import org.wso2.carbon.policy.mgt.core.task.MonitoringTask; import org.wso2.carbon.policy.mgt.core.task.TaskScheduleService; @@ -93,6 +95,8 @@ public class PolicyManagerServiceImplTest extends BasePolicyManagementDAOTest { DeviceManagementService deviceManagementService = new TypeXDeviceManagementService(DEVICE_TYPE_A); deviceMgtService.registerDeviceType(deviceManagementService); operationManager = new OperationManagerImpl(DEVICE_TYPE_A, deviceManagementService); + HeartBeatManagementService heartBeatManagementService = new TestHeartBeatManagementService(); + DeviceManagementDataHolder.getInstance().setHeartBeatService(heartBeatManagementService); enrollDevice(DEVICE1, DEVICE_TYPE_A); createDeviceGroup(GROUP1); DeviceGroup group1 = groupMgtService.getGroup(GROUP1, false); @@ -417,4 +421,4 @@ public class PolicyManagerServiceImplTest extends BasePolicyManagementDAOTest { Assert.assertNotNull(currentProfile.getProfileFeaturesList().get(0).getFeatureCode(), updatedProfile.getProfileFeaturesList().get(0).getFeatureCode()); } -} \ No newline at end of file +} diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/mock/TestHeartBeatManagementService.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/mock/TestHeartBeatManagementService.java new file mode 100644 index 0000000000..8b9f711aa0 --- /dev/null +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/mock/TestHeartBeatManagementService.java @@ -0,0 +1,24 @@ +package org.wso2.carbon.policy.mgt.core.mock; + +import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent; +import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; +import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; +import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; +import org.wso2.carbon.device.mgt.common.ServerCtxInfo; + +public class TestHeartBeatManagementService implements HeartBeatManagementService { + @Override + public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException { + return null; + } + + @Override + public String updateServerContext(ServerContext ctx) throws HeartBeatManagementException { + return null; + } + + @Override + public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException { + return false; + } +}