From e7b97d78c75a9b543f886ceffd5125425399bc6c Mon Sep 17 00:00:00 2001 From: Ace Date: Fri, 20 Nov 2020 05:48:54 +0530 Subject: [PATCH] Imrpoving task implementation --- .../device/mgt/common/DynamicTaskContext.java | 32 ++++ .../operation/mgt/OperationManager.java | 3 +- .../carbon/device/mgt/core/dao/DeviceDAO.java | 14 +- .../core/dao/impl/AbstractDeviceDAOImpl.java | 2 +- .../dao/impl/device/GenericDeviceDAOImpl.java | 135 +++++++++++++++++ .../dao/impl/device/OracleDeviceDAOImpl.java | 138 ++++++++++++++++++ .../impl/device/PostgreSQLDeviceDAOImpl.java | 126 ++++++++++++++++ .../impl/device/SQLServerDeviceDAOImpl.java | 137 +++++++++++++++++ .../operation/mgt/OperationManagerImpl.java | 14 +- .../DeviceManagementProviderService.java | 3 +- .../DeviceManagementProviderServiceImpl.java | 7 +- .../mgt/core/task/DeviceTaskManager.java | 4 +- .../task/impl/DeviceDetailsRetrieverTask.java | 13 +- .../core/task/impl/DeviceTaskManagerImpl.java | 5 +- .../impl/DynamicPartitionedScheduleTask.java | 28 ++-- .../mgt/core/task/DeviceTaskManagerTest.java | 6 +- .../policy/mgt/core/task/MonitoringTask.java | 21 ++- 17 files changed, 650 insertions(+), 38 deletions(-) create mode 100644 components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/DynamicTaskContext.java diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/DynamicTaskContext.java b/components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/DynamicTaskContext.java new file mode 100644 index 0000000000..64a10265cd --- /dev/null +++ b/components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/DynamicTaskContext.java @@ -0,0 +1,32 @@ +package org.wso2.carbon.device.mgt.common; + +public class DynamicTaskContext { + + private int serverHashIndex; + private int activeServerCount; + private boolean partitioningEnabled = false; + + public int getServerHashIndex() { + return serverHashIndex; + } + + public void setServerHashIndex(int serverHashIndex) { + this.serverHashIndex = serverHashIndex; + } + + public int getActiveServerCount() { + return activeServerCount; + } + + public void setActiveServerCount(int activeServerCount) { + this.activeServerCount = activeServerCount; + } + + public boolean isPartitioningEnabled() { + return partitioningEnabled; + } + + public void setPartitioningEnabled(boolean partitioningEnabled) { + this.partitioningEnabled = partitioningEnabled; + } +} diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/operation/mgt/OperationManager.java b/components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/operation/mgt/OperationManager.java index 9faa732090..ad59d5c4f8 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/operation/mgt/OperationManager.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/operation/mgt/OperationManager.java @@ -19,6 +19,7 @@ package org.wso2.carbon.device.mgt.common.operation.mgt; import org.wso2.carbon.device.mgt.common.Device; import org.wso2.carbon.device.mgt.common.DeviceIdentifier; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.exceptions.DeviceManagementException; import org.wso2.carbon.device.mgt.common.exceptions.InvalidDeviceException; import org.wso2.carbon.device.mgt.common.PaginationRequest; @@ -47,7 +48,7 @@ public interface OperationManager { void addTaskOperation(List devices, Operation operation) throws OperationManagementException; - void addTaskOperation(String deviceType, Operation operation) throws OperationManagementException; + void addTaskOperation(String deviceType, Operation operation, DynamicTaskContext dynamicTaskContext) throws OperationManagementException; /** * Method to retrieve the list of all operations to a device. diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/DeviceDAO.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/DeviceDAO.java index b2cb5ac269..2c1df83ad9 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/DeviceDAO.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/DeviceDAO.java @@ -295,6 +295,18 @@ public interface DeviceDAO { */ List getDevices(PaginationRequest request, int tenantId) throws DeviceManagementDAOException; + /** + * This method is used to retrieve the devices of a given tenant as a paginated result, along the lines of + * activeServerCount and serverIndex + * + * @param request + * @param tenantId + * @param activeServerCount + * @param serverIndex + * @return + */ + List getAllocatedDevices(PaginationRequest request, int tenantId, int activeServerCount, int serverIndex) throws DeviceManagementDAOException; + /** * This method is used to search for devices within a specific group. * @@ -335,7 +347,7 @@ public interface DeviceDAO { * @return returns list of devices of provided type. * @throws DeviceManagementDAOException */ - List getDevices(String type, int tenantId, int activeServerCount, int serverIndex) throws DeviceManagementDAOException; + List getAllocatedDevices(String type, int tenantId, int activeServerCount, int serverIndex) throws DeviceManagementDAOException; List getDevices(long timestamp, int tenantId) throws DeviceManagementDAOException; diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/AbstractDeviceDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/AbstractDeviceDAOImpl.java index 1ceb85d868..43889beb7d 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/AbstractDeviceDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/AbstractDeviceDAOImpl.java @@ -802,7 +802,7 @@ public abstract class AbstractDeviceDAOImpl implements DeviceDAO { @Override - public List getDevices(String type, int tenantId, int activeServerCount, int serverIndex) throws DeviceManagementDAOException { + public List getAllocatedDevices(String type, int tenantId, int activeServerCount, int serverIndex) throws DeviceManagementDAOException { Connection conn; PreparedStatement stmt = null; ResultSet rs = null; diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/GenericDeviceDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/GenericDeviceDAOImpl.java index f4bda81221..5aefd9f1dc 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/GenericDeviceDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/GenericDeviceDAOImpl.java @@ -176,6 +176,141 @@ public class GenericDeviceDAOImpl extends AbstractDeviceDAOImpl { } } + @Override + public List getAllocatedDevices(PaginationRequest request, int tenantId, int activeServerCount, int serverIndex) + throws DeviceManagementDAOException { + List devices; + String deviceType = request.getDeviceType(); + boolean isDeviceTypeProvided = false; + String deviceName = request.getDeviceName(); + boolean isDeviceNameProvided = false; + String owner = request.getOwner(); + boolean isOwnerProvided = false; + String ownerPattern = request.getOwnerPattern(); + boolean isOwnerPatternProvided = false; + String ownership = request.getOwnership(); + boolean isOwnershipProvided = false; + List statusList = request.getStatusList(); + boolean isStatusProvided = false; + Date since = request.getSince(); + boolean isSinceProvided = false; + boolean isPartitionedTask = false; + + try { + Connection conn = getConnection(); + String sql = "SELECT d1.ID AS DEVICE_ID, " + + "d1.DESCRIPTION, " + + "d1.NAME AS DEVICE_NAME, " + + "d1.DEVICE_TYPE, " + + "d1.DEVICE_IDENTIFICATION, " + + "e.OWNER, " + + "e.OWNERSHIP, " + + "e.STATUS, " + + "e.IS_TRANSFERRED, " + + "e.DATE_OF_LAST_UPDATE, " + + "e.DATE_OF_ENROLMENT, " + + "e.ID AS ENROLMENT_ID " + + "FROM DM_ENROLMENT e, " + + "(SELECT d.ID, " + + "d.DESCRIPTION, " + + "d.NAME, " + + "d.DEVICE_IDENTIFICATION, " + + "t.NAME AS DEVICE_TYPE " + + "FROM DM_DEVICE d, DM_DEVICE_TYPE t "; + //Add the query to filter active devices on timestamp + if (since != null) { + sql = sql + ", DM_DEVICE_DETAIL dt"; + isSinceProvided = true; + } + sql = sql + " WHERE DEVICE_TYPE_ID = t.ID AND d.TENANT_ID = ?"; + //Add query for last updated timestamp + if (isSinceProvided) { + sql = sql + " AND dt.DEVICE_ID = d.ID AND dt.UPDATE_TIMESTAMP > ?"; + } + //Add the query for device-type + if (deviceType != null && !deviceType.isEmpty()) { + sql = sql + " AND t.NAME = ?"; + isDeviceTypeProvided = true; + } + //Add the query for device-name + if (deviceName != null && !deviceName.isEmpty()) { + sql = sql + " AND d.NAME LIKE ?"; + isDeviceNameProvided = true; + } + sql = sql + ") d1 WHERE d1.ID = e.DEVICE_ID AND TENANT_ID = ?"; + //Add the query for ownership + if (ownership != null && !ownership.isEmpty()) { + sql = sql + " AND e.OWNERSHIP = ?"; + isOwnershipProvided = true; + } + //Add the query for owner + if (owner != null && !owner.isEmpty()) { + sql = sql + " AND e.OWNER = ?"; + isOwnerProvided = true; + } else if (ownerPattern != null && !ownerPattern.isEmpty()) { + sql = sql + " AND e.OWNER LIKE ?"; + isOwnerPatternProvided = true; + } + if (statusList != null && !statusList.isEmpty()) { + sql += buildStatusQuery(statusList); + isStatusProvided = true; + } + if (activeServerCount > 0){ + sql = sql + " AND MOD(d1.ID, ?) = ?"; + isPartitionedTask = true; + } + sql = sql + " LIMIT ?,?"; + + try (PreparedStatement stmt = conn.prepareStatement(sql)) { + int paramIdx = 1; + stmt.setInt(paramIdx++, tenantId); + if (isSinceProvided) { + stmt.setLong(paramIdx++, since.getTime()); + } + if (isDeviceTypeProvided) { + stmt.setString(paramIdx++, deviceType); + } + if (isDeviceNameProvided) { + stmt.setString(paramIdx++, deviceName + "%"); + } + stmt.setInt(paramIdx++, tenantId); + if (isOwnershipProvided) { + stmt.setString(paramIdx++, ownership); + } + if (isOwnerProvided) { + stmt.setString(paramIdx++, owner); + } else if (isOwnerPatternProvided) { + stmt.setString(paramIdx++, ownerPattern + "%"); + } + if (isStatusProvided) { + for (String status : statusList) { + stmt.setString(paramIdx++, status); + } + } + if (isPartitionedTask) { + stmt.setInt(paramIdx++, activeServerCount); + stmt.setInt(paramIdx++, serverIndex); + } + stmt.setInt(paramIdx++, request.getStartIndex()); + stmt.setInt(paramIdx, request.getRowCount()); + + try (ResultSet rs = stmt.executeQuery()) { + devices = new ArrayList<>(); + while (rs.next()) { + Device device = DeviceManagementDAOUtil.loadDevice(rs); + devices.add(device); + } + return devices; + } + } + } catch (SQLException e) { + String msg = "Error occurred while retrieving information of all " + + "registered devices"; + log.error(msg, e); + throw new DeviceManagementDAOException(msg, e); + } + } + @Override public List searchDevicesInGroup(PaginationRequest request, int tenantId) throws DeviceManagementDAOException { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/OracleDeviceDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/OracleDeviceDAOImpl.java index 07bd3089ed..2c1cfe2cb4 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/OracleDeviceDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/OracleDeviceDAOImpl.java @@ -176,6 +176,144 @@ public class OracleDeviceDAOImpl extends AbstractDeviceDAOImpl { } } + @Override + public List getAllocatedDevices(PaginationRequest request, int tenantId, + int activeServerCount, int serverIndex) + throws DeviceManagementDAOException { + Connection conn; + List devices = null; + String deviceType = request.getDeviceType(); + boolean isDeviceTypeProvided = false; + String deviceName = request.getDeviceName(); + boolean isDeviceNameProvided = false; + String owner = request.getOwner(); + boolean isOwnerProvided = false; + String ownerPattern = request.getOwnerPattern(); + boolean isOwnerPatternProvided = false; + String ownership = request.getOwnership(); + boolean isOwnershipProvided = false; + List statusList = request.getStatusList(); + boolean isStatusProvided = false; + Date since = request.getSince(); + boolean isSinceProvided = false; + boolean isPartitionedTask = false; + + try { + conn = getConnection(); + String sql = "SELECT d1.ID AS DEVICE_ID, " + + "d1.DESCRIPTION, " + + "d1.NAME AS DEVICE_NAME, " + + "d1.DEVICE_TYPE, " + + "d1.DEVICE_IDENTIFICATION, " + + "e.OWNER, " + + "e.OWNERSHIP, " + + "e.STATUS, " + + "e.IS_TRANSFERRED, " + + "e.DATE_OF_LAST_UPDATE, " + + "e.DATE_OF_ENROLMENT, " + + "e.ID AS ENROLMENT_ID " + + "FROM DM_ENROLMENT e, " + + "(SELECT d.ID, " + + "d.DESCRIPTION, " + + "d.NAME, " + + "d.DEVICE_IDENTIFICATION, " + + "t.NAME AS DEVICE_TYPE " + + "FROM DM_DEVICE d, " + + "DM_DEVICE_TYPE t "; + //Add the query to filter active devices on timestamp + if (since != null) { + sql = sql + ", DM_DEVICE_DETAIL dt"; + isSinceProvided = true; + } + sql = sql + " WHERE DEVICE_TYPE_ID = t.ID AND d.TENANT_ID = ?"; + //Add query for last updated timestamp + if (isSinceProvided) { + sql = sql + " AND dt.DEVICE_ID = d.ID AND dt.UPDATE_TIMESTAMP > ?"; + } + //Add the query for device-type + if (deviceType != null && !deviceType.isEmpty()) { + sql = sql + " AND t.NAME = ?"; + isDeviceTypeProvided = true; + } + //Add the query for device-name + if (deviceName != null && !deviceName.isEmpty()) { + sql = sql + " AND d.NAME LIKE ?"; + isDeviceNameProvided = true; + } + sql = sql + ") d1 WHERE d1.ID = e.DEVICE_ID AND TENANT_ID = ?"; + //Add the query for ownership + if (ownership != null && !ownership.isEmpty()) { + sql = sql + " AND e.OWNERSHIP = ?"; + isOwnershipProvided = true; + } + //Add the query for owner + if (owner != null && !owner.isEmpty()) { + sql = sql + " AND e.OWNER = ?"; + isOwnerProvided = true; + } else if (ownerPattern != null && !ownerPattern.isEmpty()) { + sql = sql + " AND e.OWNER LIKE ?"; + isOwnerPatternProvided = true; + } + if (statusList != null && !statusList.isEmpty()) { + sql += buildStatusQuery(statusList); + isStatusProvided = true; + } + if (activeServerCount > 0){ + sql = sql + " AND MOD(d1.ID, ?) = ?"; + isPartitionedTask = true; + } + sql = sql + " ORDER BY ENROLMENT_ID OFFSET ? ROWS FETCH NEXT ? ROWS ONLY"; + + try (PreparedStatement stmt = conn.prepareStatement(sql)) { + int paramIdx = 1; + stmt.setInt(paramIdx++, tenantId); + if (isSinceProvided) { + stmt.setLong(paramIdx++, since.getTime()); + } + if (isDeviceTypeProvided) { + stmt.setString(paramIdx++, deviceType); + } + if (isDeviceNameProvided) { + stmt.setString(paramIdx++, deviceName + "%"); + } + stmt.setInt(paramIdx++, tenantId); + if (isOwnershipProvided) { + stmt.setString(paramIdx++, ownership); + } + if (isOwnerProvided) { + stmt.setString(paramIdx++, owner); + } else if (isOwnerPatternProvided) { + stmt.setString(paramIdx++, ownerPattern + "%"); + } + if (isStatusProvided) { + for (String status : statusList) { + stmt.setString(paramIdx++, status); + } + } + if (isPartitionedTask) { + stmt.setInt(paramIdx++, activeServerCount); + stmt.setInt(paramIdx++, serverIndex); + } + stmt.setInt(paramIdx++, request.getStartIndex()); + stmt.setInt(paramIdx, request.getRowCount()); + + try (ResultSet rs = stmt.executeQuery()) { + devices = new ArrayList<>(); + while (rs.next()) { + Device device = DeviceManagementDAOUtil.loadDevice(rs); + devices.add(device); + } + return devices; + } + } + } catch (SQLException e) { + String msg = "Error occurred while retrieving information of all " + + "registered devices"; + log.error(msg, e); + throw new DeviceManagementDAOException(msg, e); + } + } + @Override public List searchDevicesInGroup(PaginationRequest request, int tenantId) throws DeviceManagementDAOException { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/PostgreSQLDeviceDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/PostgreSQLDeviceDAOImpl.java index da1c8a2630..d9b7cadf63 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/PostgreSQLDeviceDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/PostgreSQLDeviceDAOImpl.java @@ -164,6 +164,132 @@ public class PostgreSQLDeviceDAOImpl extends AbstractDeviceDAOImpl { } } + public List getAllocatedDevices(PaginationRequest request, int tenantId, + int activeServerCount, int serverIndex) + throws DeviceManagementDAOException { + Connection conn; + List devices = null; + String deviceType = request.getDeviceType(); + boolean isDeviceTypeProvided = false; + String deviceName = request.getDeviceName(); + boolean isDeviceNameProvided = false; + String owner = request.getOwner(); + boolean isOwnerProvided = false; + String ownerPattern = request.getOwnerPattern(); + boolean isOwnerPatternProvided = false; + String ownership = request.getOwnership(); + boolean isOwnershipProvided = false; + List statusList = request.getStatusList(); + boolean isStatusProvided = false; + Date since = request.getSince(); + boolean isSinceProvided = false; + boolean isPartitionedTask = false; + + try { + conn = getConnection(); + String sql = "SELECT d1.ID AS DEVICE_ID, " + + "d1.DESCRIPTION, " + + "d1.NAME AS DEVICE_NAME, " + + "d1.DEVICE_TYPE, " + + "d1.DEVICE_IDENTIFICATION, " + + "e.OWNER, " + + "e.OWNERSHIP, " + + "e.STATUS, " + + "e.IS_TRANSFERRED, " + + "e.DATE_OF_LAST_UPDATE, " + + "e.DATE_OF_ENROLMENT, " + + "e.ID AS ENROLMENT_ID " + + "FROM DM_ENROLMENT e, " + + "(SELECT d.ID, " + + "d.DESCRIPTION, " + + "d.NAME, " + + "d.DEVICE_IDENTIFICATION, " + + "t.NAME AS DEVICE_TYPE " + + "FROM DM_DEVICE d, " + + "DM_DEVICE_TYPE t " + + "WHERE DEVICE_TYPE_ID = t.ID " + + "AND d.TENANT_ID = ?"; + //Add the query for device-type + if (deviceType != null && !deviceType.isEmpty()) { + sql = sql + " AND t.NAME = ?"; + isDeviceTypeProvided = true; + } + //Add the query for device-name + if (deviceName != null && !deviceName.isEmpty()) { + sql = sql + " AND d.NAME LIKE ?"; + isDeviceNameProvided = true; + } + sql = sql + ") d1 WHERE d1.ID = e.DEVICE_ID AND TENANT_ID = ?"; + //Add the query for ownership + if (ownership != null && !ownership.isEmpty()) { + sql = sql + " AND e.OWNERSHIP = ?"; + isOwnershipProvided = true; + } + //Add the query for owner + if (owner != null && !owner.isEmpty()) { + sql = sql + " AND e.OWNER = ?"; + isOwnerProvided = true; + } else if (ownerPattern != null && !ownerPattern.isEmpty()) { + sql = sql + " AND e.OWNER LIKE ?"; + isOwnerPatternProvided = true; + } + if (statusList != null && !statusList.isEmpty()) { + sql += buildStatusQuery(statusList); + isStatusProvided = true; + } + if (activeServerCount > 0){ + sql = sql + " AND MOD(d1.ID, ?) = ?"; + isPartitionedTask = true; + } + sql = sql + " LIMIT ? OFFSET ?"; + + try (PreparedStatement stmt = conn.prepareStatement(sql)) { + int paramIdx = 1; + stmt.setInt(paramIdx++, tenantId); + if (isDeviceTypeProvided) { + stmt.setString(paramIdx++, deviceType); + } + if (isDeviceNameProvided) { + stmt.setString(paramIdx++, deviceName + "%"); + } + stmt.setInt(paramIdx++, tenantId); + if (isOwnershipProvided) { + stmt.setString(paramIdx++, ownership); + } + if (isOwnerProvided) { + stmt.setString(paramIdx++, owner); + } else if (isOwnerPatternProvided) { + stmt.setString(paramIdx++, ownerPattern + "%"); + } + if (isStatusProvided) { + for (String status : statusList) { + stmt.setString(paramIdx++, status); + } + } + if (isPartitionedTask) { + stmt.setInt(paramIdx++, activeServerCount); + stmt.setInt(paramIdx++, serverIndex); + } + stmt.setInt(paramIdx++, request.getRowCount()); + stmt.setInt(paramIdx, request.getStartIndex()); + + try (ResultSet rs = stmt.executeQuery()) { + devices = new ArrayList<>(); + while (rs.next()) { + Device device = DeviceManagementDAOUtil.loadDevice(rs); + devices.add(device); + } + return devices; + } + } + } catch (SQLException e) { + String msg = "Error occurred while retrieving information of all " + + "registered devices"; + log.error(msg, e); + throw new DeviceManagementDAOException(msg, e); + } + } + @Override public List searchDevicesInGroup(PaginationRequest request, int tenantId) throws DeviceManagementDAOException { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/SQLServerDeviceDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/SQLServerDeviceDAOImpl.java index 1d0d9c246c..5d30052869 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/SQLServerDeviceDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dao/impl/device/SQLServerDeviceDAOImpl.java @@ -176,6 +176,143 @@ public class SQLServerDeviceDAOImpl extends AbstractDeviceDAOImpl { } } + @Override + public List getAllocatedDevices(PaginationRequest request, int tenantId, + int activeServerCount, int serverIndex) + throws DeviceManagementDAOException { + Connection conn; + List devices = null; + String deviceType = request.getDeviceType(); + boolean isDeviceTypeProvided = false; + String deviceName = request.getDeviceName(); + boolean isDeviceNameProvided = false; + String owner = request.getOwner(); + boolean isOwnerProvided = false; + String ownerPattern = request.getOwnerPattern(); + boolean isOwnerPatternProvided = false; + String ownership = request.getOwnership(); + boolean isOwnershipProvided = false; + List statusList = request.getStatusList(); + boolean isStatusProvided = false; + Date since = request.getSince(); + boolean isSinceProvided = false; + boolean isPartitionedTask = false; + + try { + conn = getConnection(); + String sql = "SELECT d1.ID AS DEVICE_ID, " + + "d1.DESCRIPTION, " + + "d1.NAME AS DEVICE_NAME, " + + "d1.DEVICE_TYPE, " + + "d1.DEVICE_IDENTIFICATION, " + + "e.OWNER, " + + "e.OWNERSHIP, " + + "e.STATUS, " + + "e.IS_TRANSFERRED, " + + "e.DATE_OF_LAST_UPDATE, " + + "e.DATE_OF_ENROLMENT, " + + "e.ID AS ENROLMENT_ID " + + "FROM DM_ENROLMENT e, " + + "(SELECT d.ID, " + + "d.DESCRIPTION, " + + "d.NAME, " + + "d.DEVICE_IDENTIFICATION, " + + "t.NAME AS DEVICE_TYPE " + + "FROM DM_DEVICE d, DM_DEVICE_TYPE t "; + //Add the query to filter active devices on timestamp + if (since != null) { + sql = sql + ", DM_DEVICE_DETAIL dt"; + isSinceProvided = true; + } + sql = sql + " WHERE DEVICE_TYPE_ID = t.ID AND d.TENANT_ID = ?"; + //Add query for last updated timestamp + if (isSinceProvided) { + sql = sql + " AND dt.DEVICE_ID = d.ID AND dt.UPDATE_TIMESTAMP > ?"; + } + //Add the query for device-type + if (deviceType != null && !deviceType.isEmpty()) { + sql = sql + " AND t.NAME = ?"; + isDeviceTypeProvided = true; + } + //Add the query for device-name + if (deviceName != null && !deviceName.isEmpty()) { + sql = sql + " AND d.NAME LIKE ?"; + isDeviceNameProvided = true; + } + sql = sql + ") d1 WHERE d1.ID = e.DEVICE_ID AND TENANT_ID = ?"; + //Add the query for ownership + if (ownership != null && !ownership.isEmpty()) { + sql = sql + " AND e.OWNERSHIP = ?"; + isOwnershipProvided = true; + } + //Add the query for owner + if (owner != null && !owner.isEmpty()) { + sql = sql + " AND e.OWNER = ?"; + isOwnerProvided = true; + } else if (ownerPattern != null && !ownerPattern.isEmpty()) { + sql = sql + " AND e.OWNER LIKE ?"; + isOwnerPatternProvided = true; + } + if (statusList != null && !statusList.isEmpty()) { + sql += buildStatusQuery(statusList); + isStatusProvided = true; + } + if (activeServerCount > 0){ + sql = sql + " AND d1.ID % ? = ?"; + isPartitionedTask = true; + } + sql = sql + " ORDER BY ENROLMENT_ID OFFSET ? ROWS FETCH NEXT ? ROWS ONLY"; + + try (PreparedStatement stmt = conn.prepareStatement(sql)) { + int paramIdx = 1; + stmt.setInt(paramIdx++, tenantId); + if (isSinceProvided) { + stmt.setLong(paramIdx++, since.getTime()); + } + if (isDeviceTypeProvided) { + stmt.setString(paramIdx++, deviceType); + } + if (isDeviceNameProvided) { + stmt.setString(paramIdx++, deviceName + "%"); + } + stmt.setInt(paramIdx++, tenantId); + if (isOwnershipProvided) { + stmt.setString(paramIdx++, ownership); + } + if (isOwnerProvided) { + stmt.setString(paramIdx++, owner); + } else if (isOwnerPatternProvided) { + stmt.setString(paramIdx++, ownerPattern + "%"); + } + if (isStatusProvided) { + for (String status : statusList) { + stmt.setString(paramIdx++, status); + } + } + if (isPartitionedTask) { + stmt.setInt(paramIdx++, activeServerCount); + stmt.setInt(paramIdx++, serverIndex); + } + stmt.setInt(paramIdx++, request.getStartIndex()); + stmt.setInt(paramIdx, request.getRowCount()); + + try (ResultSet rs = stmt.executeQuery()) { + devices = new ArrayList<>(); + while (rs.next()) { + Device device = DeviceManagementDAOUtil.loadDevice(rs); + devices.add(device); + } + return devices; + } + } + } catch (SQLException e) { + String msg = "Error occurred while retrieving information of all " + + "registered devices"; + log.error(msg, e); + throw new DeviceManagementDAOException(msg, e); + } + } + @Override public List searchDevicesInGroup(PaginationRequest request, int tenantId) throws DeviceManagementDAOException { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java index b2e7b769ae..a6db88ea5f 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java @@ -24,6 +24,7 @@ import org.wso2.carbon.context.CarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.device.mgt.common.Device; import org.wso2.carbon.device.mgt.common.DeviceIdentifier; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.EnrolmentInfo; import org.wso2.carbon.device.mgt.common.MonitoringOperation; import org.wso2.carbon.device.mgt.common.OperationMonitoringTaskConfig; @@ -300,7 +301,7 @@ public class OperationManagerImpl implements OperationManager { } @Override - public void addTaskOperation(String deviceType, Operation operation) throws OperationManagementException { + public void addTaskOperation(String deviceType, Operation operation, DynamicTaskContext dynamicTaskContext) throws OperationManagementException { List validStatuses = Arrays.asList(EnrolmentInfo.Status.ACTIVE.toString(), EnrolmentInfo.Status.INACTIVE.toString(), EnrolmentInfo.Status.UNREACHABLE.toString()); @@ -316,7 +317,16 @@ public class OperationManagerImpl implements OperationManager { paginationRequest = new PaginationRequest(start, batchSize); paginationRequest.setStatusList(validStatuses); paginationRequest.setDeviceType(deviceType); - List devices = deviceDAO.getDevices(paginationRequest, tenantId); + List devices; + + if(dynamicTaskContext != null && dynamicTaskContext.isPartitioningEnabled()) { + devices = deviceDAO.getAllocatedDevices(paginationRequest, tenantId, + dynamicTaskContext.getActiveServerCount(), + dynamicTaskContext.getServerHashIndex()); + } else { + devices = deviceDAO.getDevices(paginationRequest, tenantId); + } + if (devices.size() == batchSize) { hasRecords = true; start += batchSize; diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/service/DeviceManagementProviderService.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/service/DeviceManagementProviderService.java index 70c506ee8b..abb957f9dc 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/service/DeviceManagementProviderService.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/service/DeviceManagementProviderService.java @@ -37,6 +37,7 @@ package org.wso2.carbon.device.mgt.core.service; import org.apache.commons.collections.map.SingletonMap; import org.wso2.carbon.device.mgt.common.Device; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.EnrolmentInfo; import org.wso2.carbon.device.mgt.common.FeatureManager; import org.wso2.carbon.device.mgt.common.PaginationRequest; @@ -664,7 +665,7 @@ public interface DeviceManagementProviderService { Activity addOperation(String type, Operation operation, List devices) throws OperationManagementException, InvalidDeviceException; - void addTaskOperation(String deviceType, Operation operation) throws OperationManagementException; + void addTaskOperation(String deviceType, Operation operation, DynamicTaskContext taskContext) throws OperationManagementException; void addTaskOperation(String type, List devices, Operation operation) throws OperationManagementException; diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/service/DeviceManagementProviderServiceImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/service/DeviceManagementProviderServiceImpl.java index be2cabefb0..e13c6d973f 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/service/DeviceManagementProviderServiceImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/service/DeviceManagementProviderServiceImpl.java @@ -56,6 +56,7 @@ import org.wso2.carbon.device.mgt.common.DeviceManager; import org.wso2.carbon.device.mgt.common.DeviceNotification; import org.wso2.carbon.device.mgt.common.DevicePropertyNotification; import org.wso2.carbon.device.mgt.common.DeviceTransferRequest; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.EnrolmentInfo; import org.wso2.carbon.device.mgt.common.FeatureManager; import org.wso2.carbon.device.mgt.common.InitialOperationConfig; @@ -779,7 +780,7 @@ public class DeviceManagementProviderServiceImpl implements DeviceManagementProv List allocatedDevices; try { DeviceManagementDAOFactory.openConnection(); - allocatedDevices = deviceDAO.getDevices(deviceType, this.getTenantId(), activeServerCount, serverIndex); + allocatedDevices = deviceDAO.getAllocatedDevices(deviceType, this.getTenantId(), activeServerCount, serverIndex); if (allocatedDevices == null) { if (log.isDebugEnabled()) { log.debug("No device is found upon the type '" + deviceType + "'"); @@ -1874,8 +1875,8 @@ public class DeviceManagementProviderServiceImpl implements DeviceManagementProv } @Override - public void addTaskOperation(String type, Operation operation) throws OperationManagementException { - pluginRepository.getOperationManager(type, this.getTenantId()).addTaskOperation(type, operation); + public void addTaskOperation(String type, Operation operation, DynamicTaskContext taskContext) throws OperationManagementException { + pluginRepository.getOperationManager(type, this.getTenantId()).addTaskOperation(type, operation, taskContext); } @Override diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManager.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManager.java index c4db7c3531..77310bbcf1 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManager.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManager.java @@ -19,6 +19,8 @@ package org.wso2.carbon.device.mgt.core.task; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; + public interface DeviceTaskManager { // /** @@ -56,7 +58,7 @@ public interface DeviceTaskManager { * This method will add the operations to devices * @throws DeviceMgtTaskException */ - void addOperations() throws DeviceMgtTaskException; + void addOperations(DynamicTaskContext dynamicTaskContext) throws DeviceMgtTaskException; // /** diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java index b3f4b850a6..b8da8e6124 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java @@ -39,6 +39,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.base.MultitenantConstants; import org.wso2.carbon.context.PrivilegedCarbonContext; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.common.exceptions.DeviceManagementException; import org.wso2.carbon.device.mgt.common.OperationMonitoringTaskConfig; import org.wso2.carbon.device.mgt.common.StartupOperationConfig; @@ -52,7 +53,7 @@ import org.wso2.carbon.user.api.UserStoreException; import java.util.List; import java.util.Map; -public class DeviceDetailsRetrieverTask implements Task { +public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask { private static Log log = LogFactory.getLog(DeviceDetailsRetrieverTask.class); private String deviceType; @@ -63,10 +64,6 @@ public class DeviceDetailsRetrieverTask implements Task { deviceType = map.get("DEVICE_TYPE"); } - @Override - public void init() { - } - @Override public void execute() { deviceManagementProviderService = DeviceManagementDataHolder.getInstance() @@ -125,7 +122,7 @@ public class DeviceDetailsRetrieverTask implements Task { //pass the configurations also from here, monitoring tasks try { if (deviceManagementProviderService.isDeviceMonitoringEnabled(deviceType)) { - deviceTaskManager.addOperations(); + deviceTaskManager.addOperations(super.getTaskContext()); } } catch (DeviceMgtTaskException e) { log.error("Error occurred while trying to add the operations to " + @@ -133,4 +130,8 @@ public class DeviceDetailsRetrieverTask implements Task { } } + @Override + protected void setup() { + + } } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceTaskManagerImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceTaskManagerImpl.java index 1b15091cde..90d73320fe 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceTaskManagerImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceTaskManagerImpl.java @@ -44,6 +44,7 @@ import org.wso2.carbon.device.mgt.common.StartupOperationConfig; import org.wso2.carbon.device.mgt.common.exceptions.InvalidDeviceException; import org.wso2.carbon.device.mgt.common.operation.mgt.Operation; import org.wso2.carbon.device.mgt.common.operation.mgt.OperationManagementException; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder; import org.wso2.carbon.device.mgt.core.operation.mgt.CommandOperation; import org.wso2.carbon.device.mgt.core.operation.mgt.ProfileOperation; @@ -115,7 +116,7 @@ public class DeviceTaskManagerImpl implements DeviceTaskManager { @Override - public void addOperations() throws DeviceMgtTaskException { + public void addOperations(DynamicTaskContext dynamicTaskContext) throws DeviceMgtTaskException { DeviceManagementProviderService deviceManagementProviderService = DeviceManagementDataHolder.getInstance(). getDeviceManagementProvider(); //list operations for device type @@ -133,7 +134,7 @@ public class DeviceTaskManagerImpl implements DeviceTaskManager { operation.setType(Operation.Type.COMMAND); operation.setCode(str); try { - deviceManagementProviderService.addTaskOperation(deviceType, operation); + deviceManagementProviderService.addTaskOperation(deviceType, operation, dynamicTaskContext); } catch (OperationManagementException e) { throw new DeviceMgtTaskException("Error occurred while adding task operations to devices", e); } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java index 02cc5315c8..118497aef0 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java @@ -4,6 +4,7 @@ import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementExc import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.device.mgt.common.ServerCtxInfo; +import org.wso2.carbon.device.mgt.common.DynamicTaskContext; import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder; import org.wso2.carbon.ntask.core.Task; @@ -12,32 +13,39 @@ public abstract class DynamicPartitionedScheduleTask implements Task { private static final Log log = LogFactory.getLog(DynamicPartitionedScheduleTask.class); - private static int serverHashIndex; - private static int activeServerCount; + private static DynamicTaskContext taskContext = null; @Override public final void init() { try { ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo(); if(ctxInfo!=null){ - activeServerCount = ctxInfo.getActiveServerCount(); - serverHashIndex = ctxInfo.getLocalServerHashIdx(); - setup(); + taskContext = new DynamicTaskContext(); + taskContext.setActiveServerCount(ctxInfo.getActiveServerCount()); + taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx()); + + if(ctxInfo.getActiveServerCount() > 0){ + taskContext.setPartitioningEnabled(true); + } + + if(log.isDebugEnabled()){ + log.debug("Initiating execution of dynamic task for server : " + taskContext.getServerHashIndex() + + " where active server count is : " + taskContext.getActiveServerCount() + + " partitioning task enabled : " + taskContext.isPartitioningEnabled()); + } } else { log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function."); } } catch (HeartBeatManagementException e) { log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function." , e); } + setup(); } protected abstract void setup(); - public int getLocalServerHash(){ - return serverHashIndex; + public static DynamicTaskContext getTaskContext() { + return taskContext; } - public int getActiveServerCount(){ - return activeServerCount; - } } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManagerTest.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManagerTest.java index 5ee04dc92d..44554527bc 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManagerTest.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/task/DeviceTaskManagerTest.java @@ -117,7 +117,7 @@ public class DeviceTaskManagerTest extends BaseDeviceManagementTest { @Test(groups = "Device Task Manager Test Group", description = "Testing adding operations to devices.") public void testAddOperation() throws DeviceMgtTaskException, OperationManagementException { log.info("Attempting to add operations for devices."); - this.deviceTaskManager.addOperations(); + this.deviceTaskManager.addOperations(null); for (DeviceIdentifier deviceId : deviceIds) { List operationList = this.operationManager.getOperations(deviceId); Assert.assertNotNull(operationList); @@ -133,7 +133,7 @@ public class DeviceTaskManagerTest extends BaseDeviceManagementTest { new TestDeviceManagementService(NEW_DEVICE_TYPE, TestDataHolder.SUPER_TENANT_DOMAIN)); DeviceTaskManager taskManager = new DeviceTaskManagerImpl(NEW_DEVICE_TYPE, TestDataHolder.generateMonitoringTaskConfig(true, 50000, 3)); - taskManager.addOperations(); + taskManager.addOperations(null); } @Test(groups = "Device Task Manager Test Group", dependsOnMethods = "testAddOperationsWithoutDevices", @@ -141,7 +141,7 @@ public class DeviceTaskManagerTest extends BaseDeviceManagementTest { public void testAddOperationsWithoutOperations() throws DeviceMgtTaskException { DeviceTaskManager taskManager = new DeviceTaskManagerImpl(NEW_DEVICE_TYPE, TestDataHolder.generateMonitoringTaskConfig(true, 50000, 3)); - taskManager.addOperations(); + taskManager.addOperations(null); } @Test(groups = "Device Task Manager Test Group", description = "Testing device detail retriever task execution") diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java index 0acbac6a60..2129f78286 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java @@ -28,7 +28,7 @@ import org.wso2.carbon.device.mgt.common.exceptions.DeviceManagementException; import org.wso2.carbon.device.mgt.common.policy.mgt.PolicyMonitoringManager; import org.wso2.carbon.device.mgt.common.policy.mgt.monitor.PolicyComplianceException; import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService; -import org.wso2.carbon.ntask.core.Task; +import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask; import org.wso2.carbon.policy.mgt.core.internal.PolicyManagementDataHolder; import org.wso2.carbon.policy.mgt.core.mgt.MonitoringManager; @@ -36,7 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -public class MonitoringTask implements Task { +public class MonitoringTask extends DynamicPartitionedScheduleTask { private static final Log log = LogFactory.getLog(MonitoringTask.class); @@ -44,10 +44,6 @@ public class MonitoringTask implements Task { public void setProperties(Map map) { } - @Override - public void init() { - } - @Override public void execute() { @@ -125,7 +121,14 @@ public class MonitoringTask implements Task { PolicyMonitoringManager monitoringService = PolicyManagementDataHolder.getInstance().getDeviceManagementService() .getPolicyMonitoringManager(deviceType); - List devices = deviceManagementProviderService.getAllDevices(deviceType, false); + List devices; + if(super.getTaskContext()!= null && super.getTaskContext().isPartitioningEnabled()){ + devices = deviceManagementProviderService.getAllocatedDevices(deviceType, + super.getTaskContext().getActiveServerCount(), + super.getTaskContext().getServerHashIndex()); + } else { + devices = deviceManagementProviderService.getAllDevices(deviceType, false); + } if (monitoringService != null && !devices.isEmpty()) { List notifiableDevices = new ArrayList<>(); if (log.isDebugEnabled()) { @@ -163,4 +166,8 @@ public class MonitoringTask implements Task { } } + @Override + protected void setup() { + + } }