From 4c067f0f28bbc4dce71867c5425e5bc15bb29f5c Mon Sep 17 00:00:00 2001 From: abhishekdesilva Date: Wed, 25 Jul 2018 21:15:49 +0530 Subject: [PATCH] Archival/Purging Task Improvement --- .../core/archival/ArchivalServiceImpl.java | 210 +++++-- .../beans/ArchiveCommandOperation.java | 44 ++ .../beans/ArchiveEnrolmentOperationMap.java | 80 +++ .../archival/beans/ArchiveNotification.java | 80 +++ .../core/archival/beans/ArchiveOperation.java | 72 +++ .../beans/ArchiveOperationResponse.java | 83 +++ .../beans/ArchiveProfileOperation.java | 53 ++ .../mgt/core/archival/dao/ArchivalDAO.java | 27 +- .../core/archival/dao/ArchivalDAOUtil.java | 10 + .../archival/dao/impl/ArchivalDAOImpl.java | 582 +++++++++++++----- .../dao/impl/DataDeletionDAOImpl.java | 21 +- .../task/impl/ArchivedDataDeletionTask.java | 2 +- 12 files changed, 1043 insertions(+), 221 deletions(-) create mode 100644 components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveCommandOperation.java create mode 100644 components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveEnrolmentOperationMap.java create mode 100644 components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveNotification.java create mode 100644 components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveOperation.java create mode 100644 components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveOperationResponse.java create mode 100644 components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveProfileOperation.java diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/ArchivalServiceImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/ArchivalServiceImpl.java index b8ebd30a0e..5fa0518da8 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/ArchivalServiceImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/ArchivalServiceImpl.java @@ -22,7 +22,9 @@ package org.wso2.carbon.device.mgt.core.archival; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.device.mgt.common.TransactionManagementException; +import org.wso2.carbon.device.mgt.core.archival.beans.*; import org.wso2.carbon.device.mgt.core.archival.dao.*; +import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager; import java.sql.SQLException; import java.util.ArrayList; @@ -36,7 +38,9 @@ public class ArchivalServiceImpl implements ArchivalService { private ArchivalDAO archivalDAO; private DataDeletionDAO dataDeletionDAO; - private static int ITERATION_COUNT = 10000; + private static int ITERATION_COUNT = + DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getArchivalConfiguration() + .getArchivalTaskConfiguration().getBatchSize(); private String[] NOT_IN_PROGRESS_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED"}; private String[] NOT_PENDING_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED", "IN_PROGRESS"}; @@ -49,95 +53,172 @@ public class ArchivalServiceImpl implements ArchivalService { @Override public void archiveTransactionalRecords() throws ArchivalException { + List allOperations; + List pendingAndIPOperations; try { ArchivalSourceDAOFactory.openConnection(); ArchivalDestinationDAOFactory.openConnection(); - List allOperations = archivalDAO.getAllOperations(); - List pendingAndIPOperations = archivalDAO.getPendingAndInProgressOperations(); + if (log.isDebugEnabled()) { + log.debug("Fetching All Operations"); + } + allOperations = archivalDAO.getAllOperations(); + + if (log.isDebugEnabled()) { + log.debug("Fetching All Pending Operations"); + } + pendingAndIPOperations = archivalDAO.getPendingAndInProgressOperations(); + + } catch (ArchivalDAOException e) { +// rollbackTransactions(); + String msg = "Rollback the get all operations and get all pending operations"; + log.error(msg, e); + throw new ArchivalException(msg, e); + } catch (SQLException e) { + String msg = "An error occurred while connecting to the archival database"; + log.error(msg, e); + throw new ArchivalException(msg, e); + } finally { + ArchivalSourceDAOFactory.closeConnection(); + ArchivalDestinationDAOFactory.closeConnection(); + } - log.info(allOperations.size() + " All Operations. " + pendingAndIPOperations.size() + - " P&IP Operations"); - //Get the diff of operations - Set setA = new HashSet<>(allOperations); - Set setB = new HashSet<>(pendingAndIPOperations); - setA.removeAll(setB); + log.info(allOperations.size() + " All Operations. " + pendingAndIPOperations.size() + + " P&IP Operations"); + //Get the diff of operations + Set setA = new HashSet<>(allOperations); + Set setB = new HashSet<>(pendingAndIPOperations); + setA.removeAll(setB); - List candidates = new ArrayList<>(); - candidates.addAll(setA); + List candidates = new ArrayList<>(); + candidates.addAll(setA); - int total = candidates.size(); - int batches = calculateNumberOfBatches(total); - int batchSize = ITERATION_COUNT; + int total = candidates.size(); + int batches = calculateNumberOfBatches(total); + int batchSize = ITERATION_COUNT; + if (log.isDebugEnabled()) { + log.debug(total + " Operations ready for archiving. " + batches + " iterations to be done."); + log.debug(batchSize + " is the batch size"); + } + + for (int i = 1; i <= batches; i++) { + int startIdx = batchSize * (i - 1); + int endIdx = batchSize * i; + if (i == batches) { + endIdx = startIdx + (total % batchSize); + } if (log.isDebugEnabled()) { - log.debug(total + " Operations ready for archiving. " + batches + " iterations to be done."); + log.debug("\n\n############ Iterating over batch " + i + "[" + + startIdx + "," + endIdx + "] #######"); } + List subList = candidates.subList(startIdx, endIdx); - beginTransactions(); - for (int i = 1; i <= batches; i++) { - int startIdx = batchSize * (i - 1); - int endIdx = batchSize * i; - if (i == batches) { - endIdx = startIdx + (total % batchSize); + if (log.isDebugEnabled()) { + log.debug("SubList size is: " + subList.size()); + if (subList.size() > 0) { + log.debug("First Element is: " + subList.get(0)); + log.debug("Last Element is: " + subList.get(subList.size() - 1)); } - if(log.isDebugEnabled()) { - log.debug("\n\n############ Iterating over batch " + i + "[" + - startIdx + "," + endIdx + "] #######"); + } + + if (log.isDebugEnabled()) { + for (Integer val : subList) { + if (log.isDebugEnabled()) { + log.debug("Sub List Element: " + val); + } } - List subList = candidates.subList(startIdx, endIdx); + } + + try { + beginTransactions(); prepareTempTable(subList); + commitTransactions(); + } catch (Exception e) { + rollbackTransactions(); + String msg = "Error occurred while preparing the operations."; + log.error(msg, e); + throw new ArchivalException(msg, e); + } finally { + ArchivalSourceDAOFactory.closeConnection(); + ArchivalDestinationDAOFactory.closeConnection(); + } + + List operationResponses = null; + List notification = null; + List commandOperations = null; + List profileOperations = null; + List enrollmentMapping = null; + List operations = null; + + try { + openConnection(); + operationResponses = archivalDAO.selectOperationResponses(); + notification = archivalDAO.selectNotifications(); + commandOperations = archivalDAO.selectCommandOperations(); + profileOperations = archivalDAO.selectProfileOperations(); + enrollmentMapping = archivalDAO.selectEnrolmentMappings(); + operations = archivalDAO.selectOperations(); + + } catch (Exception e) { + String msg = "Error occurred while retrieving data."; + log.error(msg, e); + throw new ArchivalException(msg, e); + } finally { + closeConnection(); + } + + try { + beginTransactions(); //Purge the largest table, DM_DEVICE_OPERATION_RESPONSE if (log.isDebugEnabled()) { - log.debug("## Purging operation responses"); + log.debug("## Archiving operation responses"); } - archivalDAO.moveOperationResponses(); + archivalDAO.moveOperationResponses(operationResponses); //Purge the notifications table, DM_NOTIFICATION if (log.isDebugEnabled()) { - log.debug("## Purging notifications"); + log.debug("## Archiving notifications"); } - archivalDAO.moveNotifications(); + archivalDAO.moveNotifications(notification); //Purge the command operations table, DM_COMMAND_OPERATION if (log.isDebugEnabled()) { - log.debug("## Purging command operations"); + log.debug("## Archiving command operations"); } - archivalDAO.moveCommandOperations(); + archivalDAO.moveCommandOperations(commandOperations); //Purge the profile operation table, DM_PROFILE_OPERATION if (log.isDebugEnabled()) { - log.debug("## Purging profile operations"); + log.debug("## Archiving profile operations"); } - archivalDAO.moveProfileOperations(); - - //Purge the config operation table, DM_CONFIG_OPERATION - if (log.isDebugEnabled()) { - log.debug("## Purging config operations"); - } - archivalDAO.moveConfigOperations(); + archivalDAO.moveProfileOperations(profileOperations); //Purge the enrolment mappings table, DM_ENROLMENT_OP_MAPPING if (log.isDebugEnabled()) { - log.debug("## Purging enrolment mappings"); + log.debug("## Archiving enrolment mappings"); } - archivalDAO.moveEnrolmentMappings(); + archivalDAO.moveEnrolmentMappings(enrollmentMapping); //Finally, purge the operations table, DM_OPERATION if (log.isDebugEnabled()) { - log.debug("## Purging operations"); + log.debug("## Archiving operations"); } - archivalDAO.moveOperations(); + archivalDAO.moveOperations(operations); + commitTransactions(); + if (log.isDebugEnabled()) { + log.debug("End of Iteration : " + i); + } + } catch (ArchivalDAOException e) { + rollbackTransactions(); + String msg = "Error occurred while trying to archive data to the six tables"; + log.error(msg, e); + throw new ArchivalException(msg, e); + } finally { + ArchivalSourceDAOFactory.closeConnection(); + ArchivalDestinationDAOFactory.closeConnection(); } - commitTransactions(); - } catch (ArchivalDAOException e) { - rollbackTransactions(); - throw new ArchivalException("An error occurred while data archival", e); - } catch (SQLException e) { - throw new ArchivalException("An error occurred while connecting to the archival database.", e); - } finally { - ArchivalSourceDAOFactory.closeConnection(); - ArchivalDestinationDAOFactory.closeConnection(); + } } @@ -147,6 +228,9 @@ public class ArchivalServiceImpl implements ArchivalService { log.debug("## Truncating the temporary table"); } archivalDAO.truncateOperationIDsForArchival(); + if (log.isDebugEnabled()) { + log.debug("## Inserting into the temporary table"); + } archivalDAO.copyOperationIDsForArchival(subList); } @@ -160,6 +244,28 @@ public class ArchivalServiceImpl implements ArchivalService { } } + private void openConnection() throws ArchivalException { + try { + ArchivalSourceDAOFactory.openConnection(); + } catch (SQLException e) { + String msg = "An error occurred during opening connection"; + log.error(msg, e); + throw new ArchivalException(msg, e); + } + + } + + private void closeConnection() throws ArchivalException { + try { + ArchivalSourceDAOFactory.closeConnection(); + } catch (Exception e) { + String msg = "An error occurred during opening connection"; + log.error(msg, e); + throw new ArchivalException(msg, e); + } + + } + private void commitTransactions() { ArchivalSourceDAOFactory.commitTransaction(); ArchivalDestinationDAOFactory.commitTransaction(); diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveCommandOperation.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveCommandOperation.java new file mode 100644 index 0000000000..f044a21fa4 --- /dev/null +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveCommandOperation.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.wso2.carbon.device.mgt.core.archival.beans; + +public class ArchiveCommandOperation { + + + private int operationId; + private int enabled; + + public int getOperationId() { + return operationId; + } + + public void setOperationId(int operationId) { + this.operationId = operationId; + } + + public int getEnabled() { + return enabled; + } + + public void setEnabled(int enabled) { + this.enabled = enabled; + } +} + diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveEnrolmentOperationMap.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveEnrolmentOperationMap.java new file mode 100644 index 0000000000..c85b893f78 --- /dev/null +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveEnrolmentOperationMap.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.wso2.carbon.device.mgt.core.archival.beans; + +public class ArchiveEnrolmentOperationMap { + + + private int id; + private int enrolmentId; + private int operationId; + private String status; + private int createdTimestamp; + private int updatedTimestamp; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public int getEnrolmentId() { + return enrolmentId; + } + + public void setEnrolmentId(int enrolmentId) { + this.enrolmentId = enrolmentId; + } + + public int getOperationId() { + return operationId; + } + + public void setOperationId(int operationId) { + this.operationId = operationId; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public int getCreatedTimestamp() { + return createdTimestamp; + } + + public void setCreatedTimestamp(int createdTimestamp) { + this.createdTimestamp = createdTimestamp; + } + + public int getUpdatedTimestamp() { + return updatedTimestamp; + } + + public void setUpdatedTimestamp(int updatedTimestamp) { + this.updatedTimestamp = updatedTimestamp; + } +} + diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveNotification.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveNotification.java new file mode 100644 index 0000000000..7c0ea9b044 --- /dev/null +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveNotification.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.wso2.carbon.device.mgt.core.archival.beans; + +public class ArchiveNotification { + + + private int notificationId; + private int deviceId; + private int operationId; + private int tenantId; + private String status; + private String description; + + public int getNotificationId() { + return notificationId; + } + + public void setNotificationId(int notificationId) { + this.notificationId = notificationId; + } + + public int getDeviceId() { + return deviceId; + } + + public void setDeviceId(int deviceId) { + this.deviceId = deviceId; + } + + public int getOperationId() { + return operationId; + } + + public void setOperationId(int operationId) { + this.operationId = operationId; + } + + public int getTenantId() { + return tenantId; + } + + public void setTenantId(int tenantId) { + this.tenantId = tenantId; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } +} + diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveOperation.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveOperation.java new file mode 100644 index 0000000000..d62fcddceb --- /dev/null +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveOperation.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.wso2.carbon.device.mgt.core.archival.beans; + +import java.sql.Timestamp; + +public class ArchiveOperation { + + private int id; + private String type; + private Timestamp createdTimeStamp; + private Timestamp recievedTimeStamp; + private String operationCode; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public Timestamp getCreatedTimeStamp() { + return createdTimeStamp; + } + + public void setCreatedTimeStamp(Timestamp createdTimeStamp) { + this.createdTimeStamp = createdTimeStamp; + } + + public Timestamp getRecievedTimeStamp() { + return recievedTimeStamp; + } + + public void setRecievedTimeStamp(Timestamp recievedTimeStamp) { + this.recievedTimeStamp = recievedTimeStamp; + } + + public String getOperationCode() { + return operationCode; + } + + public void setOperationCode(String operationCode) { + this.operationCode = operationCode; + } +} + diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveOperationResponse.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveOperationResponse.java new file mode 100644 index 0000000000..ce4712382c --- /dev/null +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveOperationResponse.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.wso2.carbon.device.mgt.core.archival.beans; + +import java.sql.Timestamp; + +public class ArchiveOperationResponse { + + + private int id; + private int enrolmentId; + private int operationId; + private int enOpMapId; + private Object operationResponse; + private Timestamp receivedTimeStamp; + + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public int getEnrolmentId() { + return enrolmentId; + } + + public void setEnrolmentId(int enrolmentId) { + this.enrolmentId = enrolmentId; + } + + public int getOperationId() { + return operationId; + } + + public void setOperationId(int operationId) { + this.operationId = operationId; + } + + public int getEnOpMapId() { + return enOpMapId; + } + + public void setEnOpMapId(int enOpMapId) { + this.enOpMapId = enOpMapId; + } + + public Object getOperationResponse() { + return operationResponse; + } + + public void setOperationResponse(Object operationResponse) { + this.operationResponse = operationResponse; + } + + public Timestamp getReceivedTimeStamp() { + return receivedTimeStamp; + } + + public void setReceivedTimeStamp(Timestamp receivedTimeStamp) { + this.receivedTimeStamp = receivedTimeStamp; + } +} + diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveProfileOperation.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveProfileOperation.java new file mode 100644 index 0000000000..add85dc262 --- /dev/null +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/beans/ArchiveProfileOperation.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.wso2.carbon.device.mgt.core.archival.beans; + +public class ArchiveProfileOperation { + + + private int operationId; + private int enabled; + private Object operationDetails; + + public int getOperationId() { + return operationId; + } + + public void setOperationId(int operationId) { + this.operationId = operationId; + } + + public int getEnabled() { + return enabled; + } + + public void setEnabled(int enabled) { + this.enabled = enabled; + } + + public Object getOperationDetails() { + return operationDetails; + } + + public void setOperationDetails(Object operationDetails) { + this.operationDetails = operationDetails; + } +} + diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/ArchivalDAO.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/ArchivalDAO.java index 2c7063741d..a360a8cfe5 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/ArchivalDAO.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/ArchivalDAO.java @@ -18,6 +18,9 @@ package org.wso2.carbon.device.mgt.core.archival.dao; +import org.wso2.carbon.device.mgt.core.archival.beans.*; + +import java.sql.ResultSet; import java.util.List; /** @@ -33,19 +36,29 @@ public interface ArchivalDAO { void copyOperationIDsForArchival(List operationIds) throws ArchivalDAOException; - void moveOperationResponses() throws ArchivalDAOException; + List selectOperationResponses() throws ArchivalDAOException; + + void moveOperationResponses(List rs) throws ArchivalDAOException; + + List selectNotifications() throws ArchivalDAOException; + + void moveNotifications(List rs) throws ArchivalDAOException; + + List selectCommandOperations() throws ArchivalDAOException; + + void moveCommandOperations(List rs) throws ArchivalDAOException; - void moveNotifications() throws ArchivalDAOException; + List selectProfileOperations() throws ArchivalDAOException; - void moveCommandOperations() throws ArchivalDAOException; + void moveProfileOperations(List rs) throws ArchivalDAOException; - void moveProfileOperations() throws ArchivalDAOException; + List selectEnrolmentMappings() throws ArchivalDAOException; - void moveConfigOperations() throws ArchivalDAOException; + void moveEnrolmentMappings(List rs) throws ArchivalDAOException; - void moveEnrolmentMappings() throws ArchivalDAOException; + List selectOperations() throws ArchivalDAOException; - void moveOperations() throws ArchivalDAOException; + void moveOperations(List rs) throws ArchivalDAOException; void truncateOperationIDsForArchival() throws ArchivalDAOException; diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/ArchivalDAOUtil.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/ArchivalDAOUtil.java index 084b815c3d..a630ab8b54 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/ArchivalDAOUtil.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/ArchivalDAOUtil.java @@ -57,4 +57,14 @@ public class ArchivalDAOUtil { } } + + public static void cleanupResultSet(ResultSet rs) { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + log.warn("Error occurred while closing the result set", e); + } + } + } } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/ArchivalDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/ArchivalDAOImpl.java index 73f44bacd6..1a076ec880 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/ArchivalDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/ArchivalDAOImpl.java @@ -20,11 +20,11 @@ package org.wso2.carbon.device.mgt.core.archival.dao.impl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.device.mgt.core.archival.beans.*; import org.wso2.carbon.device.mgt.core.archival.dao.*; import java.sql.*; -import java.util.ArrayList; -import java.util.List; +import java.util.*; public class ArchivalDAOImpl implements ArchivalDAO { @@ -56,20 +56,25 @@ public class ArchivalDAOImpl implements ArchivalDAO { try { Connection conn = ArchivalSourceDAOFactory.getConnection(); String sql = "SELECT DISTINCT OPERATION_ID FROM DM_ENROLMENT_OP_MAPPING " + - "WHERE CREATED_TIMESTAMP BETWEEN DATE(TIMESTAMPADD(DAY, " + - this.retentionPeriod + ", NOW())) AND NOW()"; + "WHERE CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod + " DAY)"; stmt = this.createMemoryEfficientStatement(conn); rs = stmt.executeQuery(sql); + if (log.isDebugEnabled()) { + log.debug("Selected Operation Ids from Enrolment OP Mapping"); + } while (rs.next()) { operationIds.add(rs.getInt("OPERATION_ID")); } } catch (SQLException e) { - throw new ArchivalDAOException("An error occurred while getting a list operation Ids to archive", e); + String msg = "An error occurred while getting a list operation Ids to archive"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); } finally { ArchivalDAOUtil.cleanupResources(stmt, rs); } if (log.isDebugEnabled()) { log.debug(operationIds.size() + " operations found for the archival"); + log.debug(operationIds.size() + "[" + operationIds.get(0) + "," + operationIds.get(batchSize - 1) + "]"); } return operationIds; } @@ -82,21 +87,26 @@ public class ArchivalDAOImpl implements ArchivalDAO { try { Connection conn = ArchivalSourceDAOFactory.getConnection(); String sql = "SELECT DISTINCT OPERATION_ID " + - " FROM DM_ENROLMENT_OP_MAPPING WHERE STATUS IN('PENDING', 'IN_PROGRESS') " + - " AND CREATED_TIMESTAMP BETWEEN DATE(TIMESTAMPADD(DAY, " + this.retentionPeriod +", NOW())) " + - "AND NOW()"; + " FROM DM_ENROLMENT_OP_MAPPING WHERE STATUS='PENDING' OR STATUS='IN_PROGRESS' " + + " AND CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod + " DAY)"; stmt = this.createMemoryEfficientStatement(conn); rs = stmt.executeQuery(sql); + if (log.isDebugEnabled()) { + log.debug("Selected Pending or In Progress Operation IDs"); + } while (rs.next()) { operationIds.add(rs.getInt("OPERATION_ID")); } } catch (SQLException e) { - throw new ArchivalDAOException("An error occurred while getting a list operation Ids to archive", e); + String msg = "An error occurred while getting a list pending or in progress operation Ids to archive"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); } finally { ArchivalDAOUtil.cleanupResources(stmt, rs); } if (log.isDebugEnabled()) { - log.debug(operationIds.size() + " PENDING and IN_PROFRESS operations found for the archival"); + log.debug(operationIds.size() + " operations found for the archival"); + log.debug(operationIds.size() + "[" + operationIds.get(0) + "," + operationIds.get(batchSize - 1) + "]"); } return operationIds; } @@ -123,44 +133,88 @@ public class ArchivalDAOImpl implements ArchivalDAO { log.debug(count + " Records copied to the temporary table."); } } catch (SQLException e) { - throw new ArchivalDAOException("Error while copying operation Ids for archival", e); + String msg = "Error while copying operation Ids for archival"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); } finally { ArchivalDAOUtil.cleanupResources(stmt); } } @Override - public void moveOperationResponses() throws ArchivalDAOException { + public List selectOperationResponses() throws ArchivalDAOException { Statement stmt = null; - PreparedStatement stmt2 = null; - Statement stmt3 = null; ResultSet rs = null; + + List operationResponses = new ArrayList<>(); try { Connection conn = ArchivalSourceDAOFactory.getConnection(); - String sql = "SELECT * FROM DM_DEVICE_OPERATION_RESPONSE WHERE OPERATION_ID IN " + - "(SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + String sql = "SELECT \n" + + " o.ID,\n" + + " o.ENROLMENT_ID,\n" + + " o.OPERATION_ID,\n" + + " o.EN_OP_MAP_ID,\n" + + " o.OPERATION_RESPONSE,\n" + + " o.RECEIVED_TIMESTAMP\n" + + "FROM\n" + + " DM_DEVICE_OPERATION_RESPONSE o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;"; stmt = this.createMemoryEfficientStatement(conn); rs = stmt.executeQuery(sql); + while (rs.next()) { + ArchiveOperationResponse rep = new ArchiveOperationResponse(); + rep.setId(rs.getInt("ID")); + rep.setEnrolmentId(rs.getInt("ENROLMENT_ID")); + rep.setOperationId(rs.getInt("OPERATION_ID")); + rep.setOperationResponse(rs.getBytes("OPERATION_RESPONSE")); + rep.setReceivedTimeStamp(rs.getTimestamp("RECEIVED_TIMESTAMP")); + operationResponses.add(rep); + } + + if (log.isDebugEnabled()) { + log.debug("Selecting done for the Operation Response"); + } + + } catch (SQLException e) { + String msg = "Error occurred while archiving the operation responses"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); + } finally { + ArchivalDAOUtil.cleanupResources(stmt, rs); + } + + return operationResponses; + + } + + @Override + public void moveOperationResponses(List archiveOperationResponse) throws ArchivalDAOException { + PreparedStatement stmt2 = null; + Statement stmt3 = null; + try { + Connection conn = ArchivalSourceDAOFactory.getConnection(); + + Connection conn2 = ArchivalDestinationDAOFactory.getConnection(); - sql = "INSERT INTO DM_DEVICE_OPERATION_RESPONSE_ARCH VALUES(?, ?, ?, ?, ?,?,?)"; + String sql = "INSERT INTO DM_DEVICE_OPERATION_RESPONSE_ARCH VALUES(?, ?, ?, ?, ?,?)"; stmt2 = conn2.prepareStatement(sql); int count = 0; - while (rs.next()) { - stmt2.setInt(1, rs.getInt("ID")); - stmt2.setInt(2, rs.getInt("ENROLMENT_ID")); - stmt2.setInt(3, rs.getInt("OPERATION_ID")); - stmt2.setInt(4, rs.getInt("EN_OP_MAP_ID")); - stmt2.setBytes(5, rs.getBytes("OPERATION_RESPONSE")); - stmt2.setTimestamp(6, rs.getTimestamp("RECEIVED_TIMESTAMP")); - stmt2.setTimestamp(7, this.currentTimestamp); + for (ArchiveOperationResponse rs : archiveOperationResponse) { + stmt2.setInt(1, rs.getId()); + stmt2.setInt(2, rs.getEnrolmentId()); + stmt2.setInt(3, rs.getOperationId()); + stmt2.setBytes(4, (byte[]) rs.getOperationResponse()); + stmt2.setTimestamp(5, rs.getReceivedTimeStamp()); + stmt2.setTimestamp(6, this.currentTimestamp); stmt2.addBatch(); if (++count % batchSize == 0) { stmt2.executeBatch(); if (log.isDebugEnabled()) { - log.debug("Executing batch " + count); + log.debug("Executing Operation Responses batch " + count); } } } @@ -169,259 +223,382 @@ public class ArchivalDAOImpl implements ArchivalDAO { log.debug(count + " [OPERATION_RESPONSES] Records copied to the archival table. Starting deletion"); } //try the deletion now - sql = "DELETE FROM DM_DEVICE_OPERATION_RESPONSE WHERE OPERATION_ID IN (" + - " SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + sql = "DELETE o.* FROM DM_DEVICE_OPERATION_RESPONSE o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" + + "WHERE\n" + + " o.OPERATION_ID = da.ID;"; stmt3 = conn.createStatement(); int affected = stmt3.executeUpdate(sql); if (log.isDebugEnabled()) { log.debug(affected + " Rows deleted"); } } catch (SQLException e) { - throw new ArchivalDAOException("Error occurred while moving operations ", e); + String msg = "Error occurred while archiving the operation responses"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); } finally { - ArchivalDAOUtil.cleanupResources(stmt, rs); ArchivalDAOUtil.cleanupResources(stmt2); ArchivalDAOUtil.cleanupResources(stmt3); } } @Override - public void moveNotifications() throws ArchivalDAOException { + public List selectNotifications() throws ArchivalDAOException { + Statement stmt = null; - PreparedStatement stmt2 = null; - Statement stmt3 = null; ResultSet rs = null; + List notifications = new ArrayList<>(); try { Connection conn = ArchivalSourceDAOFactory.getConnection(); - String sql = "SELECT * FROM DM_NOTIFICATION WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + String sql = "SELECT \n" + + " o.NOTIFICATION_ID,\n" + + " o.DEVICE_ID,\n" + + " o.OPERATION_ID,\n" + + " o.TENANT_ID,\n" + + " o.STATUS,\n" + + " o.DESCRIPTION\n" + + "FROM\n" + + " DM_NOTIFICATION o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;"; stmt = this.createMemoryEfficientStatement(conn); rs = stmt.executeQuery(sql); -// ArchivalDestinationDAOFactory.beginTransaction(); + while (rs.next()) { + + ArchiveNotification note = new ArchiveNotification(); + note.setNotificationId(rs.getInt("NOTIFICATION_ID")); + note.setDeviceId(rs.getInt("DEVICE_ID")); + note.setOperationId(rs.getInt("OPERATION_ID")); + note.setTenantId(rs.getInt("TENANT_ID")); + note.setStatus(rs.getString("STATUS")); + note.setDescription(rs.getString("DESCRIPTION")); + notifications.add(note); + } + + if (log.isDebugEnabled()) { + log.debug("Selecting done for the Notification"); + } + } catch (SQLException e) { + String msg = "Error occurred while archiving the notifications"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); + } finally { + ArchivalDAOUtil.cleanupResources(stmt, rs); + } + return notifications; + } + + + @Override + public void moveNotifications(List archiveNotifications) throws ArchivalDAOException { + Statement stmt = null; + PreparedStatement stmt2 = null; + Statement stmt3 = null; + try { + Connection conn = ArchivalSourceDAOFactory.getConnection(); Connection conn2 = ArchivalDestinationDAOFactory.getConnection(); - sql = "INSERT INTO DM_NOTIFICATION_ARCH VALUES(?, ?, ?, ?, ?, ?, ?)"; + String sql = "INSERT INTO DM_NOTIFICATION_ARCH VALUES(?, ?, ?, ?, ?, ?, ?)"; stmt2 = conn2.prepareStatement(sql); int count = 0; - while (rs.next()) { - stmt2.setInt(1, rs.getInt("NOTIFICATION_ID")); - stmt2.setInt(2, rs.getInt("DEVICE_ID")); - stmt2.setInt(3, rs.getInt("OPERATION_ID")); - stmt2.setInt(4, rs.getInt("TENANT_ID")); - stmt2.setString(5, rs.getString("STATUS")); - stmt2.setString(6, rs.getString("DESCRIPTION")); +// while (rs.next()) { + for (ArchiveNotification rs : archiveNotifications) { + stmt2.setInt(1, rs.getNotificationId()); + stmt2.setInt(2, rs.getDeviceId()); + stmt2.setInt(3, rs.getOperationId()); + stmt2.setInt(4, rs.getTenantId()); + stmt2.setString(5, rs.getStatus()); + stmt2.setString(6, rs.getDescription()); stmt2.setTimestamp(7, this.currentTimestamp); stmt2.addBatch(); if (++count % batchSize == 0) { stmt2.executeBatch(); + if (log.isDebugEnabled()) { + log.debug("Executing Notifications batch " + count); + } } } stmt2.executeBatch(); -// ArchivalDestinationDAOFactory.commitTransaction(); if (log.isDebugEnabled()) { log.debug(count + " [NOTIFICATIONS] Records copied to the archival table. Starting deletion"); } - sql = "DELETE FROM DM_NOTIFICATION" + - " WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + sql = "DELETE o.* FROM DM_NOTIFICATION o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" + + "WHERE\n" + + " o.OPERATION_ID = da.ID;"; stmt3 = conn.createStatement(); int affected = stmt3.executeUpdate(sql); if (log.isDebugEnabled()) { log.debug(affected + " Rows deleted"); } } catch (SQLException e) { - throw new ArchivalDAOException("Error occurred while moving notifications ", e); + String msg = "Error occurred while archiving the notifications"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); } finally { - ArchivalDAOUtil.cleanupResources(stmt, rs); ArchivalDAOUtil.cleanupResources(stmt2); ArchivalDAOUtil.cleanupResources(stmt3); } } @Override - public void moveCommandOperations() throws ArchivalDAOException { + public List selectCommandOperations() throws ArchivalDAOException { Statement stmt = null; - PreparedStatement stmt2 = null; - Statement stmt3 = null; ResultSet rs = null; + + List commandOperations = new ArrayList<>(); try { Connection conn = ArchivalSourceDAOFactory.getConnection(); - String sql = "SELECT * FROM DM_COMMAND_OPERATION WHERE OPERATION_ID IN " + - "(SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + String sql = "SELECT \n" + + " *\n" + + "FROM\n" + + " DM_COMMAND_OPERATION o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;"; stmt = this.createMemoryEfficientStatement(conn); rs = stmt.executeQuery(sql); + while (rs.next()) { + ArchiveCommandOperation op = new ArchiveCommandOperation(); + op.setOperationId(rs.getInt("OPERATION_ID")); + op.setEnabled(rs.getInt("ENABLED")); + + commandOperations.add(op); + } + + if (log.isDebugEnabled()) { + log.debug("Selecting done for the Command Operation"); + } + } catch (SQLException e) { + String msg = "Error occurred while archiving the command operation"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); + } finally { + ArchivalDAOUtil.cleanupResources(stmt, rs); + } + return commandOperations; + } + + @Override + public void moveCommandOperations(List commandOperations) throws ArchivalDAOException { + Statement stmt = null; + PreparedStatement stmt2 = null; + Statement stmt3 = null; + try { + Connection conn = ArchivalSourceDAOFactory.getConnection(); Connection conn2 = ArchivalDestinationDAOFactory.getConnection(); - sql = "INSERT INTO DM_COMMAND_OPERATION_ARCH VALUES(?,?,?)"; + String sql = "INSERT INTO DM_COMMAND_OPERATION_ARCH VALUES(?,?,?)"; stmt2 = conn2.prepareStatement(sql); int count = 0; - while (rs.next()) { - stmt2.setInt(1, rs.getInt("OPERATION_ID")); - stmt2.setInt(2, rs.getInt("ENABLED")); + for (ArchiveCommandOperation rs : commandOperations) { + stmt2.setInt(1, rs.getOperationId()); + stmt2.setInt(2, rs.getEnabled()); stmt2.setTimestamp(3, this.currentTimestamp); stmt2.addBatch(); if (++count % batchSize == 0) { stmt2.executeBatch(); + if (log.isDebugEnabled()) { + log.debug("Executing Command Operations batch " + count); + } } } stmt2.executeBatch(); if (log.isDebugEnabled()) { log.debug(count + " [COMMAND_OPERATION] Records copied to the archival table. Starting deletion"); } - sql = "DELETE FROM DM_COMMAND_OPERATION" + - " WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + sql = "DELETE o.* FROM DM_COMMAND_OPERATION o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" + + "WHERE\n" + + " o.OPERATION_ID = da.ID;"; stmt3 = conn.createStatement(); int affected = stmt3.executeUpdate(sql); if (log.isDebugEnabled()) { log.debug(affected + " Rows deleted"); } } catch (SQLException e) { - throw new ArchivalDAOException("Error occurred while moving command operations", e); + String msg = "Error occurred while archiving the command operation"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); } finally { - ArchivalDAOUtil.cleanupResources(stmt, rs); ArchivalDAOUtil.cleanupResources(stmt2); ArchivalDAOUtil.cleanupResources(stmt3); } } @Override - public void moveProfileOperations() throws ArchivalDAOException { + public List selectProfileOperations() throws ArchivalDAOException { Statement stmt = null; - PreparedStatement stmt2 = null; - Statement stmt3 = null; ResultSet rs = null; + List profileOperations = new ArrayList<>(); try { Connection conn = ArchivalSourceDAOFactory.getConnection(); - String sql = "SELECT * FROM DM_PROFILE_OPERATION WHERE OPERATION_ID IN " + - "(SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + String sql = "SELECT \n" + + " *\n" + + "FROM\n" + + " DM_PROFILE_OPERATION o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;"; stmt = this.createMemoryEfficientStatement(conn); rs = stmt.executeQuery(sql); - Connection conn2 = ArchivalDestinationDAOFactory.getConnection(); - - sql = "INSERT INTO DM_PROFILE_OPERATION_ARCH VALUES(?, ?, ?, ?)"; - stmt2 = conn2.prepareStatement(sql); - - int count = 0; while (rs.next()) { - stmt2.setInt(1, rs.getInt("OPERATION_ID")); - stmt2.setInt(2, rs.getInt("ENABLED")); - stmt2.setBytes(3, rs.getBytes("OPERATION_DETAILS")); - stmt2.setTimestamp(4,this.currentTimestamp ); - stmt2.addBatch(); + ArchiveProfileOperation op = new ArchiveProfileOperation(); + + op.setOperationId(rs.getInt("OPERATION_ID")); + op.setEnabled(rs.getInt("ENABLED")); + op.setOperationDetails(rs.getBytes("OPERATION_DETAILS")); + profileOperations.add(op); - if (++count % batchSize == 0) { - stmt2.executeBatch(); - } - } - stmt2.executeBatch(); - if (log.isDebugEnabled()) { - log.debug(count + " [PROFILE_OPERATION] Records copied to the archival table. Starting deletion"); } - sql = "DELETE FROM DM_PROFILE_OPERATION" + - " WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; - stmt3 = conn.createStatement(); - int affected = stmt3.executeUpdate(sql); + if (log.isDebugEnabled()) { - log.debug(affected + " Rows deleted"); + log.debug("Selecting done for the Profile Operation"); } } catch (SQLException e) { - throw new ArchivalDAOException("Error occurred while moving profile operations", e); + String msg = "Error occurred while archiving the profile operation"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); } finally { ArchivalDAOUtil.cleanupResources(stmt, rs); - ArchivalDAOUtil.cleanupResources(stmt2); - ArchivalDAOUtil.cleanupResources(stmt3); } + return profileOperations; } @Override - public void moveConfigOperations() throws ArchivalDAOException { + public void moveProfileOperations(List profileOperations) throws ArchivalDAOException { Statement stmt = null; PreparedStatement stmt2 = null; Statement stmt3 = null; - ResultSet rs = null; try { Connection conn = ArchivalSourceDAOFactory.getConnection(); - String sql = "SELECT * FROM DM_CONFIG_OPERATION WHERE OPERATION_ID IN " + - "(SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; - stmt = this.createMemoryEfficientStatement(conn); - rs = stmt.executeQuery(sql); Connection conn2 = ArchivalDestinationDAOFactory.getConnection(); - sql = "INSERT INTO DM_CONFIG_OPERATION_ARCH VALUES(?, ?, ?, ?)"; + String sql = "INSERT INTO DM_PROFILE_OPERATION_ARCH VALUES(?, ?, ?, ?)"; stmt2 = conn2.prepareStatement(sql); int count = 0; - while (rs.next()) { - stmt2.setInt(1, rs.getInt("OPERATION_ID")); - stmt2.setBytes(2, rs.getBytes("OPERATION_CONFIG")); - stmt2.setInt(3, rs.getInt("ENABLED")); - stmt2.setTimestamp(4,this.currentTimestamp ); + for (ArchiveProfileOperation rs : profileOperations) { + stmt2.setInt(1, rs.getOperationId()); + stmt2.setInt(2, rs.getEnabled()); + stmt2.setBytes(3, (byte[]) rs.getOperationDetails()); + stmt2.setTimestamp(4, this.currentTimestamp); stmt2.addBatch(); if (++count % batchSize == 0) { stmt2.executeBatch(); + if (log.isDebugEnabled()) { + log.debug("Executing Profile Operations batch " + count); + } } } stmt2.executeBatch(); if (log.isDebugEnabled()) { - log.debug(count + " [CONFIG_OPERATION] Records copied to the archival table. Starting deletion"); + log.debug(count + " [PROFILE_OPERATION] Records copied to the archival table. Starting deletion"); } - sql = "DELETE FROM DM_CONFIG_OPERATION" + - " WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + sql = "DELETE o.* FROM DM_PROFILE_OPERATION o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" + + "WHERE\n" + + " o.OPERATION_ID = da.ID;"; stmt3 = conn.createStatement(); int affected = stmt3.executeUpdate(sql); if (log.isDebugEnabled()) { log.debug(affected + " Rows deleted"); } } catch (SQLException e) { - throw new ArchivalDAOException("Error occurred while moving config operations", e); + String msg = "Error occurred while archiving the profile operation"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); } finally { - ArchivalDAOUtil.cleanupResources(stmt, rs); ArchivalDAOUtil.cleanupResources(stmt2); ArchivalDAOUtil.cleanupResources(stmt3); } } @Override - public void moveEnrolmentMappings() throws ArchivalDAOException { + public List selectEnrolmentMappings() throws ArchivalDAOException { Statement stmt = null; - PreparedStatement stmt2 = null; - Statement stmt3 = null; ResultSet rs = null; + List operationMaps = new ArrayList<>(); try { Connection conn = ArchivalSourceDAOFactory.getConnection(); - String sql = "SELECT * FROM DM_ENROLMENT_OP_MAPPING WHERE OPERATION_ID IN " + - "(SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + String sql = "SELECT \n" + + " o.ID,\n" + + " o.ENROLMENT_ID,\n" + + " o.OPERATION_ID,\n" + + " o.STATUS,\n" + + " o.CREATED_TIMESTAMP,\n" + + " o.UPDATED_TIMESTAMP\n" + + "FROM\n" + + " DM_ENROLMENT_OP_MAPPING o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;"; stmt = this.createMemoryEfficientStatement(conn); rs = stmt.executeQuery(sql); + while (rs.next()) { + + ArchiveEnrolmentOperationMap eom = new ArchiveEnrolmentOperationMap(); + eom.setId(rs.getInt("ID")); + eom.setEnrolmentId(rs.getInt("ENROLMENT_ID")); + eom.setOperationId(rs.getInt("OPERATION_ID")); + eom.setStatus(rs.getString("STATUS")); + eom.setCreatedTimestamp(rs.getInt("CREATED_TIMESTAMP")); + eom.setUpdatedTimestamp(rs.getInt("UPDATED_TIMESTAMP")); + operationMaps.add(eom); + } + + if (log.isDebugEnabled()) { + log.debug("Selecting done for the Enrolment OP Mapping"); + } + } catch (SQLException e) { + String msg = "Error occurred while archiving the enrolment op mappings"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); + } finally { + ArchivalDAOUtil.cleanupResources(stmt, rs); + } + + return operationMaps; + } + + @Override + public void moveEnrolmentMappings(List operationMaps) throws ArchivalDAOException { + Statement stmt = null; + PreparedStatement stmt2 = null; + Statement stmt3 = null; + try { + Connection conn = ArchivalSourceDAOFactory.getConnection(); Connection conn2 = ArchivalDestinationDAOFactory.getConnection(); - sql = "INSERT INTO DM_ENROLMENT_OP_MAPPING_ARCH VALUES(?, ?, ?, ?, ?, ?, ?,?)"; + String sql = "INSERT INTO DM_ENROLMENT_OP_MAPPING_ARCH VALUES(?, ?, ?, ?, ?, ?, ?)"; stmt2 = conn2.prepareStatement(sql); int count = 0; - while (rs.next()) { - stmt2.setInt(1, rs.getInt("ID")); - stmt2.setInt(2, rs.getInt("ENROLMENT_ID")); - stmt2.setInt(3, rs.getInt("OPERATION_ID")); - stmt2.setString(4, rs.getString("STATUS")); - stmt2.setString(5, rs.getString("PUSH_NOTIFICATION_STATUS")); - stmt2.setInt(6, rs.getInt("CREATED_TIMESTAMP")); - stmt2.setInt(7, rs.getInt("UPDATED_TIMESTAMP")); - stmt2.setTimestamp(8, this.currentTimestamp); + for (ArchiveEnrolmentOperationMap rs : operationMaps) { + stmt2.setInt(1, rs.getId()); + stmt2.setInt(2, rs.getEnrolmentId()); + stmt2.setInt(3, rs.getOperationId()); + stmt2.setString(4, rs.getStatus()); + stmt2.setInt(5, rs.getCreatedTimestamp()); + stmt2.setInt(6, rs.getUpdatedTimestamp()); + stmt2.setTimestamp(7, this.currentTimestamp); stmt2.addBatch(); if (++count % batchSize == 0) { stmt2.executeBatch(); if (log.isDebugEnabled()) { - log.debug("Executing batch " + count); + log.debug("Executing Enrolment Mappings batch " + count); } } } @@ -429,67 +606,119 @@ public class ArchivalDAOImpl implements ArchivalDAO { if (log.isDebugEnabled()) { log.debug(count + " [ENROLMENT_OP_MAPPING] Records copied to the archival table. Starting deletion"); } - sql = "DELETE FROM DM_ENROLMENT_OP_MAPPING WHERE OPERATION_ID IN (" + - "SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + sql = "DELETE o.* FROM DM_ENROLMENT_OP_MAPPING o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" + + "WHERE\n" + + " o.OPERATION_ID = da.ID;"; stmt3 = conn.createStatement(); int affected = stmt3.executeUpdate(sql); if (log.isDebugEnabled()) { log.debug(affected + " Rows deleted"); } } catch (SQLException e) { - throw new ArchivalDAOException("Error occurred while moving enrolment mappings", e); + String msg = "Error occurred while archiving the enrolment op mappings"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); } finally { - ArchivalDAOUtil.cleanupResources(stmt, rs); ArchivalDAOUtil.cleanupResources(stmt2); ArchivalDAOUtil.cleanupResources(stmt3); } } @Override - public void moveOperations() throws ArchivalDAOException { + public List selectOperations() throws ArchivalDAOException { Statement stmt = null; - PreparedStatement stmt2 = null; - Statement stmt3 = null; ResultSet rs = null; + List operations = new ArrayList<>(); try { Connection conn = ArchivalSourceDAOFactory.getConnection(); - String sql = "SELECT * FROM DM_OPERATION WHERE ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + String sql = "SELECT \n" + + " o.ID,\n" + + " o.TYPE,\n" + + " o.CREATED_TIMESTAMP,\n" + + " o.RECEIVED_TIMESTAMP,\n" + + " o.OPERATION_CODE\n" + + "FROM\n" + + " DM_OPERATION o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.ID = da.ID;"; stmt = this.createMemoryEfficientStatement(conn); rs = stmt.executeQuery(sql); + while (rs.next()) { + + ArchiveOperation op = new ArchiveOperation(); + op.setId(rs.getInt("ID")); + op.setType(rs.getString("TYPE")); + op.setCreatedTimeStamp(rs.getTimestamp("CREATED_TIMESTAMP")); + op.setRecievedTimeStamp(rs.getTimestamp("RECEIVED_TIMESTAMP")); + op.setOperationCode(rs.getString("OPERATION_CODE")); + + operations.add(op); + + } + + if (log.isDebugEnabled()) { + log.debug("Selecting done for the Operation"); + } + } catch (SQLException e) { + String msg = "Error occurred while archiving the operations"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); + } finally { + ArchivalDAOUtil.cleanupResources(stmt, rs); + } + return operations; + } + + @Override + public void moveOperations(List operations) throws ArchivalDAOException { + Statement stmt = null; + PreparedStatement stmt2 = null; + Statement stmt3 = null; + try { + Connection conn = ArchivalSourceDAOFactory.getConnection(); Connection conn2 = ArchivalDestinationDAOFactory.getConnection(); - sql = "INSERT INTO DM_OPERATION_ARCH VALUES(?, ?, ?, ?, ?, ?)"; + String sql = "INSERT INTO DM_OPERATION_ARCH VALUES(?, ?, ?, ?, ?, ?)"; stmt2 = conn2.prepareStatement(sql); int count = 0; - while (rs.next()) { - stmt2.setInt(1, rs.getInt("ID")); - stmt2.setString(2, rs.getString("TYPE")); - stmt2.setTimestamp(3, rs.getTimestamp("CREATED_TIMESTAMP")); - stmt2.setTimestamp(4, rs.getTimestamp("RECEIVED_TIMESTAMP")); - stmt2.setString(5, rs.getString("OPERATION_CODE")); + for (ArchiveOperation rs : operations) { + stmt2.setInt(1, rs.getId()); + stmt2.setString(2, rs.getType()); + stmt2.setTimestamp(3, rs.getCreatedTimeStamp()); + stmt2.setTimestamp(4, rs.getRecievedTimeStamp()); + stmt2.setString(5, rs.getOperationCode()); stmt2.setTimestamp(6, this.currentTimestamp); stmt2.addBatch(); if (++count % batchSize == 0) { stmt2.executeBatch(); + if (log.isDebugEnabled()) { + log.debug("Final Execution of Operations batch " + count); + } } } stmt2.executeBatch(); if (log.isDebugEnabled()) { log.debug(count + " [OPERATIONS] Records copied to the archival table. Starting deletion"); } - sql = "DELETE FROM DM_OPERATION WHERE ID IN (" + - "SELECT ID FROM DM_ARCHIVED_OPERATIONS)"; + sql = "DELETE o.* FROM DM_OPERATION o\n" + + " INNER JOIN\n" + + " DM_ARCHIVED_OPERATIONS da ON o.ID = da.ID \n" + + "WHERE\n" + + " o.ID = da.ID;"; stmt3 = conn.createStatement(); int affected = stmt3.executeUpdate(sql); if (log.isDebugEnabled()) { log.debug(affected + " Rows deleted"); } } catch (SQLException e) { - throw new ArchivalDAOException("Error occurred while moving operations", e); + String msg = "Error occurred while archiving the operations"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); } finally { - ArchivalDAOUtil.cleanupResources(stmt, rs); ArchivalDAOUtil.cleanupResources(stmt2); ArchivalDAOUtil.cleanupResources(stmt3); } @@ -503,11 +732,13 @@ public class ArchivalDAOImpl implements ArchivalDAO { conn.setAutoCommit(false); String sql = "TRUNCATE DM_ARCHIVED_OPERATIONS"; stmt = conn.prepareStatement(sql); - stmt.addBatch(); - stmt.executeBatch(); + stmt.executeUpdate(); + conn.commit(); } catch (SQLException e) { - throw new ArchivalDAOException("Error occurred while truncating operation Ids", e); + String msg = "Error occurred while truncating operation Ids"; + log.error(msg, e); + throw new ArchivalDAOException(msg, e); } finally { ArchivalDAOUtil.cleanupResources(stmt); } @@ -519,4 +750,59 @@ public class ArchivalDAOImpl implements ArchivalDAO { return stmt; } -} + private String buildWhereClause(String[] statuses) { + StringBuilder whereClause = new StringBuilder("WHERE "); + for (int i = 0; i < statuses.length; i++) { + whereClause.append("STATUS ='"); + whereClause.append(statuses[i]); + whereClause.append("' "); + if (i != (statuses.length - 1)) + whereClause.append(" OR "); + } + return whereClause.toString(); + } + + private void copyOperationIDsForArchival() throws ArchivalDAOException { + PreparedStatement stmt = null; + Statement createStmt = null; + try { + Connection conn = ArchivalSourceDAOFactory.getConnection(); +// conn.setAutoCommit(false); +// String sql = "INSERT INTO DM_ARCHIVED_OPERATIONS(ID,CREATED_TIMESTAMP)" + +// " SELECT DISTINCT op.ID as OPERATION_ID, NOW()" + +// " FROM DM_ENROLMENT_OP_MAPPING AS opm" + +// " LEFT JOIN DM_OPERATION AS op ON opm.OPERATION_ID = op.ID" + +// " WHERE opm.STATUS='ERROR' OR opm.STATUS='COMPLETED'" + +// " AND op.RECEIVED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL ? DAY);"; +// stmt = conn.prepareStatement(sql); +// stmt.setInt(1, this.retentionPeriod); +// stmt.addBatch(); +// stmt.executeBatch(); +// conn.commit(); + + //Create the temporary table first +// String sql = "CREATE TEMPORARY TABLE DM_ARCHIVED_OPERATIONS (ID INTEGER NOT NULL," + +// " CREATED_TIMESTAMP TIMESTAMP NOT NULL, PRIMARY KEY (ID))" ; +// createStmt = conn.createStatement(); +// createStmt.execute(sql); +// if(log.isDebugEnabled()) { +// log.debug("Temporary table DM_ARCHIVED_OPERATIONS has been created "); +// } + //Copy eligible operations into DM_ARCHIVED_OPERATIONS + String sql = "INSERT INTO DM_ARCHIVED_OPERATIONS(ID,CREATED_TIMESTAMP)" + + " SELECT DISTINCT OPERATION_ID, NOW()" + + " FROM DM_ENROLMENT_OP_MAPPING" + + " WHERE STATUS='ERROR' OR STATUS='COMPLETED' OR STATUS='REPEATED'" + + " AND CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL ? DAY)"; + stmt = conn.prepareStatement(sql); + stmt.setInt(1, this.retentionPeriod); + int affected = stmt.executeUpdate(); + log.info(affected + " Eligible operations found for archival"); + } catch (SQLException e) { + throw new ArchivalDAOException("Error occurred while copying operation Ids for archival", e); + } finally { + ArchivalDAOUtil.cleanupResources(stmt); + ArchivalDAOUtil.cleanupResources(createStmt); + } + } +} \ No newline at end of file diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/DataDeletionDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/DataDeletionDAOImpl.java index dd900a178d..35f339f1bf 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/DataDeletionDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/archival/dao/impl/DataDeletionDAOImpl.java @@ -21,10 +21,11 @@ package org.wso2.carbon.device.mgt.core.archival.dao.impl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.device.mgt.core.archival.dao.DataDeletionDAO; import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDAOException; import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDAOUtil; import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDestinationDAOFactory; -import org.wso2.carbon.device.mgt.core.archival.dao.DataDeletionDAO; +import org.wso2.carbon.device.mgt.core.task.impl.ArchivedDataDeletionTask; import java.sql.Connection; import java.sql.PreparedStatement; @@ -52,8 +53,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO { "WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)"; stmt = conn.prepareStatement(sql); stmt.setInt(1, this.retentionPeriod); - stmt.addBatch(); - stmt.executeBatch(); + stmt.executeUpdate(); conn.commit(); } catch (SQLException e) { throw new ArchivalDAOException("Error occurred while deleting operation responses", e); @@ -72,8 +72,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO { " WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)"; stmt = conn.prepareStatement(sql); stmt.setInt(1, this.retentionPeriod); - stmt.addBatch(); - stmt.executeBatch(); + stmt.executeUpdate(); conn.commit(); } catch (SQLException e) { throw new ArchivalDAOException("Error occurred while deleting notifications", e); @@ -92,8 +91,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO { " WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)"; stmt = conn.prepareStatement(sql); stmt.setInt(1, this.retentionPeriod); - stmt.addBatch(); - stmt.executeBatch(); + stmt.executeUpdate(); conn.commit(); } catch (SQLException e) { throw new ArchivalDAOException("Error occurred while deleting command operations", e); @@ -112,8 +110,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO { " WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)"; stmt = conn.prepareStatement(sql); stmt.setInt(1, this.retentionPeriod); - stmt.addBatch(); - stmt.executeBatch(); + stmt.executeUpdate(); conn.commit(); } catch (SQLException e) { throw new ArchivalDAOException("Error occurred while deleting profile operations", e); @@ -131,8 +128,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO { String sql = "DELETE FROM DM_ENROLMENT_OP_MAPPING_ARCH WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)"; stmt = conn.prepareStatement(sql); stmt.setInt(1, this.retentionPeriod); - stmt.addBatch(); - stmt.executeBatch(); + stmt.executeUpdate(); conn.commit(); } catch (SQLException e) { throw new ArchivalDAOException("Error occurred while deleting enrolment mappings", e); @@ -150,8 +146,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO { String sql = "DELETE FROM DM_OPERATION_ARCH WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)"; stmt = conn.prepareStatement(sql); stmt.setInt(1, this.retentionPeriod); - stmt.addBatch(); - stmt.executeBatch(); + stmt.executeUpdate(); conn.commit(); } catch (SQLException e) { throw new ArchivalDAOException("Error occurred while deleting operations", e); diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/ArchivedDataDeletionTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/ArchivedDataDeletionTask.java index 4be24e4f3d..3fff58debe 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/ArchivedDataDeletionTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/ArchivedDataDeletionTask.java @@ -55,7 +55,7 @@ public class ArchivedDataDeletionTask implements Task { log.error("An error occurred while executing DataDeletionTask", e); } long endTime = System.nanoTime(); - long difference = (endTime - startTime) / 1000000 * 1000; + long difference = (endTime - startTime) / (1000000 * 1000); log.info("DataDeletionTask completed. Total execution time: " + difference + " seconds"); } }