Imrpoving task implementation

revert-70ac1926
Ace 4 years ago
parent 3744a7cfc7
commit e7b97d78c7

@ -0,0 +1,32 @@
package org.wso2.carbon.device.mgt.common;
public class DynamicTaskContext {
private int serverHashIndex;
private int activeServerCount;
private boolean partitioningEnabled = false;
public int getServerHashIndex() {
return serverHashIndex;
}
public void setServerHashIndex(int serverHashIndex) {
this.serverHashIndex = serverHashIndex;
}
public int getActiveServerCount() {
return activeServerCount;
}
public void setActiveServerCount(int activeServerCount) {
this.activeServerCount = activeServerCount;
}
public boolean isPartitioningEnabled() {
return partitioningEnabled;
}
public void setPartitioningEnabled(boolean partitioningEnabled) {
this.partitioningEnabled = partitioningEnabled;
}
}

@ -19,6 +19,7 @@ package org.wso2.carbon.device.mgt.common.operation.mgt;
import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.exceptions.DeviceManagementException;
import org.wso2.carbon.device.mgt.common.exceptions.InvalidDeviceException;
import org.wso2.carbon.device.mgt.common.PaginationRequest;
@ -47,7 +48,7 @@ public interface OperationManager {
void addTaskOperation(List<Device> devices, Operation operation) throws OperationManagementException;
void addTaskOperation(String deviceType, Operation operation) throws OperationManagementException;
void addTaskOperation(String deviceType, Operation operation, DynamicTaskContext dynamicTaskContext) throws OperationManagementException;
/**
* Method to retrieve the list of all operations to a device.

@ -295,6 +295,18 @@ public interface DeviceDAO {
*/
List<Device> getDevices(PaginationRequest request, int tenantId) throws DeviceManagementDAOException;
/**
* This method is used to retrieve the devices of a given tenant as a paginated result, along the lines of
* activeServerCount and serverIndex
*
* @param request
* @param tenantId
* @param activeServerCount
* @param serverIndex
* @return
*/
List<Device> getAllocatedDevices(PaginationRequest request, int tenantId, int activeServerCount, int serverIndex) throws DeviceManagementDAOException;
/**
* This method is used to search for devices within a specific group.
*
@ -335,7 +347,7 @@ public interface DeviceDAO {
* @return returns list of devices of provided type.
* @throws DeviceManagementDAOException
*/
List<Device> getDevices(String type, int tenantId, int activeServerCount, int serverIndex) throws DeviceManagementDAOException;
List<Device> getAllocatedDevices(String type, int tenantId, int activeServerCount, int serverIndex) throws DeviceManagementDAOException;
List<Device> getDevices(long timestamp, int tenantId) throws DeviceManagementDAOException;

@ -802,7 +802,7 @@ public abstract class AbstractDeviceDAOImpl implements DeviceDAO {
@Override
public List<Device> getDevices(String type, int tenantId, int activeServerCount, int serverIndex) throws DeviceManagementDAOException {
public List<Device> getAllocatedDevices(String type, int tenantId, int activeServerCount, int serverIndex) throws DeviceManagementDAOException {
Connection conn;
PreparedStatement stmt = null;
ResultSet rs = null;

@ -176,6 +176,141 @@ public class GenericDeviceDAOImpl extends AbstractDeviceDAOImpl {
}
}
@Override
public List<Device> getAllocatedDevices(PaginationRequest request, int tenantId, int activeServerCount, int serverIndex)
throws DeviceManagementDAOException {
List<Device> devices;
String deviceType = request.getDeviceType();
boolean isDeviceTypeProvided = false;
String deviceName = request.getDeviceName();
boolean isDeviceNameProvided = false;
String owner = request.getOwner();
boolean isOwnerProvided = false;
String ownerPattern = request.getOwnerPattern();
boolean isOwnerPatternProvided = false;
String ownership = request.getOwnership();
boolean isOwnershipProvided = false;
List<String> statusList = request.getStatusList();
boolean isStatusProvided = false;
Date since = request.getSince();
boolean isSinceProvided = false;
boolean isPartitionedTask = false;
try {
Connection conn = getConnection();
String sql = "SELECT d1.ID AS DEVICE_ID, " +
"d1.DESCRIPTION, " +
"d1.NAME AS DEVICE_NAME, " +
"d1.DEVICE_TYPE, " +
"d1.DEVICE_IDENTIFICATION, " +
"e.OWNER, " +
"e.OWNERSHIP, " +
"e.STATUS, " +
"e.IS_TRANSFERRED, " +
"e.DATE_OF_LAST_UPDATE, " +
"e.DATE_OF_ENROLMENT, " +
"e.ID AS ENROLMENT_ID " +
"FROM DM_ENROLMENT e, " +
"(SELECT d.ID, " +
"d.DESCRIPTION, " +
"d.NAME, " +
"d.DEVICE_IDENTIFICATION, " +
"t.NAME AS DEVICE_TYPE " +
"FROM DM_DEVICE d, DM_DEVICE_TYPE t ";
//Add the query to filter active devices on timestamp
if (since != null) {
sql = sql + ", DM_DEVICE_DETAIL dt";
isSinceProvided = true;
}
sql = sql + " WHERE DEVICE_TYPE_ID = t.ID AND d.TENANT_ID = ?";
//Add query for last updated timestamp
if (isSinceProvided) {
sql = sql + " AND dt.DEVICE_ID = d.ID AND dt.UPDATE_TIMESTAMP > ?";
}
//Add the query for device-type
if (deviceType != null && !deviceType.isEmpty()) {
sql = sql + " AND t.NAME = ?";
isDeviceTypeProvided = true;
}
//Add the query for device-name
if (deviceName != null && !deviceName.isEmpty()) {
sql = sql + " AND d.NAME LIKE ?";
isDeviceNameProvided = true;
}
sql = sql + ") d1 WHERE d1.ID = e.DEVICE_ID AND TENANT_ID = ?";
//Add the query for ownership
if (ownership != null && !ownership.isEmpty()) {
sql = sql + " AND e.OWNERSHIP = ?";
isOwnershipProvided = true;
}
//Add the query for owner
if (owner != null && !owner.isEmpty()) {
sql = sql + " AND e.OWNER = ?";
isOwnerProvided = true;
} else if (ownerPattern != null && !ownerPattern.isEmpty()) {
sql = sql + " AND e.OWNER LIKE ?";
isOwnerPatternProvided = true;
}
if (statusList != null && !statusList.isEmpty()) {
sql += buildStatusQuery(statusList);
isStatusProvided = true;
}
if (activeServerCount > 0){
sql = sql + " AND MOD(d1.ID, ?) = ?";
isPartitionedTask = true;
}
sql = sql + " LIMIT ?,?";
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
int paramIdx = 1;
stmt.setInt(paramIdx++, tenantId);
if (isSinceProvided) {
stmt.setLong(paramIdx++, since.getTime());
}
if (isDeviceTypeProvided) {
stmt.setString(paramIdx++, deviceType);
}
if (isDeviceNameProvided) {
stmt.setString(paramIdx++, deviceName + "%");
}
stmt.setInt(paramIdx++, tenantId);
if (isOwnershipProvided) {
stmt.setString(paramIdx++, ownership);
}
if (isOwnerProvided) {
stmt.setString(paramIdx++, owner);
} else if (isOwnerPatternProvided) {
stmt.setString(paramIdx++, ownerPattern + "%");
}
if (isStatusProvided) {
for (String status : statusList) {
stmt.setString(paramIdx++, status);
}
}
if (isPartitionedTask) {
stmt.setInt(paramIdx++, activeServerCount);
stmt.setInt(paramIdx++, serverIndex);
}
stmt.setInt(paramIdx++, request.getStartIndex());
stmt.setInt(paramIdx, request.getRowCount());
try (ResultSet rs = stmt.executeQuery()) {
devices = new ArrayList<>();
while (rs.next()) {
Device device = DeviceManagementDAOUtil.loadDevice(rs);
devices.add(device);
}
return devices;
}
}
} catch (SQLException e) {
String msg = "Error occurred while retrieving information of all " +
"registered devices";
log.error(msg, e);
throw new DeviceManagementDAOException(msg, e);
}
}
@Override
public List<Device> searchDevicesInGroup(PaginationRequest request, int tenantId)
throws DeviceManagementDAOException {

@ -176,6 +176,144 @@ public class OracleDeviceDAOImpl extends AbstractDeviceDAOImpl {
}
}
@Override
public List<Device> getAllocatedDevices(PaginationRequest request, int tenantId,
int activeServerCount, int serverIndex)
throws DeviceManagementDAOException {
Connection conn;
List<Device> devices = null;
String deviceType = request.getDeviceType();
boolean isDeviceTypeProvided = false;
String deviceName = request.getDeviceName();
boolean isDeviceNameProvided = false;
String owner = request.getOwner();
boolean isOwnerProvided = false;
String ownerPattern = request.getOwnerPattern();
boolean isOwnerPatternProvided = false;
String ownership = request.getOwnership();
boolean isOwnershipProvided = false;
List<String> statusList = request.getStatusList();
boolean isStatusProvided = false;
Date since = request.getSince();
boolean isSinceProvided = false;
boolean isPartitionedTask = false;
try {
conn = getConnection();
String sql = "SELECT d1.ID AS DEVICE_ID, " +
"d1.DESCRIPTION, " +
"d1.NAME AS DEVICE_NAME, " +
"d1.DEVICE_TYPE, " +
"d1.DEVICE_IDENTIFICATION, " +
"e.OWNER, " +
"e.OWNERSHIP, " +
"e.STATUS, " +
"e.IS_TRANSFERRED, " +
"e.DATE_OF_LAST_UPDATE, " +
"e.DATE_OF_ENROLMENT, " +
"e.ID AS ENROLMENT_ID " +
"FROM DM_ENROLMENT e, " +
"(SELECT d.ID, " +
"d.DESCRIPTION, " +
"d.NAME, " +
"d.DEVICE_IDENTIFICATION, " +
"t.NAME AS DEVICE_TYPE " +
"FROM DM_DEVICE d, " +
"DM_DEVICE_TYPE t ";
//Add the query to filter active devices on timestamp
if (since != null) {
sql = sql + ", DM_DEVICE_DETAIL dt";
isSinceProvided = true;
}
sql = sql + " WHERE DEVICE_TYPE_ID = t.ID AND d.TENANT_ID = ?";
//Add query for last updated timestamp
if (isSinceProvided) {
sql = sql + " AND dt.DEVICE_ID = d.ID AND dt.UPDATE_TIMESTAMP > ?";
}
//Add the query for device-type
if (deviceType != null && !deviceType.isEmpty()) {
sql = sql + " AND t.NAME = ?";
isDeviceTypeProvided = true;
}
//Add the query for device-name
if (deviceName != null && !deviceName.isEmpty()) {
sql = sql + " AND d.NAME LIKE ?";
isDeviceNameProvided = true;
}
sql = sql + ") d1 WHERE d1.ID = e.DEVICE_ID AND TENANT_ID = ?";
//Add the query for ownership
if (ownership != null && !ownership.isEmpty()) {
sql = sql + " AND e.OWNERSHIP = ?";
isOwnershipProvided = true;
}
//Add the query for owner
if (owner != null && !owner.isEmpty()) {
sql = sql + " AND e.OWNER = ?";
isOwnerProvided = true;
} else if (ownerPattern != null && !ownerPattern.isEmpty()) {
sql = sql + " AND e.OWNER LIKE ?";
isOwnerPatternProvided = true;
}
if (statusList != null && !statusList.isEmpty()) {
sql += buildStatusQuery(statusList);
isStatusProvided = true;
}
if (activeServerCount > 0){
sql = sql + " AND MOD(d1.ID, ?) = ?";
isPartitionedTask = true;
}
sql = sql + " ORDER BY ENROLMENT_ID OFFSET ? ROWS FETCH NEXT ? ROWS ONLY";
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
int paramIdx = 1;
stmt.setInt(paramIdx++, tenantId);
if (isSinceProvided) {
stmt.setLong(paramIdx++, since.getTime());
}
if (isDeviceTypeProvided) {
stmt.setString(paramIdx++, deviceType);
}
if (isDeviceNameProvided) {
stmt.setString(paramIdx++, deviceName + "%");
}
stmt.setInt(paramIdx++, tenantId);
if (isOwnershipProvided) {
stmt.setString(paramIdx++, ownership);
}
if (isOwnerProvided) {
stmt.setString(paramIdx++, owner);
} else if (isOwnerPatternProvided) {
stmt.setString(paramIdx++, ownerPattern + "%");
}
if (isStatusProvided) {
for (String status : statusList) {
stmt.setString(paramIdx++, status);
}
}
if (isPartitionedTask) {
stmt.setInt(paramIdx++, activeServerCount);
stmt.setInt(paramIdx++, serverIndex);
}
stmt.setInt(paramIdx++, request.getStartIndex());
stmt.setInt(paramIdx, request.getRowCount());
try (ResultSet rs = stmt.executeQuery()) {
devices = new ArrayList<>();
while (rs.next()) {
Device device = DeviceManagementDAOUtil.loadDevice(rs);
devices.add(device);
}
return devices;
}
}
} catch (SQLException e) {
String msg = "Error occurred while retrieving information of all " +
"registered devices";
log.error(msg, e);
throw new DeviceManagementDAOException(msg, e);
}
}
@Override
public List<Device> searchDevicesInGroup(PaginationRequest request, int tenantId)
throws DeviceManagementDAOException {

@ -164,6 +164,132 @@ public class PostgreSQLDeviceDAOImpl extends AbstractDeviceDAOImpl {
}
}
public List<Device> getAllocatedDevices(PaginationRequest request, int tenantId,
int activeServerCount, int serverIndex)
throws DeviceManagementDAOException {
Connection conn;
List<Device> devices = null;
String deviceType = request.getDeviceType();
boolean isDeviceTypeProvided = false;
String deviceName = request.getDeviceName();
boolean isDeviceNameProvided = false;
String owner = request.getOwner();
boolean isOwnerProvided = false;
String ownerPattern = request.getOwnerPattern();
boolean isOwnerPatternProvided = false;
String ownership = request.getOwnership();
boolean isOwnershipProvided = false;
List<String> statusList = request.getStatusList();
boolean isStatusProvided = false;
Date since = request.getSince();
boolean isSinceProvided = false;
boolean isPartitionedTask = false;
try {
conn = getConnection();
String sql = "SELECT d1.ID AS DEVICE_ID, " +
"d1.DESCRIPTION, " +
"d1.NAME AS DEVICE_NAME, " +
"d1.DEVICE_TYPE, " +
"d1.DEVICE_IDENTIFICATION, " +
"e.OWNER, " +
"e.OWNERSHIP, " +
"e.STATUS, " +
"e.IS_TRANSFERRED, " +
"e.DATE_OF_LAST_UPDATE, " +
"e.DATE_OF_ENROLMENT, " +
"e.ID AS ENROLMENT_ID " +
"FROM DM_ENROLMENT e, " +
"(SELECT d.ID, " +
"d.DESCRIPTION, " +
"d.NAME, " +
"d.DEVICE_IDENTIFICATION, " +
"t.NAME AS DEVICE_TYPE " +
"FROM DM_DEVICE d, " +
"DM_DEVICE_TYPE t " +
"WHERE DEVICE_TYPE_ID = t.ID " +
"AND d.TENANT_ID = ?";
//Add the query for device-type
if (deviceType != null && !deviceType.isEmpty()) {
sql = sql + " AND t.NAME = ?";
isDeviceTypeProvided = true;
}
//Add the query for device-name
if (deviceName != null && !deviceName.isEmpty()) {
sql = sql + " AND d.NAME LIKE ?";
isDeviceNameProvided = true;
}
sql = sql + ") d1 WHERE d1.ID = e.DEVICE_ID AND TENANT_ID = ?";
//Add the query for ownership
if (ownership != null && !ownership.isEmpty()) {
sql = sql + " AND e.OWNERSHIP = ?";
isOwnershipProvided = true;
}
//Add the query for owner
if (owner != null && !owner.isEmpty()) {
sql = sql + " AND e.OWNER = ?";
isOwnerProvided = true;
} else if (ownerPattern != null && !ownerPattern.isEmpty()) {
sql = sql + " AND e.OWNER LIKE ?";
isOwnerPatternProvided = true;
}
if (statusList != null && !statusList.isEmpty()) {
sql += buildStatusQuery(statusList);
isStatusProvided = true;
}
if (activeServerCount > 0){
sql = sql + " AND MOD(d1.ID, ?) = ?";
isPartitionedTask = true;
}
sql = sql + " LIMIT ? OFFSET ?";
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
int paramIdx = 1;
stmt.setInt(paramIdx++, tenantId);
if (isDeviceTypeProvided) {
stmt.setString(paramIdx++, deviceType);
}
if (isDeviceNameProvided) {
stmt.setString(paramIdx++, deviceName + "%");
}
stmt.setInt(paramIdx++, tenantId);
if (isOwnershipProvided) {
stmt.setString(paramIdx++, ownership);
}
if (isOwnerProvided) {
stmt.setString(paramIdx++, owner);
} else if (isOwnerPatternProvided) {
stmt.setString(paramIdx++, ownerPattern + "%");
}
if (isStatusProvided) {
for (String status : statusList) {
stmt.setString(paramIdx++, status);
}
}
if (isPartitionedTask) {
stmt.setInt(paramIdx++, activeServerCount);
stmt.setInt(paramIdx++, serverIndex);
}
stmt.setInt(paramIdx++, request.getRowCount());
stmt.setInt(paramIdx, request.getStartIndex());
try (ResultSet rs = stmt.executeQuery()) {
devices = new ArrayList<>();
while (rs.next()) {
Device device = DeviceManagementDAOUtil.loadDevice(rs);
devices.add(device);
}
return devices;
}
}
} catch (SQLException e) {
String msg = "Error occurred while retrieving information of all " +
"registered devices";
log.error(msg, e);
throw new DeviceManagementDAOException(msg, e);
}
}
@Override
public List<Device> searchDevicesInGroup(PaginationRequest request, int tenantId)
throws DeviceManagementDAOException {

@ -176,6 +176,143 @@ public class SQLServerDeviceDAOImpl extends AbstractDeviceDAOImpl {
}
}
@Override
public List<Device> getAllocatedDevices(PaginationRequest request, int tenantId,
int activeServerCount, int serverIndex)
throws DeviceManagementDAOException {
Connection conn;
List<Device> devices = null;
String deviceType = request.getDeviceType();
boolean isDeviceTypeProvided = false;
String deviceName = request.getDeviceName();
boolean isDeviceNameProvided = false;
String owner = request.getOwner();
boolean isOwnerProvided = false;
String ownerPattern = request.getOwnerPattern();
boolean isOwnerPatternProvided = false;
String ownership = request.getOwnership();
boolean isOwnershipProvided = false;
List<String> statusList = request.getStatusList();
boolean isStatusProvided = false;
Date since = request.getSince();
boolean isSinceProvided = false;
boolean isPartitionedTask = false;
try {
conn = getConnection();
String sql = "SELECT d1.ID AS DEVICE_ID, " +
"d1.DESCRIPTION, " +
"d1.NAME AS DEVICE_NAME, " +
"d1.DEVICE_TYPE, " +
"d1.DEVICE_IDENTIFICATION, " +
"e.OWNER, " +
"e.OWNERSHIP, " +
"e.STATUS, " +
"e.IS_TRANSFERRED, " +
"e.DATE_OF_LAST_UPDATE, " +
"e.DATE_OF_ENROLMENT, " +
"e.ID AS ENROLMENT_ID " +
"FROM DM_ENROLMENT e, " +
"(SELECT d.ID, " +
"d.DESCRIPTION, " +
"d.NAME, " +
"d.DEVICE_IDENTIFICATION, " +
"t.NAME AS DEVICE_TYPE " +
"FROM DM_DEVICE d, DM_DEVICE_TYPE t ";
//Add the query to filter active devices on timestamp
if (since != null) {
sql = sql + ", DM_DEVICE_DETAIL dt";
isSinceProvided = true;
}
sql = sql + " WHERE DEVICE_TYPE_ID = t.ID AND d.TENANT_ID = ?";
//Add query for last updated timestamp
if (isSinceProvided) {
sql = sql + " AND dt.DEVICE_ID = d.ID AND dt.UPDATE_TIMESTAMP > ?";
}
//Add the query for device-type
if (deviceType != null && !deviceType.isEmpty()) {
sql = sql + " AND t.NAME = ?";
isDeviceTypeProvided = true;
}
//Add the query for device-name
if (deviceName != null && !deviceName.isEmpty()) {
sql = sql + " AND d.NAME LIKE ?";
isDeviceNameProvided = true;
}
sql = sql + ") d1 WHERE d1.ID = e.DEVICE_ID AND TENANT_ID = ?";
//Add the query for ownership
if (ownership != null && !ownership.isEmpty()) {
sql = sql + " AND e.OWNERSHIP = ?";
isOwnershipProvided = true;
}
//Add the query for owner
if (owner != null && !owner.isEmpty()) {
sql = sql + " AND e.OWNER = ?";
isOwnerProvided = true;
} else if (ownerPattern != null && !ownerPattern.isEmpty()) {
sql = sql + " AND e.OWNER LIKE ?";
isOwnerPatternProvided = true;
}
if (statusList != null && !statusList.isEmpty()) {
sql += buildStatusQuery(statusList);
isStatusProvided = true;
}
if (activeServerCount > 0){
sql = sql + " AND d1.ID % ? = ?";
isPartitionedTask = true;
}
sql = sql + " ORDER BY ENROLMENT_ID OFFSET ? ROWS FETCH NEXT ? ROWS ONLY";
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
int paramIdx = 1;
stmt.setInt(paramIdx++, tenantId);
if (isSinceProvided) {
stmt.setLong(paramIdx++, since.getTime());
}
if (isDeviceTypeProvided) {
stmt.setString(paramIdx++, deviceType);
}
if (isDeviceNameProvided) {
stmt.setString(paramIdx++, deviceName + "%");
}
stmt.setInt(paramIdx++, tenantId);
if (isOwnershipProvided) {
stmt.setString(paramIdx++, ownership);
}
if (isOwnerProvided) {
stmt.setString(paramIdx++, owner);
} else if (isOwnerPatternProvided) {
stmt.setString(paramIdx++, ownerPattern + "%");
}
if (isStatusProvided) {
for (String status : statusList) {
stmt.setString(paramIdx++, status);
}
}
if (isPartitionedTask) {
stmt.setInt(paramIdx++, activeServerCount);
stmt.setInt(paramIdx++, serverIndex);
}
stmt.setInt(paramIdx++, request.getStartIndex());
stmt.setInt(paramIdx, request.getRowCount());
try (ResultSet rs = stmt.executeQuery()) {
devices = new ArrayList<>();
while (rs.next()) {
Device device = DeviceManagementDAOUtil.loadDevice(rs);
devices.add(device);
}
return devices;
}
}
} catch (SQLException e) {
String msg = "Error occurred while retrieving information of all " +
"registered devices";
log.error(msg, e);
throw new DeviceManagementDAOException(msg, e);
}
}
@Override
public List<Device> searchDevicesInGroup(PaginationRequest request, int tenantId)
throws DeviceManagementDAOException {

@ -24,6 +24,7 @@ import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.EnrolmentInfo;
import org.wso2.carbon.device.mgt.common.MonitoringOperation;
import org.wso2.carbon.device.mgt.common.OperationMonitoringTaskConfig;
@ -300,7 +301,7 @@ public class OperationManagerImpl implements OperationManager {
}
@Override
public void addTaskOperation(String deviceType, Operation operation) throws OperationManagementException {
public void addTaskOperation(String deviceType, Operation operation, DynamicTaskContext dynamicTaskContext) throws OperationManagementException {
List<String> validStatuses = Arrays.asList(EnrolmentInfo.Status.ACTIVE.toString(),
EnrolmentInfo.Status.INACTIVE.toString(),
EnrolmentInfo.Status.UNREACHABLE.toString());
@ -316,7 +317,16 @@ public class OperationManagerImpl implements OperationManager {
paginationRequest = new PaginationRequest(start, batchSize);
paginationRequest.setStatusList(validStatuses);
paginationRequest.setDeviceType(deviceType);
List<Device> devices = deviceDAO.getDevices(paginationRequest, tenantId);
List<Device> devices;
if(dynamicTaskContext != null && dynamicTaskContext.isPartitioningEnabled()) {
devices = deviceDAO.getAllocatedDevices(paginationRequest, tenantId,
dynamicTaskContext.getActiveServerCount(),
dynamicTaskContext.getServerHashIndex());
} else {
devices = deviceDAO.getDevices(paginationRequest, tenantId);
}
if (devices.size() == batchSize) {
hasRecords = true;
start += batchSize;

@ -37,6 +37,7 @@ package org.wso2.carbon.device.mgt.core.service;
import org.apache.commons.collections.map.SingletonMap;
import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.EnrolmentInfo;
import org.wso2.carbon.device.mgt.common.FeatureManager;
import org.wso2.carbon.device.mgt.common.PaginationRequest;
@ -664,7 +665,7 @@ public interface DeviceManagementProviderService {
Activity addOperation(String type, Operation operation,
List<DeviceIdentifier> devices) throws OperationManagementException, InvalidDeviceException;
void addTaskOperation(String deviceType, Operation operation) throws OperationManagementException;
void addTaskOperation(String deviceType, Operation operation, DynamicTaskContext taskContext) throws OperationManagementException;
void addTaskOperation(String type, List<Device> devices, Operation operation)
throws OperationManagementException;

@ -56,6 +56,7 @@ import org.wso2.carbon.device.mgt.common.DeviceManager;
import org.wso2.carbon.device.mgt.common.DeviceNotification;
import org.wso2.carbon.device.mgt.common.DevicePropertyNotification;
import org.wso2.carbon.device.mgt.common.DeviceTransferRequest;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.EnrolmentInfo;
import org.wso2.carbon.device.mgt.common.FeatureManager;
import org.wso2.carbon.device.mgt.common.InitialOperationConfig;
@ -779,7 +780,7 @@ public class DeviceManagementProviderServiceImpl implements DeviceManagementProv
List<Device> allocatedDevices;
try {
DeviceManagementDAOFactory.openConnection();
allocatedDevices = deviceDAO.getDevices(deviceType, this.getTenantId(), activeServerCount, serverIndex);
allocatedDevices = deviceDAO.getAllocatedDevices(deviceType, this.getTenantId(), activeServerCount, serverIndex);
if (allocatedDevices == null) {
if (log.isDebugEnabled()) {
log.debug("No device is found upon the type '" + deviceType + "'");
@ -1874,8 +1875,8 @@ public class DeviceManagementProviderServiceImpl implements DeviceManagementProv
}
@Override
public void addTaskOperation(String type, Operation operation) throws OperationManagementException {
pluginRepository.getOperationManager(type, this.getTenantId()).addTaskOperation(type, operation);
public void addTaskOperation(String type, Operation operation, DynamicTaskContext taskContext) throws OperationManagementException {
pluginRepository.getOperationManager(type, this.getTenantId()).addTaskOperation(type, operation, taskContext);
}
@Override

@ -19,6 +19,8 @@
package org.wso2.carbon.device.mgt.core.task;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
public interface DeviceTaskManager {
// /**
@ -56,7 +58,7 @@ public interface DeviceTaskManager {
* This method will add the operations to devices
* @throws DeviceMgtTaskException
*/
void addOperations() throws DeviceMgtTaskException;
void addOperations(DynamicTaskContext dynamicTaskContext) throws DeviceMgtTaskException;
// /**

@ -39,6 +39,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.base.MultitenantConstants;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.exceptions.DeviceManagementException;
import org.wso2.carbon.device.mgt.common.OperationMonitoringTaskConfig;
import org.wso2.carbon.device.mgt.common.StartupOperationConfig;
@ -52,7 +53,7 @@ import org.wso2.carbon.user.api.UserStoreException;
import java.util.List;
import java.util.Map;
public class DeviceDetailsRetrieverTask implements Task {
public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask {
private static Log log = LogFactory.getLog(DeviceDetailsRetrieverTask.class);
private String deviceType;
@ -63,10 +64,6 @@ public class DeviceDetailsRetrieverTask implements Task {
deviceType = map.get("DEVICE_TYPE");
}
@Override
public void init() {
}
@Override
public void execute() {
deviceManagementProviderService = DeviceManagementDataHolder.getInstance()
@ -125,7 +122,7 @@ public class DeviceDetailsRetrieverTask implements Task {
//pass the configurations also from here, monitoring tasks
try {
if (deviceManagementProviderService.isDeviceMonitoringEnabled(deviceType)) {
deviceTaskManager.addOperations();
deviceTaskManager.addOperations(super.getTaskContext());
}
} catch (DeviceMgtTaskException e) {
log.error("Error occurred while trying to add the operations to " +
@ -133,4 +130,8 @@ public class DeviceDetailsRetrieverTask implements Task {
}
}
@Override
protected void setup() {
}
}

@ -44,6 +44,7 @@ import org.wso2.carbon.device.mgt.common.StartupOperationConfig;
import org.wso2.carbon.device.mgt.common.exceptions.InvalidDeviceException;
import org.wso2.carbon.device.mgt.common.operation.mgt.Operation;
import org.wso2.carbon.device.mgt.common.operation.mgt.OperationManagementException;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder;
import org.wso2.carbon.device.mgt.core.operation.mgt.CommandOperation;
import org.wso2.carbon.device.mgt.core.operation.mgt.ProfileOperation;
@ -115,7 +116,7 @@ public class DeviceTaskManagerImpl implements DeviceTaskManager {
@Override
public void addOperations() throws DeviceMgtTaskException {
public void addOperations(DynamicTaskContext dynamicTaskContext) throws DeviceMgtTaskException {
DeviceManagementProviderService deviceManagementProviderService = DeviceManagementDataHolder.getInstance().
getDeviceManagementProvider();
//list operations for device type
@ -133,7 +134,7 @@ public class DeviceTaskManagerImpl implements DeviceTaskManager {
operation.setType(Operation.Type.COMMAND);
operation.setCode(str);
try {
deviceManagementProviderService.addTaskOperation(deviceType, operation);
deviceManagementProviderService.addTaskOperation(deviceType, operation, dynamicTaskContext);
} catch (OperationManagementException e) {
throw new DeviceMgtTaskException("Error occurred while adding task operations to devices", e);
}

@ -4,6 +4,7 @@ import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementExc
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder;
import org.wso2.carbon.ntask.core.Task;
@ -12,32 +13,39 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
private static final Log log = LogFactory.getLog(DynamicPartitionedScheduleTask.class);
private static int serverHashIndex;
private static int activeServerCount;
private static DynamicTaskContext taskContext = null;
@Override
public final void init() {
try {
ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo();
if(ctxInfo!=null){
activeServerCount = ctxInfo.getActiveServerCount();
serverHashIndex = ctxInfo.getLocalServerHashIdx();
setup();
taskContext = new DynamicTaskContext();
taskContext.setActiveServerCount(ctxInfo.getActiveServerCount());
taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx());
if(ctxInfo.getActiveServerCount() > 0){
taskContext.setPartitioningEnabled(true);
}
if(log.isDebugEnabled()){
log.debug("Initiating execution of dynamic task for server : " + taskContext.getServerHashIndex() +
" where active server count is : " + taskContext.getActiveServerCount() +
" partitioning task enabled : " + taskContext.isPartitioningEnabled());
}
} else {
log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.");
}
} catch (HeartBeatManagementException e) {
log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function." , e);
}
setup();
}
protected abstract void setup();
public int getLocalServerHash(){
return serverHashIndex;
public static DynamicTaskContext getTaskContext() {
return taskContext;
}
public int getActiveServerCount(){
return activeServerCount;
}
}

@ -117,7 +117,7 @@ public class DeviceTaskManagerTest extends BaseDeviceManagementTest {
@Test(groups = "Device Task Manager Test Group", description = "Testing adding operations to devices.")
public void testAddOperation() throws DeviceMgtTaskException, OperationManagementException {
log.info("Attempting to add operations for devices.");
this.deviceTaskManager.addOperations();
this.deviceTaskManager.addOperations(null);
for (DeviceIdentifier deviceId : deviceIds) {
List<? extends Operation> operationList = this.operationManager.getOperations(deviceId);
Assert.assertNotNull(operationList);
@ -133,7 +133,7 @@ public class DeviceTaskManagerTest extends BaseDeviceManagementTest {
new TestDeviceManagementService(NEW_DEVICE_TYPE, TestDataHolder.SUPER_TENANT_DOMAIN));
DeviceTaskManager taskManager = new DeviceTaskManagerImpl(NEW_DEVICE_TYPE,
TestDataHolder.generateMonitoringTaskConfig(true, 50000, 3));
taskManager.addOperations();
taskManager.addOperations(null);
}
@Test(groups = "Device Task Manager Test Group", dependsOnMethods = "testAddOperationsWithoutDevices",
@ -141,7 +141,7 @@ public class DeviceTaskManagerTest extends BaseDeviceManagementTest {
public void testAddOperationsWithoutOperations() throws DeviceMgtTaskException {
DeviceTaskManager taskManager = new DeviceTaskManagerImpl(NEW_DEVICE_TYPE,
TestDataHolder.generateMonitoringTaskConfig(true, 50000, 3));
taskManager.addOperations();
taskManager.addOperations(null);
}
@Test(groups = "Device Task Manager Test Group", description = "Testing device detail retriever task execution")

@ -28,7 +28,7 @@ import org.wso2.carbon.device.mgt.common.exceptions.DeviceManagementException;
import org.wso2.carbon.device.mgt.common.policy.mgt.PolicyMonitoringManager;
import org.wso2.carbon.device.mgt.common.policy.mgt.monitor.PolicyComplianceException;
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
import org.wso2.carbon.ntask.core.Task;
import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask;
import org.wso2.carbon.policy.mgt.core.internal.PolicyManagementDataHolder;
import org.wso2.carbon.policy.mgt.core.mgt.MonitoringManager;
@ -36,7 +36,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class MonitoringTask implements Task {
public class MonitoringTask extends DynamicPartitionedScheduleTask {
private static final Log log = LogFactory.getLog(MonitoringTask.class);
@ -44,10 +44,6 @@ public class MonitoringTask implements Task {
public void setProperties(Map<String, String> map) {
}
@Override
public void init() {
}
@Override
public void execute() {
@ -125,7 +121,14 @@ public class MonitoringTask implements Task {
PolicyMonitoringManager monitoringService =
PolicyManagementDataHolder.getInstance().getDeviceManagementService()
.getPolicyMonitoringManager(deviceType);
List<Device> devices = deviceManagementProviderService.getAllDevices(deviceType, false);
List<Device> devices;
if(super.getTaskContext()!= null && super.getTaskContext().isPartitioningEnabled()){
devices = deviceManagementProviderService.getAllocatedDevices(deviceType,
super.getTaskContext().getActiveServerCount(),
super.getTaskContext().getServerHashIndex());
} else {
devices = deviceManagementProviderService.getAllDevices(deviceType, false);
}
if (monitoringService != null && !devices.isEmpty()) {
List<Device> notifiableDevices = new ArrayList<>();
if (log.isDebugEnabled()) {
@ -163,4 +166,8 @@ public class MonitoringTask implements Task {
}
}
@Override
protected void setup() {
}
}

Loading…
Cancel
Save