Fix issues with handling OPERATION_DETAILS and OPERATION_PROPERTIES

This is to fix incorrectly handled operation details retrieval and operation properties handling.

Need following migration for existing data.

```
ALTER TABLE `DM_DB`.`DM_OPERATION`
ADD COLUMN `OPERATION_PROPERTIES` BLOB NULL DEFAULT NULL AFTER `OPERATION_DETAILS`;

ALTER TABLE `ARCHIVAL_DB`.`DM_OPERATION_ARCH`
ADD COLUMN `OPERATION_PROPERTIES` BLOB NULL DEFAULT NULL AFTER `OPERATION_DETAILS`;

UPDATE DM_OPERATION SET OPERATION_DETAILS = null;

UPDATE DM_ENROLMENT_OP_MAPPING SET STATUS = 'ERROR' WHERE STATUS = 'PENDING' OR STATUS = 'IN_PROGRESS' OR STATUS = 'NOTNOW';
```

Reviewed-on: community/device-mgt-core#371
Co-authored-by: Charitha Goonetilleke <charitha@entgra.io>
Co-committed-by: Charitha Goonetilleke <charitha@entgra.io>
remotes/1717824210486943042/master
Charitha Goonetilleke 9 months ago committed by Lasantha Dharmakeerthi
parent 64189a400d
commit 62722718f2

@ -116,6 +116,7 @@ CREATE TABLE IF NOT EXISTS DM_OPERATION
OPERATION_CODE VARCHAR(1000) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BLOB DEFAULT NULL,
OPERATION_PROPERTIES BLOB DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
PRIMARY KEY (ID)
);

