Merge pull request #1249 from abhishekdesilva/master

Archival and Purging Task Improvement
merge-requests/16/head
Geeth 6 years ago committed by GitHub
commit d67e9a806b
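This change reworks the archival flow: eligible operation IDs are collected in batches whose size comes from the archival task configuration, each batch is staged in DM_ARCHIVED_OPERATIONS, the matching rows are read from the transactional tables into bean lists, copied into the corresponding *_ARCH tables, and then deleted from the source tables. The sketch below only illustrates the batch partitioning used by the new loop; the class name BatchPlanner and the clamping of the final chunk to the list size are illustrative assumptions, not code from this pull request.

import java.util.ArrayList;
import java.util.List;

public class BatchPlanner {

    // Split the candidate operation IDs into fixed-size chunks; the last chunk
    // simply carries whatever remains (clamped to the end of the list).
    static List<List<Integer>> partition(List<Integer> candidates, int batchSize) {
        List<List<Integer>> batches = new ArrayList<>();
        int total = candidates.size();
        int batchCount = (total + batchSize - 1) / batchSize;
        for (int i = 1; i <= batchCount; i++) {
            int startIdx = batchSize * (i - 1);
            int endIdx = Math.min(batchSize * i, total);
            batches.add(candidates.subList(startIdx, endIdx));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            ids.add(i);
        }
        // With a batch size of 10, 25 IDs split into chunks of 10, 10 and 5.
        for (List<Integer> batch : partition(ids, 10)) {
            System.out.println(batch.size() + " -> " + batch);
        }
    }
}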

@ -22,7 +22,9 @@ package org.wso2.carbon.device.mgt.core.archival;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.TransactionManagementException;
import org.wso2.carbon.device.mgt.core.archival.beans.*;
import org.wso2.carbon.device.mgt.core.archival.dao.*;
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
import java.sql.SQLException;
import java.util.ArrayList;
@ -36,7 +38,9 @@ public class ArchivalServiceImpl implements ArchivalService {
private ArchivalDAO archivalDAO;
private DataDeletionDAO dataDeletionDAO;
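// Batch size is now taken from the archival task configuration instead of the previous hard-coded value of 10000.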
private static int ITERATION_COUNT =
DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getArchivalConfiguration()
.getArchivalTaskConfiguration().getBatchSize();
private String[] NOT_IN_PROGRESS_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED"};
private String[] NOT_PENDING_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED", "IN_PROGRESS"};
@ -49,95 +53,172 @@ public class ArchivalServiceImpl implements ArchivalService {
@Override
public void archiveTransactionalRecords() throws ArchivalException {
List<Integer> allOperations;
List<Integer> pendingAndIPOperations;
try {
ArchivalSourceDAOFactory.openConnection();
ArchivalDestinationDAOFactory.openConnection();
if (log.isDebugEnabled()) {
log.debug("Fetching All Operations");
}
allOperations = archivalDAO.getAllOperations();
if (log.isDebugEnabled()) {
log.debug("Fetching All Pending Operations");
}
pendingAndIPOperations = archivalDAO.getPendingAndInProgressOperations();
} catch (ArchivalDAOException e) {
// rollbackTransactions();
String msg = "An error occurred while fetching the operations to be archived";
log.error(msg, e);
throw new ArchivalException(msg, e);
} catch (SQLException e) {
String msg = "An error occurred while connecting to the archival database";
log.error(msg, e);
throw new ArchivalException(msg, e);
} finally {
ArchivalSourceDAOFactory.closeConnection();
ArchivalDestinationDAOFactory.closeConnection();
}
log.info(allOperations.size() + " All Operations. " + pendingAndIPOperations.size() +
" P&IP Operations");
//Get the diff of operations
Set<Integer> setA = new HashSet<>(allOperations);
Set<Integer> setB = new HashSet<>(pendingAndIPOperations);
setA.removeAll(setB);
List<Integer> candidates = new ArrayList<>();
candidates.addAll(setA);
int total = candidates.size();
int batches = calculateNumberOfBatches(total);
int batchSize = ITERATION_COUNT;
if (log.isDebugEnabled()) {
log.debug(total + " Operations ready for archiving. " + batches + " iterations to be done.");
log.debug(batchSize + " is the batch size");
}
for (int i = 1; i <= batches; i++) {
int startIdx = batchSize * (i - 1);
int endIdx = batchSize * i;
if (i == batches) {
endIdx = startIdx + (total % batchSize);
}
if (log.isDebugEnabled()) {
log.debug("\n\n############ Iterating over batch " + i + "[" +
startIdx + "," + endIdx + "] #######");
}
List<Integer> subList = candidates.subList(startIdx, endIdx);
if (log.isDebugEnabled()) {
log.debug("SubList size is: " + subList.size());
if (subList.size() > 0) {
log.debug("First Element is: " + subList.get(0));
log.debug("Last Element is: " + subList.get(subList.size() - 1));
}
for (Integer val : subList) {
log.debug("Sub List Element: " + val);
}
}
try {
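// Stage this batch's operation IDs in DM_ARCHIVED_OPERATIONS so that the following SELECT and DELETE statements can join against it.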
beginTransactions();
prepareTempTable(subList);
commitTransactions();
} catch (Exception e) {
rollbackTransactions();
String msg = "Error occurred while preparing the operations.";
log.error(msg, e);
throw new ArchivalException(msg, e);
} finally {
ArchivalSourceDAOFactory.closeConnection();
ArchivalDestinationDAOFactory.closeConnection();
}
List<ArchiveOperationResponse> operationResponses = null;
List<ArchiveNotification> notification = null;
List<ArchiveCommandOperation> commandOperations = null;
List<ArchiveProfileOperation> profileOperations = null;
List<ArchiveEnrolmentOperationMap> enrollmentMapping = null;
List<ArchiveOperation> operations = null;
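// Read the rows to be archived for this batch into memory; they are copied into the *_ARCH tables and removed from the source tables below.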
try {
openConnection();
operationResponses = archivalDAO.selectOperationResponses();
notification = archivalDAO.selectNotifications();
commandOperations = archivalDAO.selectCommandOperations();
profileOperations = archivalDAO.selectProfileOperations();
enrollmentMapping = archivalDAO.selectEnrolmentMappings();
operations = archivalDAO.selectOperations();
} catch (Exception e) {
String msg = "Error occurred while retrieving data.";
log.error(msg, e);
throw new ArchivalException(msg, e);
} finally {
closeConnection();
}
try {
beginTransactions();
//Purge the largest table, DM_DEVICE_OPERATION_RESPONSE
if (log.isDebugEnabled()) {
log.debug("## Archiving operation responses");
}
archivalDAO.moveOperationResponses(operationResponses);
//Purge the notifications table, DM_NOTIFICATION
if (log.isDebugEnabled()) {
log.debug("## Archiving notifications");
}
archivalDAO.moveNotifications(notification);
//Purge the command operations table, DM_COMMAND_OPERATION
if (log.isDebugEnabled()) {
log.debug("## Archiving command operations");
}
archivalDAO.moveCommandOperations(commandOperations);
//Purge the profile operation table, DM_PROFILE_OPERATION
if (log.isDebugEnabled()) {
log.debug("## Archiving profile operations");
}
archivalDAO.moveProfileOperations(profileOperations);
//Purge the enrolment mappings table, DM_ENROLMENT_OP_MAPPING
if (log.isDebugEnabled()) {
log.debug("## Archiving enrolment mappings");
}
archivalDAO.moveEnrolmentMappings(enrollmentMapping);
//Finally, purge the operations table, DM_OPERATION
if (log.isDebugEnabled()) {
log.debug("## Archiving operations");
}
archivalDAO.moveOperations(operations);
commitTransactions();
if (log.isDebugEnabled()) {
log.debug("End of Iteration : " + i);
}
} catch (ArchivalDAOException e) {
rollbackTransactions();
String msg = "Error occurred while trying to archive data to the six tables";
log.error(msg, e);
throw new ArchivalException(msg, e);
} finally {
ArchivalSourceDAOFactory.closeConnection();
ArchivalDestinationDAOFactory.closeConnection();
}
}
}
@ -147,6 +228,9 @@ public class ArchivalServiceImpl implements ArchivalService {
log.debug("## Truncating the temporary table");
}
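// Reset the DM_ARCHIVED_OPERATIONS staging table and load this batch's operation IDs into it.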
archivalDAO.truncateOperationIDsForArchival();
if (log.isDebugEnabled()) {
log.debug("## Inserting into the temporary table");
}
archivalDAO.copyOperationIDsForArchival(subList);
}
@ -160,6 +244,28 @@ public class ArchivalServiceImpl implements ArchivalService {
}
}
private void openConnection() throws ArchivalException {
try {
ArchivalSourceDAOFactory.openConnection();
} catch (SQLException e) {
String msg = "An error occurred while opening the connection";
log.error(msg, e);
throw new ArchivalException(msg, e);
}
}
private void closeConnection() throws ArchivalException {
try {
ArchivalSourceDAOFactory.closeConnection();
} catch (Exception e) {
String msg = "An error occurred while closing the connection";
log.error(msg, e);
throw new ArchivalException(msg, e);
}
}
private void commitTransactions() {
ArchivalSourceDAOFactory.commitTransaction();
ArchivalDestinationDAOFactory.commitTransaction();

@ -0,0 +1,44 @@
/*
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.beans;
public class ArchiveCommandOperation {
private int operationId;
private int enabled;
public int getOperationId() {
return operationId;
}
public void setOperationId(int operationId) {
this.operationId = operationId;
}
public int getEnabled() {
return enabled;
}
public void setEnabled(int enabled) {
this.enabled = enabled;
}
}

@ -0,0 +1,80 @@
/*
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.beans;
public class ArchiveEnrolmentOperationMap {
private int id;
private int enrolmentId;
private int operationId;
private String status;
private int createdTimestamp;
private int updatedTimestamp;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getEnrolmentId() {
return enrolmentId;
}
public void setEnrolmentId(int enrolmentId) {
this.enrolmentId = enrolmentId;
}
public int getOperationId() {
return operationId;
}
public void setOperationId(int operationId) {
this.operationId = operationId;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public int getCreatedTimestamp() {
return createdTimestamp;
}
public void setCreatedTimestamp(int createdTimestamp) {
this.createdTimestamp = createdTimestamp;
}
public int getUpdatedTimestamp() {
return updatedTimestamp;
}
public void setUpdatedTimestamp(int updatedTimestamp) {
this.updatedTimestamp = updatedTimestamp;
}
}

@ -0,0 +1,80 @@
/*
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.beans;
public class ArchiveNotification {
private int notificationId;
private int deviceId;
private int operationId;
private int tenantId;
private String status;
private String description;
public int getNotificationId() {
return notificationId;
}
public void setNotificationId(int notificationId) {
this.notificationId = notificationId;
}
public int getDeviceId() {
return deviceId;
}
public void setDeviceId(int deviceId) {
this.deviceId = deviceId;
}
public int getOperationId() {
return operationId;
}
public void setOperationId(int operationId) {
this.operationId = operationId;
}
public int getTenantId() {
return tenantId;
}
public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
}

@ -0,0 +1,72 @@
/*
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.beans;
import java.sql.Timestamp;
public class ArchiveOperation {
private int id;
private String type;
private Timestamp createdTimeStamp;
private Timestamp recievedTimeStamp;
private String operationCode;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Timestamp getCreatedTimeStamp() {
return createdTimeStamp;
}
public void setCreatedTimeStamp(Timestamp createdTimeStamp) {
this.createdTimeStamp = createdTimeStamp;
}
public Timestamp getRecievedTimeStamp() {
return recievedTimeStamp;
}
public void setRecievedTimeStamp(Timestamp recievedTimeStamp) {
this.recievedTimeStamp = recievedTimeStamp;
}
public String getOperationCode() {
return operationCode;
}
public void setOperationCode(String operationCode) {
this.operationCode = operationCode;
}
}

@ -0,0 +1,83 @@
/*
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.beans;
import java.sql.Timestamp;
public class ArchiveOperationResponse {
private int id;
private int enrolmentId;
private int operationId;
private int enOpMapId;
private Object operationResponse;
private Timestamp receivedTimeStamp;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getEnrolmentId() {
return enrolmentId;
}
public void setEnrolmentId(int enrolmentId) {
this.enrolmentId = enrolmentId;
}
public int getOperationId() {
return operationId;
}
public void setOperationId(int operationId) {
this.operationId = operationId;
}
public int getEnOpMapId() {
return enOpMapId;
}
public void setEnOpMapId(int enOpMapId) {
this.enOpMapId = enOpMapId;
}
public Object getOperationResponse() {
return operationResponse;
}
public void setOperationResponse(Object operationResponse) {
this.operationResponse = operationResponse;
}
public Timestamp getReceivedTimeStamp() {
return receivedTimeStamp;
}
public void setReceivedTimeStamp(Timestamp receivedTimeStamp) {
this.receivedTimeStamp = receivedTimeStamp;
}
}

@ -0,0 +1,53 @@
/*
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.beans;
public class ArchiveProfileOperation {
private int operationId;
private int enabled;
private Object operationDetails;
public int getOperationId() {
return operationId;
}
public void setOperationId(int operationId) {
this.operationId = operationId;
}
public int getEnabled() {
return enabled;
}
public void setEnabled(int enabled) {
this.enabled = enabled;
}
public Object getOperationDetails() {
return operationDetails;
}
public void setOperationDetails(Object operationDetails) {
this.operationDetails = operationDetails;
}
}

@ -18,6 +18,9 @@
package org.wso2.carbon.device.mgt.core.archival.dao;
import org.wso2.carbon.device.mgt.core.archival.beans.*;
import java.sql.ResultSet;
import java.util.List;
/**
@ -33,19 +36,29 @@ public interface ArchivalDAO {
void copyOperationIDsForArchival(List<Integer> operationIds) throws ArchivalDAOException;
List<ArchiveOperationResponse> selectOperationResponses() throws ArchivalDAOException;
void moveOperationResponses(List<ArchiveOperationResponse> rs) throws ArchivalDAOException;
List<ArchiveNotification> selectNotifications() throws ArchivalDAOException;
void moveNotifications(List<ArchiveNotification> rs) throws ArchivalDAOException;
List<ArchiveCommandOperation> selectCommandOperations() throws ArchivalDAOException;
void moveCommandOperations(List<ArchiveCommandOperation> rs) throws ArchivalDAOException;
List<ArchiveProfileOperation> selectProfileOperations() throws ArchivalDAOException;
void moveProfileOperations(List<ArchiveProfileOperation> rs) throws ArchivalDAOException;
List<ArchiveEnrolmentOperationMap> selectEnrolmentMappings() throws ArchivalDAOException;
void moveEnrolmentMappings(List<ArchiveEnrolmentOperationMap> rs) throws ArchivalDAOException;
List<ArchiveOperation> selectOperations() throws ArchivalDAOException;
void moveOperations(List<ArchiveOperation> rs) throws ArchivalDAOException;
void truncateOperationIDsForArchival() throws ArchivalDAOException;

@ -57,4 +57,14 @@ public class ArchivalDAOUtil {
}
}
public static void cleanupResultSet(ResultSet rs) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
log.warn("Error occurred while closing the result set", e);
}
}
}
}

@ -20,11 +20,11 @@ package org.wso2.carbon.device.mgt.core.archival.dao.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.core.archival.beans.*;
import org.wso2.carbon.device.mgt.core.archival.dao.*;
import java.sql.*;
import java.util.*;
public class ArchivalDAOImpl implements ArchivalDAO {
@ -56,20 +56,25 @@ public class ArchivalDAOImpl implements ArchivalDAO {
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT DISTINCT OPERATION_ID FROM DM_ENROLMENT_OP_MAPPING " +
"WHERE CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod + " DAY)";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
if (log.isDebugEnabled()) {
log.debug("Selected Operation Ids from Enrolment OP Mapping");
}
while (rs.next()) {
operationIds.add(rs.getInt("OPERATION_ID"));
}
} catch (SQLException e) {
String msg = "An error occurred while getting the list of operation IDs to archive";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
}
if (log.isDebugEnabled()) {
log.debug(operationIds.size() + " operations found for the archival");
if (!operationIds.isEmpty()) {
log.debug("[" + operationIds.get(0) + "," + operationIds.get(operationIds.size() - 1) + "]");
}
}
return operationIds;
}
@ -82,21 +87,26 @@ public class ArchivalDAOImpl implements ArchivalDAO {
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT DISTINCT OPERATION_ID " +
" FROM DM_ENROLMENT_OP_MAPPING WHERE (STATUS = 'PENDING' OR STATUS = 'IN_PROGRESS') " +
" AND CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod + " DAY)";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
if (log.isDebugEnabled()) {
log.debug("Selected Pending or In Progress Operation IDs");
}
while (rs.next()) {
operationIds.add(rs.getInt("OPERATION_ID"));
}
} catch (SQLException e) {
String msg = "An error occurred while getting the list of pending and in-progress operation IDs to archive";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
}
if (log.isDebugEnabled()) {
log.debug(operationIds.size() + " PENDING and IN_PROGRESS operations found for the archival");
if (!operationIds.isEmpty()) {
log.debug("[" + operationIds.get(0) + "," + operationIds.get(operationIds.size() - 1) + "]");
}
}
return operationIds;
}
@ -123,44 +133,88 @@ public class ArchivalDAOImpl implements ArchivalDAO {
log.debug(count + " Records copied to the temporary table.");
}
} catch (SQLException e) {
String msg = "Error while copying operation Ids for archival";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt);
}
}
@Override
public List<ArchiveOperationResponse> selectOperationResponses() throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
ResultSet rs = null;
List<ArchiveOperationResponse> operationResponses = new ArrayList<>();
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
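// The INNER JOIN with the DM_ARCHIVED_OPERATIONS staging table restricts the result to the operation IDs staged for the current batch.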
String sql = "SELECT \n" +
" o.ID,\n" +
" o.ENROLMENT_ID,\n" +
" o.OPERATION_ID,\n" +
" o.EN_OP_MAP_ID,\n" +
" o.OPERATION_RESPONSE,\n" +
" o.RECEIVED_TIMESTAMP\n" +
"FROM\n" +
" DM_DEVICE_OPERATION_RESPONSE o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
while (rs.next()) {
ArchiveOperationResponse rep = new ArchiveOperationResponse();
rep.setId(rs.getInt("ID"));
rep.setEnrolmentId(rs.getInt("ENROLMENT_ID"));
rep.setOperationId(rs.getInt("OPERATION_ID"));
rep.setOperationResponse(rs.getBytes("OPERATION_RESPONSE"));
rep.setReceivedTimeStamp(rs.getTimestamp("RECEIVED_TIMESTAMP"));
operationResponses.add(rep);
}
if (log.isDebugEnabled()) {
log.debug("Selecting done for the Operation Response");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the operation responses";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
}
return operationResponses;
}
@Override
public void moveOperationResponses(List<ArchiveOperationResponse> archiveOperationResponse) throws ArchivalDAOException {
PreparedStatement stmt2 = null;
Statement stmt3 = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
String sql = "INSERT INTO DM_DEVICE_OPERATION_RESPONSE_ARCH VALUES(?, ?, ?, ?, ?,?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
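// Rows are inserted into the archive table in JDBC batches of batchSize; the archived source rows are then removed with a single DELETE joined to the staging table.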
for (ArchiveOperationResponse rs : archiveOperationResponse) {
stmt2.setInt(1, rs.getId());
stmt2.setInt(2, rs.getEnrolmentId());
stmt2.setInt(3, rs.getOperationId());
stmt2.setBytes(4, (byte[]) rs.getOperationResponse());
stmt2.setTimestamp(5, rs.getReceivedTimeStamp());
stmt2.setTimestamp(6, this.currentTimestamp);
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug("Executing Operation Responses batch " + count);
}
}
}
@ -169,259 +223,382 @@ public class ArchivalDAOImpl implements ArchivalDAO {
log.debug(count + " [OPERATION_RESPONSES] Records copied to the archival table. Starting deletion");
}
//try the deletion now
sql = "DELETE o.* FROM DM_DEVICE_OPERATION_RESPONSE o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" +
"WHERE\n" +
" o.OPERATION_ID = da.ID;";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the operation responses";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
}
@Override
public List<ArchiveNotification> selectNotifications() throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
ResultSet rs = null;
List<ArchiveNotification> notifications = new ArrayList<>();
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT \n" +
" o.NOTIFICATION_ID,\n" +
" o.DEVICE_ID,\n" +
" o.OPERATION_ID,\n" +
" o.TENANT_ID,\n" +
" o.STATUS,\n" +
" o.DESCRIPTION\n" +
"FROM\n" +
" DM_NOTIFICATION o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
// ArchivalDestinationDAOFactory.beginTransaction();
while (rs.next()) {
ArchiveNotification note = new ArchiveNotification();
note.setNotificationId(rs.getInt("NOTIFICATION_ID"));
note.setDeviceId(rs.getInt("DEVICE_ID"));
note.setOperationId(rs.getInt("OPERATION_ID"));
note.setTenantId(rs.getInt("TENANT_ID"));
note.setStatus(rs.getString("STATUS"));
note.setDescription(rs.getString("DESCRIPTION"));
notifications.add(note);
}
if (log.isDebugEnabled()) {
log.debug("Selecting done for the Notification");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the notifications";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
}
return notifications;
}
@Override
public void moveNotifications(List<ArchiveNotification> archiveNotifications) throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
String sql = "INSERT INTO DM_NOTIFICATION_ARCH VALUES(?, ?, ?, ?, ?, ?, ?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
// while (rs.next()) {
for (ArchiveNotification rs : archiveNotifications) {
stmt2.setInt(1, rs.getNotificationId());
stmt2.setInt(2, rs.getDeviceId());
stmt2.setInt(3, rs.getOperationId());
stmt2.setInt(4, rs.getTenantId());
stmt2.setString(5, rs.getStatus());
stmt2.setString(6, rs.getDescription());
stmt2.setTimestamp(7, this.currentTimestamp);
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug("Executing Notifications batch " + count);
}
}
}
stmt2.executeBatch();
// ArchivalDestinationDAOFactory.commitTransaction();
if (log.isDebugEnabled()) {
log.debug(count + " [NOTIFICATIONS] Records copied to the archival table. Starting deletion");
}
sql = "DELETE o.* FROM DM_NOTIFICATION o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" +
"WHERE\n" +
" o.OPERATION_ID = da.ID;";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the notifications";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
}
@Override
public List<ArchiveCommandOperation> selectCommandOperations() throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
ResultSet rs = null;
List<ArchiveCommandOperation> commandOperations = new ArrayList<>();
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT \n" +
" *\n" +
"FROM\n" +
" DM_COMMAND_OPERATION o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
while (rs.next()) {
ArchiveCommandOperation op = new ArchiveCommandOperation();
op.setOperationId(rs.getInt("OPERATION_ID"));
op.setEnabled(rs.getInt("ENABLED"));
commandOperations.add(op);
}
if (log.isDebugEnabled()) {
log.debug("Selecting done for the Command Operation");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the command operation";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
}
return commandOperations;
}
@Override
public void moveCommandOperations(List<ArchiveCommandOperation> commandOperations) throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
String sql = "INSERT INTO DM_COMMAND_OPERATION_ARCH VALUES(?,?,?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
for (ArchiveCommandOperation rs : commandOperations) {
stmt2.setInt(1, rs.getOperationId());
stmt2.setInt(2, rs.getEnabled());
stmt2.setTimestamp(3, this.currentTimestamp);
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug("Executing Command Operations batch " + count);
}
}
}
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug(count + " [COMMAND_OPERATION] Records copied to the archival table. Starting deletion");
}
sql = "DELETE o.* FROM DM_COMMAND_OPERATION o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" +
"WHERE\n" +
" o.OPERATION_ID = da.ID;";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the command operation";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
}
@Override
public List<ArchiveProfileOperation> selectProfileOperations() throws ArchivalDAOException {
Statement stmt = null;
ResultSet rs = null;
List<ArchiveProfileOperation> profileOperations = new ArrayList<>();
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT \n" +
" *\n" +
"FROM\n" +
" DM_PROFILE_OPERATION o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
while (rs.next()) {
ArchiveProfileOperation op = new ArchiveProfileOperation();
op.setOperationId(rs.getInt("OPERATION_ID"));
op.setEnabled(rs.getInt("ENABLED"));
op.setOperationDetails(rs.getBytes("OPERATION_DETAILS"));
profileOperations.add(op);
}
if (log.isDebugEnabled()) {
log.debug("Selecting done for the Profile Operation");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the profile operation";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
}
return profileOperations;
}
@Override
public void moveProfileOperations(List<ArchiveProfileOperation> profileOperations) throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
String sql = "INSERT INTO DM_PROFILE_OPERATION_ARCH VALUES(?, ?, ?, ?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
for (ArchiveProfileOperation rs : profileOperations) {
stmt2.setInt(1, rs.getOperationId());
stmt2.setInt(2, rs.getEnabled());
stmt2.setBytes(3, (byte[]) rs.getOperationDetails());
stmt2.setTimestamp(4, this.currentTimestamp);
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug("Executing Profile Operations batch " + count);
}
}
}
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug(count + " [PROFILE_OPERATION] Records copied to the archival table. Starting deletion");
}
sql = "DELETE o.* FROM DM_PROFILE_OPERATION o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" +
"WHERE\n" +
" o.OPERATION_ID = da.ID;";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the profile operation";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
}
@Override
public List<ArchiveEnrolmentOperationMap> selectEnrolmentMappings() throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
ResultSet rs = null;
List<ArchiveEnrolmentOperationMap> operationMaps = new ArrayList<>();
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT \n" +
" o.ID,\n" +
" o.ENROLMENT_ID,\n" +
" o.OPERATION_ID,\n" +
" o.STATUS,\n" +
" o.CREATED_TIMESTAMP,\n" +
" o.UPDATED_TIMESTAMP\n" +
"FROM\n" +
" DM_ENROLMENT_OP_MAPPING o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
while (rs.next()) {
ArchiveEnrolmentOperationMap eom = new ArchiveEnrolmentOperationMap();
eom.setId(rs.getInt("ID"));
eom.setEnrolmentId(rs.getInt("ENROLMENT_ID"));
eom.setOperationId(rs.getInt("OPERATION_ID"));
eom.setStatus(rs.getString("STATUS"));
eom.setCreatedTimestamp(rs.getInt("CREATED_TIMESTAMP"));
eom.setUpdatedTimestamp(rs.getInt("UPDATED_TIMESTAMP"));
operationMaps.add(eom);
}
if (log.isDebugEnabled()) {
log.debug("Selecting done for the Enrolment OP Mapping");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the enrolment op mappings";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
}
return operationMaps;
}
@Override
public void moveEnrolmentMappings(List<ArchiveEnrolmentOperationMap> operationMaps) throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
String sql = "INSERT INTO DM_ENROLMENT_OP_MAPPING_ARCH VALUES(?, ?, ?, ?, ?, ?, ?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
for (ArchiveEnrolmentOperationMap rs : operationMaps) {
stmt2.setInt(1, rs.getId());
stmt2.setInt(2, rs.getEnrolmentId());
stmt2.setInt(3, rs.getOperationId());
stmt2.setString(4, rs.getStatus());
stmt2.setInt(5, rs.getCreatedTimestamp());
stmt2.setInt(6, rs.getUpdatedTimestamp());
stmt2.setTimestamp(7, this.currentTimestamp);
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug("Executing batch " + count);
log.debug("Executing Enrolment Mappings batch " + count);
}
}
}
@ -429,67 +606,119 @@ public class ArchivalDAOImpl implements ArchivalDAO {
if (log.isDebugEnabled()) {
log.debug(count + " [ENROLMENT_OP_MAPPING] Records copied to the archival table. Starting deletion");
}
sql = "DELETE o.* FROM DM_ENROLMENT_OP_MAPPING o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" +
"WHERE\n" +
" o.OPERATION_ID = da.ID;";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the enrolment op mappings";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
}
@Override
public List<ArchiveOperation> selectOperations() throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
ResultSet rs = null;
List<ArchiveOperation> operations = new ArrayList<>();
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT \n" +
" o.ID,\n" +
" o.TYPE,\n" +
" o.CREATED_TIMESTAMP,\n" +
" o.RECEIVED_TIMESTAMP,\n" +
" o.OPERATION_CODE\n" +
"FROM\n" +
" DM_OPERATION o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.ID = da.ID;";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
while (rs.next()) {
ArchiveOperation op = new ArchiveOperation();
op.setId(rs.getInt("ID"));
op.setType(rs.getString("TYPE"));
op.setCreatedTimeStamp(rs.getTimestamp("CREATED_TIMESTAMP"));
op.setRecievedTimeStamp(rs.getTimestamp("RECEIVED_TIMESTAMP"));
op.setOperationCode(rs.getString("OPERATION_CODE"));
operations.add(op);
}
if (log.isDebugEnabled()) {
log.debug("Selecting done for the Operation");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the operations";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
}
return operations;
}
@Override
public void moveOperations(List<ArchiveOperation> operations) throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
String sql = "INSERT INTO DM_OPERATION_ARCH VALUES(?, ?, ?, ?, ?, ?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
for (ArchiveOperation rs : operations) {
stmt2.setInt(1, rs.getId());
stmt2.setString(2, rs.getType());
stmt2.setTimestamp(3, rs.getCreatedTimeStamp());
stmt2.setTimestamp(4, rs.getRecievedTimeStamp());
stmt2.setString(5, rs.getOperationCode());
stmt2.setTimestamp(6, this.currentTimestamp);
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug("Executing Operations batch " + count);
}
}
}
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug(count + " [OPERATIONS] Records copied to the archival table. Starting deletion");
}
sql = "DELETE o.* FROM DM_OPERATION o\n" +
" INNER JOIN\n" +
" DM_ARCHIVED_OPERATIONS da ON o.ID = da.ID \n" +
"WHERE\n" +
" o.ID = da.ID;";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
String msg = "Error occurred while archiving the operations";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
@ -503,11 +732,13 @@ public class ArchivalDAOImpl implements ArchivalDAO {
conn.setAutoCommit(false);
String sql = "TRUNCATE DM_ARCHIVED_OPERATIONS";
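// Emptying the staging table here ensures the next batch starts with a clean DM_ARCHIVED_OPERATIONS.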
stmt = conn.prepareStatement(sql);
stmt.executeUpdate();
conn.commit();
} catch (SQLException e) {
String msg = "Error occurred while truncating operation Ids";
log.error(msg, e);
throw new ArchivalDAOException(msg, e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt);
}
@ -519,4 +750,59 @@ public class ArchivalDAOImpl implements ArchivalDAO {
return stmt;
}
}
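// Helper that builds a clause of the form "WHERE STATUS ='COMPLETED' OR STATUS ='ERROR' ..." for the given status values.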
private String buildWhereClause(String[] statuses) {
StringBuilder whereClause = new StringBuilder("WHERE ");
for (int i = 0; i < statuses.length; i++) {
whereClause.append("STATUS ='");
whereClause.append(statuses[i]);
whereClause.append("' ");
if (i != (statuses.length - 1))
whereClause.append(" OR ");
}
return whereClause.toString();
}
private void copyOperationIDsForArchival() throws ArchivalDAOException {
PreparedStatement stmt = null;
Statement createStmt = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
// conn.setAutoCommit(false);
// String sql = "INSERT INTO DM_ARCHIVED_OPERATIONS(ID,CREATED_TIMESTAMP)" +
// " SELECT DISTINCT op.ID as OPERATION_ID, NOW()" +
// " FROM DM_ENROLMENT_OP_MAPPING AS opm" +
// " LEFT JOIN DM_OPERATION AS op ON opm.OPERATION_ID = op.ID" +
// " WHERE opm.STATUS='ERROR' OR opm.STATUS='COMPLETED'" +
// " AND op.RECEIVED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL ? DAY);";
// stmt = conn.prepareStatement(sql);
// stmt.setInt(1, this.retentionPeriod);
// stmt.addBatch();
// stmt.executeBatch();
// conn.commit();
//Create the temporary table first
// String sql = "CREATE TEMPORARY TABLE DM_ARCHIVED_OPERATIONS (ID INTEGER NOT NULL," +
// " CREATED_TIMESTAMP TIMESTAMP NOT NULL, PRIMARY KEY (ID))" ;
// createStmt = conn.createStatement();
// createStmt.execute(sql);
// if(log.isDebugEnabled()) {
// log.debug("Temporary table DM_ARCHIVED_OPERATIONS has been created ");
// }
//Copy eligible operations into DM_ARCHIVED_OPERATIONS
String sql = "INSERT INTO DM_ARCHIVED_OPERATIONS(ID,CREATED_TIMESTAMP)" +
" SELECT DISTINCT OPERATION_ID, NOW()" +
" FROM DM_ENROLMENT_OP_MAPPING" +
" WHERE (STATUS='ERROR' OR STATUS='COMPLETED' OR STATUS='REPEATED')" +
" AND CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
int affected = stmt.executeUpdate();
log.info(affected + " Eligible operations found for archival");
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while copying operation Ids for archival", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt);
ArchivalDAOUtil.cleanupResources(createStmt);
}
}
}

@ -21,10 +21,11 @@ package org.wso2.carbon.device.mgt.core.archival.dao.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDAOException;
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDAOUtil;
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDestinationDAOFactory;
import org.wso2.carbon.device.mgt.core.archival.dao.DataDeletionDAO;
import org.wso2.carbon.device.mgt.core.task.impl.ArchivedDataDeletionTask;
import java.sql.Connection;
import java.sql.PreparedStatement;
@ -52,8 +53,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
"WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
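// A single DELETE bounded by the retention period is enough here, so a plain executeUpdate() replaces the earlier addBatch()/executeBatch() pair.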
stmt.executeUpdate();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting operation responses", e);
@ -72,8 +72,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
stmt.executeUpdate();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting notifications", e);
@ -92,8 +91,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
stmt.executeUpdate();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting command operations", e);
@ -112,8 +110,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
stmt.executeUpdate();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting profile operations", e);
@ -131,8 +128,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
String sql = "DELETE FROM DM_ENROLMENT_OP_MAPPING_ARCH WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
stmt.executeUpdate();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting enrolment mappings", e);
@ -150,8 +146,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
String sql = "DELETE FROM DM_OPERATION_ARCH WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
stmt.executeUpdate();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting operations", e);

@ -55,7 +55,7 @@ public class ArchivedDataDeletionTask implements Task {
log.error("An error occurred while executing DataDeletionTask", e);
}
long endTime = System.nanoTime();
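// Convert the elapsed time from nanoseconds to seconds before logging.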
long difference = (endTime - startTime) / (1000000 * 1000);
log.info("DataDeletionTask completed. Total execution time: " + difference + " seconds");
}
}
