From fa9a52048e89b6bd5df9cf99659c83c8fd71a43b Mon Sep 17 00:00:00 2001 From: abhishekdesilva Date: Wed, 8 Aug 2018 11:10:56 +0530 Subject: [PATCH] Add archive pending operations configuration --- .../core/archival/ArchivalServiceImpl.java | 46 +++++++++++++------ .../archival/dao/impl/ArchivalDAOImpl.java | 26 ++++++++--- .../archival/ArchivalTaskConfiguration.java | 14 ++++-- 3 files changed, 64 insertions(+), 22 deletions(-) diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/ArchivalServiceImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/ArchivalServiceImpl.java index 5fa0518da8..a4b8405d09 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/ArchivalServiceImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/ArchivalServiceImpl.java @@ -38,10 +38,14 @@ public class ArchivalServiceImpl implements ArchivalService { private ArchivalDAO archivalDAO; private DataDeletionDAO dataDeletionDAO; - private static int ITERATION_COUNT = + private static final int EXECUTION_BATCH_SIZE = DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getArchivalConfiguration() .getArchivalTaskConfiguration().getBatchSize(); + private static final boolean ARCHIVE_PENDING_OPERATIONS = + DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getArchivalConfiguration() + .getArchivalTaskConfiguration().isArchivePendingOperations(); + private String[] NOT_IN_PROGRESS_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED"}; private String[] NOT_PENDING_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED", "IN_PROGRESS"}; private String[] NOT_PENDING_IN_PROGRESS_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED"}; @@ -54,7 +58,7 @@ public class ArchivalServiceImpl implements ArchivalService { @Override public void archiveTransactionalRecords() throws ArchivalException { List allOperations; - List pendingAndIPOperations; + try { ArchivalSourceDAOFactory.openConnection(); ArchivalDestinationDAOFactory.openConnection(); @@ -67,7 +71,7 @@ public class ArchivalServiceImpl implements ArchivalService { if (log.isDebugEnabled()) { log.debug("Fetching All Pending Operations"); } - pendingAndIPOperations = archivalDAO.getPendingAndInProgressOperations(); + } catch (ArchivalDAOException e) { // rollbackTransactions(); @@ -83,19 +87,35 @@ public class ArchivalServiceImpl implements ArchivalService { ArchivalDestinationDAOFactory.closeConnection(); } - log.info(allOperations.size() + " All Operations. " + pendingAndIPOperations.size() + - " P&IP Operations"); - //Get the diff of operations - Set setA = new HashSet<>(allOperations); - Set setB = new HashSet<>(pendingAndIPOperations); - setA.removeAll(setB); + List candidates = allOperations; + log.info(allOperations.size() + " All Operations."); - List candidates = new ArrayList<>(); - candidates.addAll(setA); + if (!ARCHIVE_PENDING_OPERATIONS) { + try { + ArchivalSourceDAOFactory.openConnection(); + ArchivalDestinationDAOFactory.openConnection(); + List pendingAndIPOperations = archivalDAO.getPendingAndInProgressOperations(); + log.info(pendingAndIPOperations.size() +" P&IP Operations"); +// Get the diff of operations + candidates.removeAll(pendingAndIPOperations); + } catch (ArchivalDAOException e) { + String msg = "Error occurred while retrieving the pending operations"; + log.error(msg, e); + throw new ArchivalException(msg, e); + } catch (SQLException e) { + String msg = "An error occurred while connecting to the archival database"; + log.error(msg, e); + throw new ArchivalException(msg, e); + } finally { + ArchivalSourceDAOFactory.closeConnection(); + ArchivalDestinationDAOFactory.closeConnection(); + } + } int total = candidates.size(); int batches = calculateNumberOfBatches(total); - int batchSize = ITERATION_COUNT; + log.info(total + " Operations ready for archiving. " + batches + " iterations to be done."); + int batchSize = EXECUTION_BATCH_SIZE; if (log.isDebugEnabled()) { log.debug(total + " Operations ready for archiving. " + batches + " iterations to be done."); log.debug(batchSize + " is the batch size"); @@ -278,7 +298,7 @@ public class ArchivalServiceImpl implements ArchivalService { private int calculateNumberOfBatches(int total) { int batches = 0; - int batchSize = ITERATION_COUNT; + int batchSize = EXECUTION_BATCH_SIZE; if ((total % batchSize) > 0) { batches = (total / batchSize) + 1; } else { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/ArchivalDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/ArchivalDAOImpl.java index 1a076ec880..8763f866e0 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/ArchivalDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/ArchivalDAOImpl.java @@ -55,15 +55,15 @@ public class ArchivalDAOImpl implements ArchivalDAO { ResultSet rs = null; try { Connection conn = ArchivalSourceDAOFactory.getConnection(); - String sql = "SELECT DISTINCT OPERATION_ID FROM DM_ENROLMENT_OP_MAPPING " + - "WHERE CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod + " DAY)"; + String sql = "SELECT ID FROM DM_OPERATION WHERE CREATED_TIMESTAMP < (DATE_SUB(NOW(), INTERVAL " + + this.retentionPeriod + " DAY))"; stmt = this.createMemoryEfficientStatement(conn); rs = stmt.executeQuery(sql); if (log.isDebugEnabled()) { log.debug("Selected Operation Ids from Enrolment OP Mapping"); } while (rs.next()) { - operationIds.add(rs.getInt("OPERATION_ID")); + operationIds.add(rs.getInt("ID")); } } catch (SQLException e) { String msg = "An error occurred while getting a list operation Ids to archive"; @@ -86,9 +86,23 @@ public class ArchivalDAOImpl implements ArchivalDAO { ResultSet rs = null; try { Connection conn = ArchivalSourceDAOFactory.getConnection(); - String sql = "SELECT DISTINCT OPERATION_ID " + - " FROM DM_ENROLMENT_OP_MAPPING WHERE STATUS='PENDING' OR STATUS='IN_PROGRESS' " + - " AND CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod + " DAY)"; + String sql = "(SELECT DISTINCT\n" + + " OPERATION_ID\n" + + " FROM\n" + + " DM_ENROLMENT_OP_MAPPING\n" + + " WHERE\n" + + " STATUS = 'PENDING'\n" + + " AND CREATED_TIMESTAMP < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL " + + this.retentionPeriod + " DAY))) \n" + + " UNION ALL \n" + + "\t(SELECT DISTINCT\n" + + " OPERATION_ID\n" + + " FROM\n" + + " DM_ENROLMENT_OP_MAPPING\n" + + " WHERE\n" + + " STATUS = 'IN_PROGRESS'\n" + + " AND CREATED_TIMESTAMP < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL " + + "" + this.retentionPeriod + " DAY)))"; stmt = this.createMemoryEfficientStatement(conn); rs = stmt.executeQuery(sql); if (log.isDebugEnabled()) { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/config/archival/ArchivalTaskConfiguration.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/config/archival/ArchivalTaskConfiguration.java index 609172beb5..382dc1d486 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/config/archival/ArchivalTaskConfiguration.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/config/archival/ArchivalTaskConfiguration.java @@ -29,7 +29,7 @@ public class ArchivalTaskConfiguration { private int retentionPeriod; private int batchSize; private PurgingTaskConfiguration purgingTaskConfiguration; - private final int MULTIPLIER = -1; + private boolean archivePendingOperations; @XmlElement(name = "Enabled", required = true) public boolean isEnabled() { @@ -60,8 +60,7 @@ public class ArchivalTaskConfiguration { @XmlElement(name = "RetentionPeriod", required = true) public int getRetentionPeriod() { - // multiply by -1 to get the diff - return retentionPeriod * MULTIPLIER; + return retentionPeriod; } public void setRetentionPeriod(int retentionPeriod) { @@ -85,4 +84,13 @@ public class ArchivalTaskConfiguration { public void setBatchSize(int batchSize) { this.batchSize = batchSize; } + + @XmlElement(name ="ArchivePendingOperations") + public boolean isArchivePendingOperations() { + return archivePendingOperations; + } + + public void setArchivePendingOperations(boolean archivePendingOperations) { + this.archivePendingOperations = archivePendingOperations; + } }