@ -140,6 +140,7 @@ CREATE TABLE IF NOT EXISTS DM_OPERATION (
OPERATION_CODE VARCHAR(1000) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BLOB DEFAULT NULL,
OPERATION_PROPERTIES BLOB DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TENANT_ID INTEGER NOT NULL,
PRIMARY KEY (ID)

@ -547,8 +547,9 @@ public class ArchivalDAOImpl implements ArchivalDAO {
String sql = "INSERT INTO " + DESTINATION_DB + ".DM_OPERATION_ARCH " +
"SELECT OPR.ID, OPR.TYPE, OPR.CREATED_TIMESTAMP, OPR.RECEIVED_TIMESTAMP, " +
"OPR.OPERATION_CODE, OPR.INITIATED_BY, OPR.OPERATION_DETAILS, OPR.ENABLED, NOW() " +
"FROM " + SOURCE_DB + ".DM_OPERATION OPR " +
"OPR.OPERATION_CODE, OPR.INITIATED_BY, OPR.OPERATION_DETAILS, OPR.OPERATION_PROPERTIES, " +
"OPR.ENABLED, NOW() " +
"FROM " + SOURCE_DB + ".DM_OPERATION OPR " +
"WHERE OPR.ID NOT IN (SELECT DISTINCT OPERATION_ID FROM " + SOURCE_DB + ".DM_ENROLMENT_OP_MAPPING)";
long startTime = System.currentTimeMillis();

@ -222,7 +222,7 @@ public class OperationManagerImpl implements OperationManager {
if (operationDto.getControl()
== io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Control.NO_REPEAT) {
Map<Integer, Integer> pendingOperationIDs = operationDAO
.getExistingOperationIDs(enrolments.keySet().toArray(new Integer[0]), operationCode);
.getExistingNotExecutedOperationIDs(enrolments.keySet().toArray(new Integer[0]), operationCode);
for (Integer enrolmentId : pendingOperationIDs.keySet()) {
operation.setId(pendingOperationIDs.get(enrolmentId));
device = enrolments.get(enrolmentId);
@ -294,7 +294,7 @@ public class OperationManagerImpl implements OperationManager {
if (operationDto.getControl() ==
io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Control.NO_REPEAT) {
Map<Integer, Integer> pendingOperationIDs = operationDAO
.getExistingOperationIDs(enrolments.keySet().toArray(new Integer[0]), operationCode);
.getExistingNotExecutedOperationIDs(enrolments.keySet().toArray(new Integer[0]), operationCode);
Device device;
for (Integer enrolmentId : pendingOperationIDs.keySet()) {
operation.setId(pendingOperationIDs.get(enrolmentId));
@ -368,7 +368,7 @@ public class OperationManagerImpl implements OperationManager {
if (operationDto.getControl() ==
io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Control.NO_REPEAT) {
Map<Integer, Integer> pendingOperationIDs = operationDAO
.getExistingOperationIDs(enrolments.keySet().toArray(new Integer[0]), operationCode);
.getExistingNotExecutedOperationIDs(enrolments.keySet().toArray(new Integer[0]), operationCode);
Device device;
for (Integer enrolmentId : pendingOperationIDs.keySet()) {
operation.setId(pendingOperationIDs.get(enrolmentId));
@ -841,23 +841,23 @@ public class OperationManagerImpl implements OperationManager {
}
if (dtoOperation != null) {
if (io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.COMMAND.equals(dtoOperation.getType()
)) {
io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation commandOperation;
commandOperation =
(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation) commandOperationDAO.
getOperation(dtoOperation.getId());
dtoOperation.setEnabled(commandOperation.isEnabled());
} else if (io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.CONFIG.equals(dtoOperation.
getType())) {
dtoOperation = configOperationDAO.getOperation(dtoOperation.getId());
} else if (io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.PROFILE.equals(dtoOperation.
getType())) {
dtoOperation = profileOperationDAO.getOperation(dtoOperation.getId());
} else if (io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.POLICY.equals(dtoOperation.
getType())) {
dtoOperation = policyOperationDAO.getOperation(dtoOperation.getId());
}
// if (io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.COMMAND.equals(dtoOperation.getType()
// )) {
// io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation commandOperation;
// commandOperation =
// (io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation) commandOperationDAO.
// getOperation(dtoOperation.getId());
// dtoOperation.setEnabled(commandOperation.isEnabled());
// } else if (io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.CONFIG.equals(dtoOperation.
// getType())) {
// dtoOperation = configOperationDAO.getOperation(dtoOperation.getId());
// } else if (io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.PROFILE.equals(dtoOperation.
// getType())) {
// dtoOperation = profileOperationDAO.getOperation(dtoOperation.getId());
// } else if (io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.POLICY.equals(dtoOperation.
// getType())) {
// dtoOperation = policyOperationDAO.getOperation(dtoOperation.getId());
// }
operation = OperationDAOUtil.convertOperation(dtoOperation);
}
} catch (OperationManagementDAOException e) {
@ -1050,29 +1050,29 @@ public class OperationManagerImpl implements OperationManager {
io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation deviceSpecificOperation = operationDAO.
getOperationByDeviceAndId(enrolmentInfo.getId(),
operationId);
io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation dtoOperation = deviceSpecificOperation;
if (deviceSpecificOperation.getType().
equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.COMMAND)) {
io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation commandOperation;
commandOperation =
(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation) commandOperationDAO.
getOperation(deviceSpecificOperation.getId());
dtoOperation.setEnabled(commandOperation.isEnabled());
} else if (deviceSpecificOperation.getType().
equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.CONFIG)) {
dtoOperation = configOperationDAO.getOperation(deviceSpecificOperation.getId());
} else if (deviceSpecificOperation.getType().equals(
io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.PROFILE)) {
dtoOperation = profileOperationDAO.getOperation(deviceSpecificOperation.getId());
} else if (deviceSpecificOperation.getType().equals(
io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.POLICY)) {
dtoOperation = policyOperationDAO.getOperation(deviceSpecificOperation.getId());
}
if (dtoOperation == null) {
throw new OperationManagementException("Operation not found for operation Id:" + operationId +
" device id:" + deviceId.getId());
}
dtoOperation.setStatus(deviceSpecificOperation.getStatus());
// io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation dtoOperation = deviceSpecificOperation;
// if (deviceSpecificOperation.getType().
// equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.COMMAND)) {
// io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation commandOperation;
// commandOperation =
// (io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation) commandOperationDAO.
// getOperation(deviceSpecificOperation.getId());
// dtoOperation.setEnabled(commandOperation.isEnabled());
// } else if (deviceSpecificOperation.getType().
// equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.CONFIG)) {
// dtoOperation = configOperationDAO.getOperation(deviceSpecificOperation.getId());
// } else if (deviceSpecificOperation.getType().equals(
// io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.PROFILE)) {
// dtoOperation = profileOperationDAO.getOperation(deviceSpecificOperation.getId());
// } else if (deviceSpecificOperation.getType().equals(
// io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.POLICY)) {
// dtoOperation = policyOperationDAO.getOperation(deviceSpecificOperation.getId());
// }
// if (dtoOperation == null) {
// throw new OperationManagementException("Operation not found for operation Id:" + operationId +
// " device id:" + deviceId.getId());
// }
// dtoOperation.setStatus(deviceSpecificOperation.getStatus());
operation = OperationDAOUtil.convertOperation(deviceSpecificOperation);
} catch (OperationManagementDAOException e) {
throw new OperationManagementException("Error occurred while retrieving the list of " +
@ -1155,23 +1155,23 @@ public class OperationManagerImpl implements OperationManager {
throw new OperationManagementException("Operation not found for given Id:" + operationId);
}
if (dtoOperation.getType()
.equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.COMMAND)) {
io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation commandOperation;
commandOperation =
(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation) commandOperationDAO.
getOperation(dtoOperation.getId());
dtoOperation.setEnabled(commandOperation.isEnabled());
} else if (dtoOperation.getType().
equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.CONFIG)) {
dtoOperation = configOperationDAO.getOperation(dtoOperation.getId());
} else if (dtoOperation.getType().equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.
PROFILE)) {
dtoOperation = profileOperationDAO.getOperation(dtoOperation.getId());
} else if (dtoOperation.getType().equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.
POLICY)) {
dtoOperation = policyOperationDAO.getOperation(dtoOperation.getId());
}
// if (dtoOperation.getType()
// .equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.COMMAND)) {
// io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation commandOperation;
// commandOperation =
// (io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.CommandOperation) commandOperationDAO.
// getOperation(dtoOperation.getId());
// dtoOperation.setEnabled(commandOperation.isEnabled());
// } else if (dtoOperation.getType().
// equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.CONFIG)) {
// dtoOperation = configOperationDAO.getOperation(dtoOperation.getId());
// } else if (dtoOperation.getType().equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.
// PROFILE)) {
// dtoOperation = profileOperationDAO.getOperation(dtoOperation.getId());
// } else if (dtoOperation.getType().equals(io.entgra.device.mgt.core.device.mgt.core.dto.operation.mgt.Operation.Type.
// POLICY)) {
// dtoOperation = policyOperationDAO.getOperation(dtoOperation.getId());
// }
operation = OperationDAOUtil.convertOperation(dtoOperation);
} catch (OperationManagementDAOException e) {
throw new OperationManagementException("Error occurred while retrieving the operation with operation Id '" +

@ -62,7 +62,7 @@ public interface OperationDAO {
void updateEnrollmentOperationsStatus(int enrolmentId, String operationCode, Operation.Status existingStatus,
Operation.Status newStatus) throws OperationManagementDAOException;
Map<Integer, Integer> getExistingOperationIDs(Integer[] enrolmentIds, String operationCode)
Map<Integer, Integer> getExistingNotExecutedOperationIDs(Integer[] enrolmentIds, String operationCode)
throws OperationManagementDAOException;
OperationResponseMeta addOperationResponse(int enrolmentId, io.entgra.device.mgt.core.device.mgt.common.operation.mgt.Operation operation, String deviceId)

@ -59,6 +59,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -75,15 +76,16 @@ public class GenericOperationDAOImpl implements OperationDAO {
try {
Connection connection = OperationManagementDAOFactory.getConnection();
String sql = "INSERT INTO DM_OPERATION(TYPE, CREATED_TIMESTAMP, RECEIVED_TIMESTAMP, OPERATION_CODE, " +
"INITIATED_BY, OPERATION_DETAILS, TENANT_ID) VALUES (?, ?, ?, ?, ?, ?, ?)";
"INITIATED_BY, OPERATION_DETAILS, OPERATION_PROPERTIES, TENANT_ID) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
stmt = connection.prepareStatement(sql, new String[]{"id"});
stmt.setString(1, operation.getType().toString());
stmt.setLong(2, DeviceManagementDAOUtil.getCurrentUTCTime());
stmt.setLong(3, 0);
stmt.setString(4, operation.getCode());
stmt.setString(5, operation.getInitiatedBy());
stmt.setObject(6, operation);
stmt.setInt(7, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId());
stmt.setObject(6, operation.getPayLoad());
stmt.setObject(7, operation.getProperties());
stmt.setInt(8, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId());
stmt.executeUpdate();
rs = stmt.getGeneratedKeys();
@ -164,7 +166,7 @@ public class GenericOperationDAOImpl implements OperationDAO {
}
@Override
public Map<Integer, Integer> getExistingOperationIDs(Integer[] enrolmentIds, String operationCode)
public Map<Integer, Integer> getExistingNotExecutedOperationIDs(Integer[] enrolmentIds, String operationCode)
throws OperationManagementDAOException {
PreparedStatement stmt = null;
ResultSet rs = null;
@ -576,7 +578,7 @@ public class GenericOperationDAOImpl implements OperationDAO {
+ "dor.ID AS OP_RES_ID, de.DEVICE_ID, d.DEVICE_IDENTIFICATION, d.DEVICE_TYPE_ID, "
+ "dt.NAME AS DEVICE_TYPE_NAME, eom.STATUS, eom.CREATED_TIMESTAMP, "
+ "eom.UPDATED_TIMESTAMP, op.OPERATION_CODE, op.TYPE AS OPERATION_TYPE, "
+ "dor.OPERATION_RESPONSE, op.INITIATED_BY, dor.RECEIVED_TIMESTAMP, dor.IS_LARGE_RESPONSE FROM "
+ "dor.OPERATION_RESPONSE, op.INITIATED_BY, dor.RECEIVED_TIMESTAMP, dor.IS_LARGE_RESPONSE FROM "
+ "DM_ENROLMENT_OP_MAPPING eom INNER JOIN DM_OPERATION op "
+ "ON op.ID=eom.OPERATION_ID INNER JOIN DM_ENROLMENT de "
+ "ON de.ID=eom.ENROLMENT_ID INNER JOIN DM_DEVICE d ON d.ID=de.DEVICE_ID "
@ -1189,24 +1191,14 @@ public class GenericOperationDAOImpl implements OperationDAO {
Operation operation = null;
try {
Connection conn = OperationManagementDAOFactory.getConnection();
String sql = "SELECT ID, TYPE, CREATED_TIMESTAMP, RECEIVED_TIMESTAMP, OPERATION_CODE, INITIATED_BY FROM " +
"DM_OPERATION WHERE id = ?";
String sql = "SELECT ID, TYPE, CREATED_TIMESTAMP, RECEIVED_TIMESTAMP, OPERATION_CODE, INITIATED_BY, " +
"OPERATION_DETAILS, OPERATION_PROPERTIES FROM DM_OPERATION WHERE id = ?";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, id);
rs = stmt.executeQuery();
if (rs.next()) {
operation = new Operation();
operation.setId(rs.getInt("ID"));
operation.setType(Operation.Type.valueOf(rs.getString("TYPE")));
operation.setCreatedTimeStamp(new Timestamp(rs.getLong("CREATED_TIMESTAMP") * 1000L).toString());
if (rs.getLong("RECEIVED_TIMESTAMP") == 0) {
operation.setReceivedTimeStamp("");
} else {
operation.setReceivedTimeStamp(new Timestamp(rs.getLong("RECEIVED_TIMESTAMP") * 1000L).toString());
}
operation.setCode(rs.getString("OPERATION_CODE"));
operation.setInitiatedBy(rs.getString("INITIATED_BY"));
operation = OperationDAOUtil.getOperation(rs);
}
} catch (SQLException e) {
@ -1219,16 +1211,19 @@ public class GenericOperationDAOImpl implements OperationDAO {
}
@Override
public Operation getOperationByDeviceAndId(int enrolmentId, int operationId) throws OperationManagementDAOException {
public Operation getOperationByDeviceAndId(int enrolmentId, int operationId)
throws OperationManagementDAOException {
PreparedStatement stmt = null;
ResultSet rs = null;
Operation operation = null;
try {
Connection conn = OperationManagementDAOFactory.getConnection();
String sql = "SELECT o.ID, o.TYPE, o.CREATED_TIMESTAMP, o.RECEIVED_TIMESTAMP, om.STATUS, " +
"o.OPERATION_CODE, o.INITIATED_BY, om.ID AS OM_MAPPING_ID, " +
"om.UPDATED_TIMESTAMP FROM (SELECT ID, TYPE, CREATED_TIMESTAMP, RECEIVED_TIMESTAMP," +
"OPERATION_CODE, INITIATED_BY FROM DM_OPERATION WHERE id = ?) o INNER JOIN (SELECT * FROM " +
"o.OPERATION_CODE, o.INITIATED_BY, o.OPERATION_DETAILS, o.OPERATION_PROPERTIES, " +
"om.ID AS OM_MAPPING_ID, om.UPDATED_TIMESTAMP, om.STATUS FROM " +
"(SELECT ID, TYPE, CREATED_TIMESTAMP, RECEIVED_TIMESTAMP," +
"OPERATION_CODE, INITIATED_BY, OPERATION_DETAILS, OPERATION_PROPERTIES FROM DM_OPERATION " +
"WHERE id = ?) o INNER JOIN (SELECT * FROM " +
"DM_ENROLMENT_OP_MAPPING dm where dm.OPERATION_ID = ? AND dm.ENROLMENT_ID = ?) om " +
"ON o.ID = om.OPERATION_ID ";
stmt = conn.prepareStatement(sql);
@ -1238,20 +1233,14 @@ public class GenericOperationDAOImpl implements OperationDAO {
rs = stmt.executeQuery();
if (rs.next()) {
operation = new Operation();
operation.setId(rs.getInt("ID"));
operation.setType(Operation.Type.valueOf(rs.getString("TYPE")));
operation.setCreatedTimeStamp(new Timestamp(rs.getLong("CREATED_TIMESTAMP") * 1000L).toString());
operation.setStatus(Operation.Status.valueOf(rs.getString("STATUS")));
operation = OperationDAOUtil.getOperation(rs);
if (rs.getLong("UPDATED_TIMESTAMP") == 0) {
operation.setReceivedTimeStamp("");
} else {
operation.setReceivedTimeStamp(
new Timestamp((rs.getLong("UPDATED_TIMESTAMP") * 1000)).toString());
}
operation.setCode(rs.getString("OPERATION_CODE"));
operation.setInitiatedBy(rs.getString("INITIATED_BY"));
OperationDAOUtil.setActivityId(operation, rs.getInt("ID"));
operation.setStatus(Operation.Status.valueOf(rs.getString("STATUS")));
}
} catch (SQLException e) {
throw new OperationManagementDAOException("SQL error occurred while retrieving the operation " +
@ -1272,7 +1261,8 @@ public class GenericOperationDAOImpl implements OperationDAO {
try {
Connection conn = OperationManagementDAOFactory.getConnection();
String sql = "SELECT o.ID, TYPE, o.CREATED_TIMESTAMP, o.RECEIVED_TIMESTAMP, o.OPERATION_CODE, " +
"o.INITIATED_BY, om.ID AS OM_MAPPING_ID, om.UPDATED_TIMESTAMP FROM DM_OPERATION o " +
"o.INITIATED_BY, o.OPERATION_DETAILS, o.OPERATION_PROPERTIES, om.ID AS OM_MAPPING_ID, " +
"om.UPDATED_TIMESTAMP FROM DM_OPERATION o " +
"INNER JOIN (SELECT * FROM DM_ENROLMENT_OP_MAPPING dm " +
"WHERE dm.ENROLMENT_ID = ? AND dm.STATUS = ?) om ON o.ID = om.OPERATION_ID ORDER BY o.CREATED_TIMESTAMP DESC";
stmt = conn.prepareStatement(sql);
@ -1281,20 +1271,14 @@ public class GenericOperationDAOImpl implements OperationDAO {
rs = stmt.executeQuery();
while (rs.next()) {
operation = new Operation();
operation.setId(rs.getInt("ID"));
operation.setType(Operation.Type.valueOf(rs.getString("TYPE")));
operation.setCreatedTimeStamp(new Timestamp(rs.getLong("CREATED_TIMESTAMP") * 1000L).toString());
operation = OperationDAOUtil.getOperation(rs);
operation.setStatus(status);
if (rs.getLong("UPDATED_TIMESTAMP") == 0) {
operation.setReceivedTimeStamp("");
} else {
operation.setReceivedTimeStamp(
new Timestamp((rs.getLong("UPDATED_TIMESTAMP") * 1000)).toString());
}
operation.setCode(rs.getString("OPERATION_CODE"));
operation.setInitiatedBy(rs.getString("INITIATED_BY"));
operation.setStatus(status);
OperationDAOUtil.setActivityId(operation, rs.getInt("ID"));
operations.add(operation);
}
} catch (SQLException e) {
@ -1317,7 +1301,8 @@ public class GenericOperationDAOImpl implements OperationDAO {
try {
Connection conn = OperationManagementDAOFactory.getConnection();
String sql = "SELECT o.ID, TYPE, o.CREATED_TIMESTAMP, o.RECEIVED_TIMESTAMP, o.OPERATION_CODE, " +
"o.INITIATED_BY, om.ID AS OM_MAPPING_ID, om.UPDATED_TIMESTAMP FROM DM_OPERATION o " +
"o.INITIATED_BY, o.OPERATION_DETAILS, o.OPERATION_PROPERTIES, om.ID AS OM_MAPPING_ID, " +
"om.UPDATED_TIMESTAMP FROM DM_OPERATION o " +
"INNER JOIN (SELECT * FROM DM_ENROLMENT_OP_MAPPING dm " +
"WHERE dm.ENROLMENT_ID = ? AND dm.STATUS = ?) om ON o.ID = om.OPERATION_ID ORDER BY " +
"o.CREATED_TIMESTAMP DESC LIMIT ?,?";
@ -1329,20 +1314,15 @@ public class GenericOperationDAOImpl implements OperationDAO {
rs = stmt.executeQuery();
while (rs.next()) {
operation = new Operation();
operation.setId(rs.getInt("ID"));
operation.setType(Operation.Type.valueOf(rs.getString("TYPE")));
operation.setCreatedTimeStamp(new Timestamp(rs.getLong("CREATED_TIMESTAMP") * 1000L).toString());
operation = OperationDAOUtil.getOperation(rs);
operation.setStatus(status);
if (rs.getLong("UPDATED_TIMESTAMP") == 0) {
operation.setReceivedTimeStamp("");
} else {
operation.setReceivedTimeStamp(
new Timestamp((rs.getLong("UPDATED_TIMESTAMP") * 1000)).toString());
}
operation.setCode(rs.getString("OPERATION_CODE"));
operation.setInitiatedBy(rs.getString("INITIATED_BY"));
operation.setStatus(status);
OperationDAOUtil.setActivityId(operation, rs.getInt("OM_MAPPING_ID"));
operations.add(operation);
}
} catch (SQLException e) {
@ -1364,7 +1344,8 @@ public class GenericOperationDAOImpl implements OperationDAO {
try {
Connection conn = OperationManagementDAOFactory.getConnection();
String sql = "SELECT o.ID, o.TYPE, o.CREATED_TIMESTAMP, o.RECEIVED_TIMESTAMP, " +
"o.OPERATION_CODE, o.INITIATED_BY, om.STATUS, om.ID AS OM_MAPPING_ID, om.UPDATED_TIMESTAMP " +
"o.OPERATION_CODE, o.INITIATED_BY, o.OPERATION_DETAILS, o.OPERATION_PROPERTIES, om.STATUS, " +
"om.ID AS OM_MAPPING_ID, om.UPDATED_TIMESTAMP " +
"FROM DM_OPERATION o INNER JOIN (SELECT * FROM DM_ENROLMENT_OP_MAPPING dm " +
"WHERE dm.ENROLMENT_ID = ?) om ON o.ID = om.OPERATION_ID " +
"ORDER BY o.CREATED_TIMESTAMP DESC, o.ID DESC";
@ -1373,18 +1354,13 @@ public class GenericOperationDAOImpl implements OperationDAO {
rs = stmt.executeQuery();
while (rs.next()) {
operation = new Operation();
operation.setId(rs.getInt("ID"));
operation.setType(Operation.Type.valueOf(rs.getString("TYPE")));
operation.setCreatedTimeStamp(new Timestamp(rs.getLong("CREATED_TIMESTAMP") * 1000L).toString());
operation = OperationDAOUtil.getOperation(rs);
if (rs.getLong("UPDATED_TIMESTAMP") == 0) {
operation.setReceivedTimeStamp("");
} else {
operation.setReceivedTimeStamp(
new Timestamp((rs.getLong("UPDATED_TIMESTAMP") * 1000)).toString());
}
operation.setCode(rs.getString("OPERATION_CODE"));
operation.setInitiatedBy(rs.getString("INITIATED_BY"));
operation.setStatus(Operation.Status.valueOf(rs.getString("STATUS")));
operations.add(operation);
}
@ -1428,6 +1404,7 @@ public class GenericOperationDAOImpl implements OperationDAO {
"o.OPERATION_CODE, " +
"o.INITIATED_BY, " +
"o.OPERATION_DETAILS, " +
"o.OPERATION_PROPERTIES, " +
"om.STATUS, " +
"om.ID AS OM_MAPPING_ID, " +
"om.UPDATED_TIMESTAMP " +
@ -1514,36 +1491,14 @@ public class GenericOperationDAOImpl implements OperationDAO {
stmt.setInt(paramIndex, request.getRowCount());
try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
operation = new Operation();
operation.setId(rs.getInt("ID"));
operation.setType(Operation.Type.valueOf(rs.getString("TYPE")));
operation.setCreatedTimeStamp(new Timestamp(rs.getLong("CREATED_TIMESTAMP") * 1000L).toString());
operation = OperationDAOUtil.getOperation(rs);
if (rs.getLong("UPDATED_TIMESTAMP") == 0) {
operation.setReceivedTimeStamp("");
} else {
operation.setReceivedTimeStamp(
new Timestamp((rs.getLong("UPDATED_TIMESTAMP") * 1000)).toString());
}
operation.setCode(rs.getString("OPERATION_CODE"));
operation.setInitiatedBy(rs.getString("INITIATED_BY"));
if (MDMAppConstants.AndroidConstants.UNMANAGED_APP_UNINSTALL.equals(operation.getCode())) {
byte[] operationDetails = rs.getBytes("OPERATION_DETAILS");
try (ByteArrayInputStream bais = new ByteArrayInputStream(operationDetails);
ObjectInputStream ois = new ObjectInputStream(bais)) {
profileOperation = (ProfileOperation) ois.readObject();
operation.setPayLoad(profileOperation.getPayLoad());
} catch (IOException e) {
String msg = "IO Error occurred while retrieving app data of operation ";
log.error(msg, e);
throw new OperationManagementDAOException(msg, e);
} catch (ClassNotFoundException e) {
String msg = "Class not found error occurred while retrieving app data of operation ";
log.error(msg, e);
throw new OperationManagementDAOException(msg, e);
}
}
operation.setStatus(Operation.Status.valueOf(rs.getString("STATUS")));
OperationDAOUtil.setActivityId(operation, rs.getInt("ID"));
operations.add(operation);
}
}
@ -1562,7 +1517,6 @@ public class GenericOperationDAOImpl implements OperationDAO {
List<Operation> operations = new ArrayList<>();
String createdTo = null;
String createdFrom = null;
ProfileOperation profileOperation = null;
DateFormat simple = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
boolean isCreatedDayProvided = false;
boolean isUpdatedDayProvided = false; //updated day = received day
@ -1586,6 +1540,7 @@ public class GenericOperationDAOImpl implements OperationDAO {
"o.OPERATION_CODE, " +
"o.INITIATED_BY, " +
"o.OPERATION_DETAILS, " +
"o.OPERATION_PROPERTIES, " +
"om.STATUS, " +
"om.ID AS OM_MAPPING_ID, " +
"om.UPDATED_TIMESTAMP " +
@ -1672,36 +1627,14 @@ public class GenericOperationDAOImpl implements OperationDAO {
stmt.setInt(paramIndex, request.getRowCount());
try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
operation = new Operation();
operation.setId(rs.getInt("ID"));
operation.setType(Operation.Type.valueOf(rs.getString("TYPE")));
operation.setCreatedTimeStamp(new Timestamp(rs.getLong("CREATED_TIMESTAMP") * 1000L).toString());
operation = OperationDAOUtil.getOperation(rs);
if (rs.getLong("UPDATED_TIMESTAMP") == 0) {
operation.setReceivedTimeStamp("");
} else {
operation.setReceivedTimeStamp(
new Timestamp((rs.getLong("UPDATED_TIMESTAMP") * 1000)).toString());
}
operation.setCode(rs.getString("OPERATION_CODE"));
operation.setInitiatedBy(rs.getString("INITIATED_BY"));
if (MDMAppConstants.AndroidConstants.UNMANAGED_APP_UNINSTALL.equals(operation.getCode())) {
byte[] operationDetails = rs.getBytes("OPERATION_DETAILS");
try (ByteArrayInputStream bais = new ByteArrayInputStream(operationDetails);
ObjectInputStream ois = new ObjectInputStream(bais)) {
profileOperation = (ProfileOperation) ois.readObject();
operation.setPayLoad(profileOperation.getPayLoad());
} catch (IOException e) {
String msg = "IO Error occurred while retrieving app data of operation ";
log.error(msg, e);
throw new OperationManagementDAOException(msg, e);
} catch (ClassNotFoundException e) {
String msg = "Class not found error occurred while retrieving app data of operation ";
log.error(msg, e);
throw new OperationManagementDAOException(msg, e);
}
}
operation.setStatus(Operation.Status.valueOf(rs.getString("STATUS")));
OperationDAOUtil.setActivityId(operation, rs.getInt("ID"));
operations.add(operation);
}
}
@ -1932,7 +1865,8 @@ public class GenericOperationDAOImpl implements OperationDAO {
try {
Connection connection = OperationManagementDAOFactory.getConnection();
stmt = connection.prepareStatement("SELECT o.ID, o.TYPE, o.CREATED_TIMESTAMP, o.RECEIVED_TIMESTAMP, " +
"o.OPERATION_CODE, o.INITIATED_BY, om.ID AS OM_MAPPING_ID, om.UPDATED_TIMESTAMP FROM DM_OPERATION o " +
"o.OPERATION_CODE, o.INITIATED_BY, o.OPERATION_DETAILS, o.OPERATION_PROPERTIES, " +
"om.ID AS OM_MAPPING_ID, om.UPDATED_TIMESTAMP FROM DM_OPERATION o " +
"INNER JOIN (SELECT * FROM DM_ENROLMENT_OP_MAPPING dm " +
"WHERE dm.ENROLMENT_ID = ? AND dm.STATUS = ?) om ON o.ID = om.OPERATION_ID " +
"ORDER BY om.UPDATED_TIMESTAMP ASC, om.ID ASC LIMIT 1");
@ -1942,21 +1876,14 @@ public class GenericOperationDAOImpl implements OperationDAO {
Operation operation = null;
if (rs.next()) {
operation = new Operation();
operation.setType(OperationDAOUtil.getType(rs.getString("TYPE")));
operation.setId(rs.getInt("ID"));
operation.setCreatedTimeStamp(new Timestamp(rs.getLong("CREATED_TIMESTAMP") * 1000L).toString());
operation = OperationDAOUtil.getOperation(rs);
if (rs.getLong("UPDATED_TIMESTAMP") == 0) {
operation.setReceivedTimeStamp("");
} else {
operation.setReceivedTimeStamp(
new Timestamp((rs.getLong("UPDATED_TIMESTAMP") * 1000)).toString());
}
operation.setCode(rs.getString("OPERATION_CODE"));
operation.setInitiatedBy(rs.getString("INITIATED_BY"));
operation.setStatus(Operation.Status.PENDING);
OperationDAOUtil.setActivityId(operation, rs.getInt("ID"));
}
return operation;
} catch (SQLException e) {
@ -1975,8 +1902,9 @@ public class GenericOperationDAOImpl implements OperationDAO {
List<Operation> operations = new ArrayList<>();
try {
Connection conn = OperationManagementDAOFactory.getConnection();
String sql = "SELECT o.ID, TYPE, o.CREATED_TIMESTAMP, o.RECEIVED_TIMESTAMP, OPERATION_CODE, o.INITIATED_BY," +
" om.ID AS OM_MAPPING_ID, om.UPDATED_TIMESTAMP FROM " +
String sql = "SELECT o.ID, TYPE, o.CREATED_TIMESTAMP, o.RECEIVED_TIMESTAMP, OPERATION_CODE, " +
"o.INITIATED_BY, o.OPERATION_DETAILS, o.OPERATION_PROPERTIES, " +
"om.ID AS OM_MAPPING_ID, om.UPDATED_TIMESTAMP FROM " +
"(SELECT o.ID, TYPE, CREATED_TIMESTAMP, RECEIVED_TIMESTAMP, OPERATION_CODE, INITIATED_BY " +
"FROM DM_OPERATION o WHERE o.TYPE = ?) o INNER JOIN (SELECT * FROM DM_ENROLMENT_OP_MAPPING dm " +
"WHERE dm.ENROLMENT_ID = ? AND dm.STATUS = ?) om ON o.ID = om.OPERATION_ID ORDER BY o.CREATED_TIMESTAMP ASC";
@ -1988,19 +1916,14 @@ public class GenericOperationDAOImpl implements OperationDAO {
rs = stmt.executeQuery();
while (rs.next()) {
operation = new Operation();
operation.setId(rs.getInt("ID"));
operation.setType(Operation.Type.valueOf(rs.getString("TYPE")));
operation.setCreatedTimeStamp(new Timestamp(rs.getLong("CREATED_TIMESTAMP") * 1000L).toString());
operation = OperationDAOUtil.getOperation(rs);
operation.setStatus(status);
if (rs.getLong("UPDATED_TIMESTAMP") == 0) {
operation.setReceivedTimeStamp("");
} else {
operation.setReceivedTimeStamp(
new Timestamp((rs.getLong("UPDATED_TIMESTAMP") * 1000)).toString());
}
operation.setCode(rs.getString("OPERATION_CODE"));
operation.setInitiatedBy(rs.getString("INITIATED_BY"));
OperationDAOUtil.setActivityId(operation, rs.getInt("ID"));
operations.add(operation);
}
} catch (SQLException e) {
@ -2013,7 +1936,8 @@ public class GenericOperationDAOImpl implements OperationDAO {
}
@Override
public Map<Integer, List<OperationMapping>> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushNotificationStatus pushNotificationStatus,
public Map<Integer, List<OperationMapping>> getOperationMappingsByStatus(Operation.Status opStatus,
Operation.PushNotificationStatus pushNotificationStatus,
int limit) throws OperationManagementDAOException {
PreparedStatement stmt = null;
ResultSet rs = null;
@ -2059,7 +1983,8 @@ public class GenericOperationDAOImpl implements OperationDAO {
@Override
public Map<Integer, List<OperationMapping>> getAllocatedOperationMappingsByStatus(Operation.Status opStatus,
Operation.PushNotificationStatus pushNotificationStatus, int limit, int activeServerCount, int serverIndex)
Operation.PushNotificationStatus pushNotificationStatus,
int limit, int activeServerCount, int serverIndex)
throws OperationManagementDAOException {
PreparedStatement stmt = null;
ResultSet rs = null;

@ -44,6 +44,7 @@ import java.util.Date;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
public class OperationDAOUtil {
@ -330,4 +331,60 @@ public class OperationDAOUtil {
deviceActivity.setUpdatedTimestamp(new Date(rs.getLong(("UPDATED_TIMESTAMP")) * 1000).toString());
return deviceActivity;
}
public static Operation getOperation(ResultSet rs) throws OperationManagementDAOException, SQLException {
Operation operation = new Operation();
operation.setId(rs.getInt("ID"));
operation.setType(Operation.Type.valueOf(rs.getString("TYPE")));
operation.setCreatedTimeStamp(new Timestamp(rs.getLong("CREATED_TIMESTAMP") * 1000L).toString());
if (rs.getLong("RECEIVED_TIMESTAMP") == 0) {
operation.setReceivedTimeStamp("");
} else {
operation.setReceivedTimeStamp(
new Timestamp((rs.getLong("RECEIVED_TIMESTAMP") * 1000)).toString());
}
operation.setCode(rs.getString("OPERATION_CODE"));
operation.setInitiatedBy(rs.getString("INITIATED_BY"));
byte[] operationDetails = rs.getBytes("OPERATION_DETAILS");
if (!rs.wasNull()) {
try (ByteArrayInputStream bais = new ByteArrayInputStream(operationDetails);
ObjectInputStream ois = new ObjectInputStream(bais)) {
operation.setPayLoad(ois.readObject());
} catch (IOException e) {
String msg = "IO Error occurred while retrieving operation details";
log.error(msg, e);
throw new OperationManagementDAOException(msg, e);
} catch (ClassNotFoundException e) {
String msg = "Class not found error occurred while retrieving operation details";
log.error(msg, e);
throw new OperationManagementDAOException(msg, e);
} catch (Exception e) {
String msg = "Error occurred while retrieving operation details";
log.error(msg, e);
throw new OperationManagementDAOException(msg, e);
}
}
byte[] operationProperties = rs.getBytes("OPERATION_PROPERTIES");
if (!rs.wasNull()) {
try (ByteArrayInputStream bais = new ByteArrayInputStream(operationProperties);
ObjectInputStream ois = new ObjectInputStream(bais)) {
operation.setProperties((Properties) ois.readObject());
} catch (IOException e) {
String msg = "IO Error occurred while retrieving operation properties";
log.error(msg, e);
throw new OperationManagementDAOException(msg, e);
} catch (ClassNotFoundException e) {
String msg = "Class not found error occurred while retrieving operation properties";
log.error(msg, e);
throw new OperationManagementDAOException(msg, e);
} catch (Exception e) {
String msg = "Error occurred while retrieving operation properties";
log.error(msg, e);
throw new OperationManagementDAOException(msg, e);
}
}
OperationDAOUtil.setActivityId(operation, rs.getInt("ID"));
return operation;
}
}

@ -86,13 +86,13 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
if (localHashIndex.equals(String.valueOf(taskContext.getServerHashIndex()))) {
if (log.isDebugEnabled()) {
log.debug("Executing dynamically scheduled task (" + getTaskName() +
") for current server hash index: " + localHashIndex);
") for current server hash index: " + taskContext.getServerHashIndex());
}
executeDynamicTask();
} else {
if (log.isDebugEnabled()) {
log.debug("Ignoring execution of task (" + getTaskName() +
") not belonging to current serer hash index: " + localHashIndex);
") not belonging to current server hash index: " + taskContext.getServerHashIndex());
}
}
} else {

@ -140,6 +140,7 @@ CREATE TABLE IF NOT EXISTS DM_OPERATION (
OPERATION_CODE VARCHAR(1000) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BLOB DEFAULT NULL,
OPERATION_PROPERTIES BLOB DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TENANT_ID INTEGER NOT NULL,
PRIMARY KEY (ID)

@ -70,46 +70,12 @@ CREATE TABLE IF NOT EXISTS DM_OPERATION (
OPERATION_CODE VARCHAR(1000) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BLOB DEFAULT NULL,
OPERATION_PROPERTIES BLOB DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TENANT_ID INTEGER NOT NULL,
PRIMARY KEY (ID)
);
CREATE TABLE IF NOT EXISTS DM_CONFIG_OPERATION (
OPERATION_ID INTEGER NOT NULL,
OPERATION_CONFIG BLOB DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
PRIMARY KEY (OPERATION_ID),
CONSTRAINT fk_dm_operation_config FOREIGN KEY (OPERATION_ID) REFERENCES
DM_OPERATION (ID) ON DELETE NO ACTION ON UPDATE NO ACTION
);
CREATE TABLE IF NOT EXISTS DM_COMMAND_OPERATION (
OPERATION_ID INTEGER NOT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
PRIMARY KEY (OPERATION_ID),
CONSTRAINT fk_dm_operation_command FOREIGN KEY (OPERATION_ID) REFERENCES
DM_OPERATION (ID) ON DELETE NO ACTION ON UPDATE NO ACTION
);
CREATE TABLE IF NOT EXISTS DM_POLICY_OPERATION (
OPERATION_ID INTEGER NOT NULL,
ENABLED INTEGER NOT NULL DEFAULT 0,
OPERATION_DETAILS BLOB DEFAULT NULL,
PRIMARY KEY (OPERATION_ID),
CONSTRAINT fk_dm_operation_policy FOREIGN KEY (OPERATION_ID) REFERENCES
DM_OPERATION (ID) ON DELETE NO ACTION ON UPDATE NO ACTION
);
CREATE TABLE IF NOT EXISTS DM_PROFILE_OPERATION (
OPERATION_ID INTEGER NOT NULL,
ENABLED INTEGER NOT NULL DEFAULT 0,
OPERATION_DETAILS BLOB DEFAULT NULL,
PRIMARY KEY (OPERATION_ID),
CONSTRAINT fk_dm_operation_profile FOREIGN KEY (OPERATION_ID) REFERENCES
DM_OPERATION (ID) ON DELETE NO ACTION ON UPDATE NO ACTION
);
CREATE TABLE IF NOT EXISTS DM_ENROLMENT (
ID INTEGER AUTO_INCREMENT NOT NULL,
DEVICE_ID INTEGER NOT NULL,

@ -89,6 +89,7 @@ CREATE TABLE IF NOT EXISTS DM_OPERATION (
OPERATION_CODE VARCHAR(1000) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BLOB DEFAULT NULL,
OPERATION_PROPERTIES BLOB DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TENANT_ID INTEGER NOT NULL,
PRIMARY KEY (ID)

@ -89,6 +89,7 @@ CREATE TABLE IF NOT EXISTS DM_OPERATION (
OPERATION_CODE VARCHAR(1000) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BLOB DEFAULT NULL,
OPERATION_PROPERTIES BLOB DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TENANT_ID INTEGER NOT NULL,
PRIMARY KEY (ID)

@ -45,8 +45,11 @@ import java.util.Timer;
import java.util.TimerTask;
public class IoTSStartupHandler implements ServerStartupObserver {
private static final Log log = LogFactory.getLog(IoTSStartupHandler.class);
private static int lastHashIndex = -1;
@Override
public void completingServerStartup() {
}
@ -80,6 +83,22 @@ public class IoTSStartupHandler implements ServerStartupObserver {
Map<Integer, List<DynamicTask>> tenantedDynamicTasks = TaskWatcherDataHolder.getInstance()
.getTaskManagementService().getDynamicTasksForAllTenants();
int serverHashIdx;
try {
serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx();
} catch (HeartBeatManagementException e) {
String msg = "Failed to get server hash index.";
log.error(msg, e);
throw new TaskManagementException(msg, e);
}
if (serverHashIdx != lastHashIndex) {
log.info("Server hash index changed. Old: " + lastHashIndex + ", new: " + serverHashIdx);
deleteAllDynamicNTasks(nTaskService, tenantedDynamicTasks, serverHashIdx);
lastHashIndex = serverHashIdx;
}
scheduleMissingTasks(nTaskService, tenantedDynamicTasks);
deleteObsoleteTasks(nTaskService, tenantedDynamicTasks);
@ -96,7 +115,45 @@ public class IoTSStartupHandler implements ServerStartupObserver {
}
private static void scheduleMissingTasks(TaskService nTaskService, Map<Integer,
private void deleteAllDynamicNTasks(TaskService nTaskService, Map<Integer,
List<DynamicTask>> tenantedDynamicTasks, int serverHashIdx) throws TaskException {
List<Tenant> tenants = getAllTenants();
TaskManager taskManager;
for (Tenant tenant : tenants) {
if (tenantedDynamicTasks.get(tenant.getId()) == null) {
if (log.isTraceEnabled()) {
log.trace("Dynamic tasks not running for tenant: [" + tenant.getId() + "] "
+ tenant.getDomain());
}
continue;
}
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenant.getId(), true);
if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
}
taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
List<TaskInfo> tasks = taskManager.getAllTasks();
// Remove all applicable dynamic tasks from the nTask core
for (TaskInfo taskInfo : tasks) {
for (DynamicTask dt : tenantedDynamicTasks.get(tenant.getId())) {
if (tenant.getId() == dt.getTenantId()
&& taskInfo.getName()
.equals(TaskManagementUtil.generateNTaskName(dt.getDynamicTaskId(), serverHashIdx))) {
taskManager.deleteTask(taskInfo.getName());
if (log.isDebugEnabled()) {
log.debug("Task '" + taskInfo.getName() + "' deleted as server hash changed.");
}
}
}
}
PrivilegedCarbonContext.endTenantFlow();
}
}
private void scheduleMissingTasks(TaskService nTaskService, Map<Integer,
List<DynamicTask>> tenantedDynamicTasks)
throws TaskException, TaskManagementException {
@ -187,26 +244,10 @@ public class IoTSStartupHandler implements ServerStartupObserver {
}
}
private static void deleteObsoleteTasks(TaskService nTaskService,
Map<Integer, List<DynamicTask>> tenantedDynamicTasks)
private void deleteObsoleteTasks(TaskService nTaskService,
Map<Integer, List<DynamicTask>> tenantedDynamicTasks)
throws TaskManagementException, TaskException {
List<Tenant> tenants = new ArrayList<>();
try {
RealmService realmService = TaskWatcherDataHolder.getInstance().getRealmService();
Tenant[] tenantArray = realmService.getTenantManager().getAllTenants();
if (tenantArray != null && tenantArray.length != 0) {
tenants.addAll(Arrays.asList(tenantArray));
}
Tenant superTenant = new Tenant();
superTenant.setId(-1234);
tenants.add(superTenant);
} catch (UserStoreException e) {
String msg = "Unable to load tenants";
log.error(msg, e);
return;
}
TaskManager taskManager;
Set<Integer> hashIds;
try {
@ -217,6 +258,8 @@ public class IoTSStartupHandler implements ServerStartupObserver {
throw new TaskManagementException(msg, e);
}
List<Tenant> tenants = getAllTenants();
for (Tenant tenant : tenants) {
if (tenantedDynamicTasks.get(tenant.getId()) == null) {
if (log.isTraceEnabled()) {
@ -258,4 +301,23 @@ public class IoTSStartupHandler implements ServerStartupObserver {
}
}
private List<Tenant> getAllTenants() {
List<Tenant> tenants = new ArrayList<>();
try {
RealmService realmService = TaskWatcherDataHolder.getInstance().getRealmService();
Tenant[] tenantArray = realmService.getTenantManager().getAllTenants();
if (tenantArray != null && tenantArray.length != 0) {
tenants.addAll(Arrays.asList(tenantArray));
}
Tenant superTenant = new Tenant();
superTenant.setId(-1234);
tenants.add(superTenant);
return tenants;
} catch (UserStoreException e) {
String msg = "Unable to load tenants";
log.error(msg, e);
return new ArrayList<>();
}
}
}

@ -7,6 +7,7 @@ CREATE TABLE IF NOT EXISTS DM_OPERATION_ARCH (
OPERATION_CODE VARCHAR(50) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BLOB DEFAULT NULL,
OPERATION_PROPERTIES BLOB DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
ARCHIVED_AT TIMESTAMP DEFAULT NOW()
)ENGINE = InnoDB;

@ -6,6 +6,7 @@ CREATE TABLE IF NOT EXISTS DM_OPERATION_ARCH (
OPERATION_CODE VARCHAR(50) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BYTEA DEFAULT NULL,
OPERATION_PROPERTIES BYTEA DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
ARCHIVED_AT TIMESTAMP(0) DEFAULT NOW()
);

@ -91,6 +91,7 @@ CREATE TABLE IF NOT EXISTS DM_OPERATION (
OPERATION_CODE VARCHAR(1000) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BLOB DEFAULT NULL,
OPERATION_PROPERTIES BLOB DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TENANT_ID INT NOT NULL,
PRIMARY KEY (ID)

@ -125,6 +125,7 @@ CREATE TABLE DM_OPERATION (
OPERATION_CODE VARCHAR(50) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS VARBINARY(MAX) DEFAULT NULL,
OPERATION_PROPERTIES VARBINARY(MAX) DEFAULT NULL,
ENABLED BIT NOT NULL DEFAULT 0,
TENANT_ID INTEGER NOT NULL,
PRIMARY KEY (ID)

@ -106,6 +106,7 @@ CREATE TABLE IF NOT EXISTS DM_OPERATION (
OPERATION_CODE VARCHAR(50) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BLOB DEFAULT NULL,
OPERATION_PROPERTIES BLOB DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TENANT_ID INT NOT NULL,
PRIMARY KEY (ID)

@ -181,6 +181,7 @@ CREATE TABLE DM_OPERATION (
INITIATED_BY VARCHAR2(100) NULL,
ENABLED NUMBER(10) DEFAULT 0 NOT NULL,
OPERATION_DETAILS BLOB DEFAULT NULL,
OPERATION_PROPERTIES BLOB DEFAULT NULL,
TENANT_ID INTEGER NOT NULL,
CONSTRAINT PK_DM_OPERATION PRIMARY KEY (ID)
)

@ -99,6 +99,7 @@ CREATE TABLE IF NOT EXISTS DM_OPERATION (
OPERATION_CODE VARCHAR(50) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BYTEA DEFAULT NULL,
OPERATION_PROPERTIES BYTEA DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TENANT_ID INTEGER NOT NULL,
PRIMARY KEY (ID)

@ -157,6 +157,7 @@ CREATE TABLE NEW_DM_OPERATION (
OPERATION_CODE VARCHAR(50) NOT NULL,
INITIATED_BY VARCHAR(100) NULL,
OPERATION_DETAILS BLOB DEFAULT NULL,
OPERATION_PROPERTIES BLOB DEFAULT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
PRIMARY KEY (ID)
);
@ -218,8 +219,8 @@ SELECT CONCAT(NOW(), ': Inserted ', ROW_COUNT(),' records to DM_ENROLMENT_OP_MAP
Insert data into NEW_DM_OPERATION from DM_OPERATION only with the OPERATION_IDs from DM_ENROLMENT_OP_MAPPING
*/
SELECT CONCAT(NOW(), ': Inserting data into DM_OPERATION FROM OLD_DM_OPERATION.') AS '';
INSERT INTO DM_OPERATION (ID, TYPE, CREATED_TIMESTAMP, RECEIVED_TIMESTAMP, OPERATION_CODE, INITIATED_BY, OPERATION_DETAILS, ENABLED)
SELECT ID, TYPE, CREATED_TIMESTAMP, RECEIVED_TIMESTAMP, OPERATION_CODE, INITIATED_BY, OPERATION_DETAILS, ENABLED
INSERT INTO DM_OPERATION (ID, TYPE, CREATED_TIMESTAMP, RECEIVED_TIMESTAMP, OPERATION_CODE, INITIATED_BY, OPERATION_DETAILS, OPERATION_PROPERTIES, ENABLED)
SELECT ID, TYPE, CREATED_TIMESTAMP, RECEIVED_TIMESTAMP, OPERATION_CODE, INITIATED_BY, OPERATION_DETAILS, OPERATION_PROPERTIES, ENABLED
FROM OLD_DM_OPERATION
WHERE ID IN(SELECT OPERATION_ID FROM DM_ENROLMENT_OP_MAPPING);
SELECT CONCAT(NOW(), ': Inserted ', ROW_COUNT(),' records to DM_OPERATION.') AS '';

Loading…
Cancel
Save