Fixes https://github.com/wso2/product-iots/issues/1394. Data archival task implementation to prune CDM transactional tables.
parent
c200316d2e
commit
26a15c28bc
@ -0,0 +1,31 @@
|
|||||||
|
package org.wso2.carbon.device.mgt.core.archival;
|
||||||
|
|
||||||
|
|
||||||
|
public class ArchivalException extends Exception {
|
||||||
|
|
||||||
|
private String errorMessage;
|
||||||
|
|
||||||
|
public String getErrorMessage() {
|
||||||
|
return errorMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setErrorMessage(String errorMessage) {
|
||||||
|
this.errorMessage = errorMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArchivalException(String message) {
|
||||||
|
super(message);
|
||||||
|
setErrorMessage(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArchivalException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
setErrorMessage(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ArchivalException(String message, Throwable cause, boolean enableSuppression,
|
||||||
|
boolean writableStackTrace) {
|
||||||
|
super(message, cause, enableSuppression, writableStackTrace);
|
||||||
|
setErrorMessage(message);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,26 @@
|
|||||||
|
package org.wso2.carbon.device.mgt.core.archival;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
public interface ArchivalService {
|
||||||
|
|
||||||
|
void archiveTransactionalRecords() throws ArchivalException;
|
||||||
|
|
||||||
|
void deleteArchivedRecords() throws ArchivalException;
|
||||||
|
}
|
@ -0,0 +1,220 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.archival;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.wso2.carbon.device.mgt.common.TransactionManagementException;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.dao.*;
|
||||||
|
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public class ArchivalServiceImpl implements ArchivalService {
|
||||||
|
private static Log log = LogFactory.getLog(ArchivalServiceImpl.class);
|
||||||
|
|
||||||
|
private ArchivalDAO archivalDAO;
|
||||||
|
private DataDeletionDAO dataDeletionDAO;
|
||||||
|
|
||||||
|
private static int ITERATION_COUNT = 10000;
|
||||||
|
|
||||||
|
private String[] NOT_IN_PROGRESS_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED"};
|
||||||
|
private String[] NOT_PENDING_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED", "IN_PROGRESS"};
|
||||||
|
private String[] NOT_PENDING_IN_PROGRESS_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED"};
|
||||||
|
|
||||||
|
public ArchivalServiceImpl() {
|
||||||
|
this.archivalDAO = ArchivalSourceDAOFactory.getDataPurgingDAO();
|
||||||
|
this.dataDeletionDAO = ArchivalDestinationDAOFactory.getDataDeletionDAO();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void archiveTransactionalRecords() throws ArchivalException {
|
||||||
|
try {
|
||||||
|
ArchivalSourceDAOFactory.openConnection();
|
||||||
|
ArchivalDestinationDAOFactory.openConnection();
|
||||||
|
|
||||||
|
List<Integer> allOperations = archivalDAO.getAllOperations();
|
||||||
|
List<Integer> pendingAndIPOperations = archivalDAO.getPendingAndInProgressOperations();
|
||||||
|
|
||||||
|
log.info(allOperations.size() + " All Operations. " + pendingAndIPOperations.size() +
|
||||||
|
" P&IP Operations");
|
||||||
|
//Get the diff of operations
|
||||||
|
Set<Integer> setA = new HashSet<>(allOperations);
|
||||||
|
Set<Integer> setB = new HashSet<>(pendingAndIPOperations);
|
||||||
|
setA.removeAll(setB);
|
||||||
|
|
||||||
|
List<Integer> candidates = new ArrayList<>();
|
||||||
|
candidates.addAll(setA);
|
||||||
|
|
||||||
|
int total = candidates.size();
|
||||||
|
int batches = calculateNumberOfBatches(total);
|
||||||
|
int batchSize = ITERATION_COUNT;
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(total + " Operations ready for archiving. " + batches + " iterations to be done.");
|
||||||
|
}
|
||||||
|
|
||||||
|
beginTransactions();
|
||||||
|
for (int i = 1; i <= batches; i++) {
|
||||||
|
int startIdx = batchSize * (i - 1);
|
||||||
|
int endIdx = batchSize * i;
|
||||||
|
if (i == batches) {
|
||||||
|
endIdx = startIdx + (total % batchSize);
|
||||||
|
}
|
||||||
|
if(log.isDebugEnabled()) {
|
||||||
|
log.debug("\n\n############ Iterating over batch " + i + "[" +
|
||||||
|
startIdx + "," + endIdx + "] #######");
|
||||||
|
}
|
||||||
|
List<Integer> subList = candidates.subList(startIdx, endIdx);
|
||||||
|
prepareTempTable(subList);
|
||||||
|
|
||||||
|
//Purge the largest table, DM_DEVICE_OPERATION_RESPONSE
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Purging operation responses");
|
||||||
|
}
|
||||||
|
archivalDAO.moveOperationResponses();
|
||||||
|
|
||||||
|
//Purge the notifications table, DM_NOTIFICATION
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Purging notifications");
|
||||||
|
}
|
||||||
|
archivalDAO.moveNotifications();
|
||||||
|
|
||||||
|
//Purge the command operations table, DM_COMMAND_OPERATION
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Purging command operations");
|
||||||
|
}
|
||||||
|
archivalDAO.moveCommandOperations();
|
||||||
|
|
||||||
|
//Purge the profile operation table, DM_PROFILE_OPERATION
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Purging profile operations");
|
||||||
|
}
|
||||||
|
archivalDAO.moveProfileOperations();
|
||||||
|
|
||||||
|
//Purge the enrolment mappings table, DM_ENROLMENT_OP_MAPPING
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Purging enrolment mappings");
|
||||||
|
}
|
||||||
|
archivalDAO.moveEnrolmentMappings();
|
||||||
|
|
||||||
|
//Finally, purge the operations table, DM_OPERATION
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Purging operations");
|
||||||
|
}
|
||||||
|
archivalDAO.moveOperations();
|
||||||
|
}
|
||||||
|
commitTransactions();
|
||||||
|
} catch (ArchivalDAOException e) {
|
||||||
|
rollbackTransactions();
|
||||||
|
throw new ArchivalException("An error occurred while data archival", e);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalException("An error occurred while connecting to the archival database.", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalSourceDAOFactory.closeConnection();
|
||||||
|
ArchivalDestinationDAOFactory.closeConnection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void prepareTempTable(List<Integer> subList) throws ArchivalDAOException {
|
||||||
|
//Clean up the DM_ARCHIVED_OPERATIONS table
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Truncating the temporary table");
|
||||||
|
}
|
||||||
|
archivalDAO.truncateOperationIDsForArchival();
|
||||||
|
archivalDAO.copyOperationIDsForArchival(subList);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void beginTransactions() throws ArchivalException {
|
||||||
|
try {
|
||||||
|
ArchivalSourceDAOFactory.beginTransaction();
|
||||||
|
ArchivalDestinationDAOFactory.beginTransaction();
|
||||||
|
} catch (TransactionManagementException e) {
|
||||||
|
log.error("An error occurred during starting transactions", e);
|
||||||
|
throw new ArchivalException("An error occurred during starting transactions", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void commitTransactions() {
|
||||||
|
ArchivalSourceDAOFactory.commitTransaction();
|
||||||
|
ArchivalDestinationDAOFactory.commitTransaction();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void rollbackTransactions() {
|
||||||
|
ArchivalSourceDAOFactory.rollbackTransaction();
|
||||||
|
ArchivalDestinationDAOFactory.rollbackTransaction();
|
||||||
|
}
|
||||||
|
|
||||||
|
private int calculateNumberOfBatches(int total) {
|
||||||
|
int batches = 0;
|
||||||
|
int batchSize = ITERATION_COUNT;
|
||||||
|
if ((total % batchSize) > 0) {
|
||||||
|
batches = (total / batchSize) + 1;
|
||||||
|
} else {
|
||||||
|
batches = total / batchSize;
|
||||||
|
}
|
||||||
|
return batches;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteArchivedRecords() throws ArchivalException {
|
||||||
|
try {
|
||||||
|
ArchivalDestinationDAOFactory.openConnection();
|
||||||
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Deleting operation responses");
|
||||||
|
}
|
||||||
|
dataDeletionDAO.deleteOperationResponses();
|
||||||
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Deleting notifications ");
|
||||||
|
}
|
||||||
|
dataDeletionDAO.deleteNotifications();
|
||||||
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Deleting command operations");
|
||||||
|
}
|
||||||
|
dataDeletionDAO.deleteCommandOperations();
|
||||||
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Deleting profile operations ");
|
||||||
|
}
|
||||||
|
dataDeletionDAO.deleteProfileOperations();
|
||||||
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Deleting enrolment mappings ");
|
||||||
|
}
|
||||||
|
dataDeletionDAO.deleteEnrolmentMappings();
|
||||||
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("## Deleting operations ");
|
||||||
|
}
|
||||||
|
dataDeletionDAO.deleteOperations();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalException("An error occurred while initialising data source for archival", e);
|
||||||
|
} catch (ArchivalDAOException e) {
|
||||||
|
log.error("An error occurred while executing DataDeletionTask");
|
||||||
|
} finally {
|
||||||
|
ArchivalDestinationDAOFactory.closeConnection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,50 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.archival.dao;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Operations to move data from DM database to archival database
|
||||||
|
*/
|
||||||
|
public interface ArchivalDAO {
|
||||||
|
|
||||||
|
int DEFAULT_BATCH_SIZE = 1000;
|
||||||
|
|
||||||
|
List<Integer> getAllOperations() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
List<Integer> getPendingAndInProgressOperations() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void copyOperationIDsForArchival(List<Integer> operationIds) throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void moveOperationResponses() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void moveNotifications() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void moveCommandOperations() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void moveProfileOperations() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void moveEnrolmentMappings() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void moveOperations() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void truncateOperationIDsForArchival() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,50 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.archival.dao;
|
||||||
|
|
||||||
|
|
||||||
|
public class ArchivalDAOException extends Exception {
|
||||||
|
|
||||||
|
private String errorMessage;
|
||||||
|
|
||||||
|
public String getErrorMessage() {
|
||||||
|
return errorMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setErrorMessage(String errorMessage) {
|
||||||
|
this.errorMessage = errorMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArchivalDAOException(String message) {
|
||||||
|
super(message);
|
||||||
|
setErrorMessage(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArchivalDAOException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
setErrorMessage(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ArchivalDAOException(String message, Throwable cause, boolean enableSuppression,
|
||||||
|
boolean writableStackTrace) {
|
||||||
|
super(message, cause, enableSuppression, writableStackTrace);
|
||||||
|
setErrorMessage(message);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,61 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.archival.dao;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
|
||||||
|
public class ArchivalDAOUtil {
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(ArchivalDAOUtil.class);
|
||||||
|
|
||||||
|
public static void cleanupResources(Statement stmt, ResultSet rs) {
|
||||||
|
if (rs != null) {
|
||||||
|
try {
|
||||||
|
rs.close();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.warn("Error occurred while closing the result set", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (stmt != null) {
|
||||||
|
try {
|
||||||
|
stmt.close();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.warn("Error occurred while closing the statement", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void cleanupResources(Statement stmt) {
|
||||||
|
if (stmt != null) {
|
||||||
|
try {
|
||||||
|
stmt.close();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.warn("Error occurred while closing the statement", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,150 @@
|
|||||||
|
package org.wso2.carbon.device.mgt.core.archival.dao;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.wso2.carbon.device.mgt.common.IllegalTransactionStateException;
|
||||||
|
import org.wso2.carbon.device.mgt.common.TransactionManagementException;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.dao.impl.DataDeletionDAOImpl;
|
||||||
|
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
|
||||||
|
import org.wso2.carbon.device.mgt.core.config.datasource.DataSourceConfig;
|
||||||
|
import org.wso2.carbon.device.mgt.core.config.datasource.JNDILookupDefinition;
|
||||||
|
import org.wso2.carbon.device.mgt.core.dao.util.DeviceManagementDAOUtil;
|
||||||
|
import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationManagementDAOFactory;
|
||||||
|
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.Hashtable;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ArchivalDestinationDAOFactory {
|
||||||
|
private static final Log log = LogFactory.getLog(OperationManagementDAOFactory.class);
|
||||||
|
private static DataSource dataSource;
|
||||||
|
private static String databaseEngine;
|
||||||
|
private static int retentionPeriod;
|
||||||
|
private static ThreadLocal<Connection> currentConnection = new ThreadLocal<Connection>();
|
||||||
|
|
||||||
|
public static DataDeletionDAO getDataDeletionDAO() {
|
||||||
|
return new DataDeletionDAOImpl(DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
|
||||||
|
.getArchivalConfiguration().getArchivalTaskConfiguration()
|
||||||
|
.getPurgingTaskConfiguration().getRetentionPeriod());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void init(DataSource dtSource) {
|
||||||
|
dataSource = dtSource;
|
||||||
|
try {
|
||||||
|
databaseEngine = dataSource.getConnection().getMetaData().getDatabaseProductName();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Error occurred while retrieving config.datasource connection", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void init(DataSourceConfig config) {
|
||||||
|
dataSource = resolveDataSource(config);
|
||||||
|
try {
|
||||||
|
databaseEngine = dataSource.getConnection().getMetaData().getDatabaseProductName();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Error occurred while retrieving config.datasource connection", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void beginTransaction() throws TransactionManagementException {
|
||||||
|
try {
|
||||||
|
Connection conn = dataSource.getConnection();
|
||||||
|
conn.setAutoCommit(false);
|
||||||
|
currentConnection.set(conn);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new TransactionManagementException(
|
||||||
|
"Error occurred while retrieving config.datasource connection", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void openConnection() throws SQLException {
|
||||||
|
currentConnection.set(dataSource.getConnection());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Connection getConnection() throws SQLException {
|
||||||
|
if (currentConnection.get() == null) {
|
||||||
|
throw new IllegalTransactionStateException("No connection is associated with the current transaction. " +
|
||||||
|
"This might have ideally caused by not properly initiating the transaction via " +
|
||||||
|
"'beginTransaction'/'openConnection' methods");
|
||||||
|
}
|
||||||
|
return currentConnection.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void closeConnection() {
|
||||||
|
Connection con = currentConnection.get();
|
||||||
|
if (con != null) {
|
||||||
|
try {
|
||||||
|
con.close();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Error occurred while close the connection");
|
||||||
|
}
|
||||||
|
currentConnection.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void commitTransaction() {
|
||||||
|
try {
|
||||||
|
Connection conn = currentConnection.get();
|
||||||
|
if (conn != null) {
|
||||||
|
conn.commit();
|
||||||
|
} else {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Datasource connection associated with the current thread is null, hence commit " +
|
||||||
|
"has not been attempted");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Error occurred while committing the transaction", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void rollbackTransaction() {
|
||||||
|
try {
|
||||||
|
Connection conn = currentConnection.get();
|
||||||
|
if (conn != null) {
|
||||||
|
conn.rollback();
|
||||||
|
} else {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Datasource connection associated with the current thread is null, hence rollback " +
|
||||||
|
"has not been attempted");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Error occurred while roll-backing the transaction", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolve data source from the data source definition
|
||||||
|
*
|
||||||
|
* @param config data source configuration
|
||||||
|
* @return data source resolved from the data source definition
|
||||||
|
*/
|
||||||
|
private static DataSource resolveDataSource(DataSourceConfig config) {
|
||||||
|
DataSource dataSource = null;
|
||||||
|
if (config == null) {
|
||||||
|
throw new RuntimeException("Device Management Repository data source configuration is null and " +
|
||||||
|
"thus, is not initialized");
|
||||||
|
}
|
||||||
|
JNDILookupDefinition jndiConfig = config.getJndiLookupDefinition();
|
||||||
|
if (jndiConfig != null) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Initializing Device Management Repository data source using the JNDI Lookup Definition");
|
||||||
|
}
|
||||||
|
List<JNDILookupDefinition.JNDIProperty> jndiPropertyList = jndiConfig.getJndiProperties();
|
||||||
|
if (jndiPropertyList != null) {
|
||||||
|
Hashtable<Object, Object> jndiProperties = new Hashtable<Object, Object>();
|
||||||
|
for (JNDILookupDefinition.JNDIProperty prop : jndiPropertyList) {
|
||||||
|
jndiProperties.put(prop.getName(), prop.getValue());
|
||||||
|
}
|
||||||
|
dataSource = DeviceManagementDAOUtil.lookupDataSource(jndiConfig.getJndiName(), jndiProperties);
|
||||||
|
} else {
|
||||||
|
dataSource = DeviceManagementDAOUtil.lookupDataSource(jndiConfig.getJndiName(), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dataSource;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,170 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.archival.dao;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.wso2.carbon.device.mgt.common.IllegalTransactionStateException;
|
||||||
|
import org.wso2.carbon.device.mgt.common.TransactionManagementException;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.dao.impl.ArchivalDAOImpl;
|
||||||
|
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
|
||||||
|
import org.wso2.carbon.device.mgt.core.config.archival.ArchivalTaskConfiguration;
|
||||||
|
import org.wso2.carbon.device.mgt.core.config.datasource.DataSourceConfig;
|
||||||
|
import org.wso2.carbon.device.mgt.core.config.datasource.JNDILookupDefinition;
|
||||||
|
import org.wso2.carbon.device.mgt.core.dao.util.DeviceManagementDAOUtil;
|
||||||
|
import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationManagementDAOFactory;
|
||||||
|
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.Hashtable;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ArchivalSourceDAOFactory {
|
||||||
|
private static final Log log = LogFactory.getLog(OperationManagementDAOFactory.class);
|
||||||
|
private static DataSource dataSource;
|
||||||
|
private static String databaseEngine;
|
||||||
|
private static ThreadLocal<Connection> currentConnection = new ThreadLocal<Connection>();
|
||||||
|
|
||||||
|
public static ArchivalDAO getDataPurgingDAO() {
|
||||||
|
ArchivalTaskConfiguration configuration = DeviceConfigurationManager.getInstance()
|
||||||
|
.getDeviceManagementConfig()
|
||||||
|
.getArchivalConfiguration()
|
||||||
|
.getArchivalTaskConfiguration();
|
||||||
|
return new ArchivalDAOImpl(configuration.getRetentionPeriod(), configuration.getBatchSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void init(DataSource dtSource) {
|
||||||
|
dataSource = dtSource;
|
||||||
|
try {
|
||||||
|
databaseEngine = dataSource.getConnection().getMetaData().getDatabaseProductName();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Error occurred while retrieving config.datasource connection", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void init(DataSourceConfig config) {
|
||||||
|
dataSource = resolveDataSource(config);
|
||||||
|
try {
|
||||||
|
databaseEngine = dataSource.getConnection().getMetaData().getDatabaseProductName();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Error occurred while retrieving config.datasource connection", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void beginTransaction() throws TransactionManagementException {
|
||||||
|
try {
|
||||||
|
Connection conn = dataSource.getConnection();
|
||||||
|
conn.setAutoCommit(false);
|
||||||
|
currentConnection.set(conn);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new TransactionManagementException(
|
||||||
|
"Error occurred while retrieving config.datasource connection", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void openConnection() throws SQLException {
|
||||||
|
currentConnection.set(dataSource.getConnection());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Connection getConnection() throws SQLException {
|
||||||
|
if (currentConnection.get() == null) {
|
||||||
|
throw new IllegalTransactionStateException("No connection is associated with the current transaction. " +
|
||||||
|
"This might have ideally caused by not properly initiating the transaction via " +
|
||||||
|
"'beginTransaction'/'openConnection' methods");
|
||||||
|
}
|
||||||
|
return currentConnection.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void closeConnection() {
|
||||||
|
Connection con = currentConnection.get();
|
||||||
|
if (con != null) {
|
||||||
|
try {
|
||||||
|
con.close();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Error occurred while close the connection");
|
||||||
|
}
|
||||||
|
currentConnection.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void commitTransaction() {
|
||||||
|
try {
|
||||||
|
Connection conn = currentConnection.get();
|
||||||
|
if (conn != null) {
|
||||||
|
conn.commit();
|
||||||
|
} else {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Datasource connection associated with the current thread is null, hence commit " +
|
||||||
|
"has not been attempted");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Error occurred while committing the transaction", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void rollbackTransaction() {
|
||||||
|
try {
|
||||||
|
Connection conn = currentConnection.get();
|
||||||
|
if (conn != null) {
|
||||||
|
conn.rollback();
|
||||||
|
} else {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Datasource connection associated with the current thread is null, hence rollback " +
|
||||||
|
"has not been attempted");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Error occurred while roll-backing the transaction", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolve data source from the data source definition
|
||||||
|
*
|
||||||
|
* @param config data source configuration
|
||||||
|
* @return data source resolved from the data source definition
|
||||||
|
*/
|
||||||
|
private static DataSource resolveDataSource(DataSourceConfig config) {
|
||||||
|
DataSource dataSource = null;
|
||||||
|
if (config == null) {
|
||||||
|
throw new RuntimeException("Device Management Repository data source configuration is null and " +
|
||||||
|
"thus, is not initialized");
|
||||||
|
}
|
||||||
|
JNDILookupDefinition jndiConfig = config.getJndiLookupDefinition();
|
||||||
|
if (jndiConfig != null) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Initializing Device Management Repository data source using the JNDI Lookup Definition");
|
||||||
|
}
|
||||||
|
List<JNDILookupDefinition.JNDIProperty> jndiPropertyList = jndiConfig.getJndiProperties();
|
||||||
|
if (jndiPropertyList != null) {
|
||||||
|
Hashtable<Object, Object> jndiProperties = new Hashtable<Object, Object>();
|
||||||
|
for (JNDILookupDefinition.JNDIProperty prop : jndiPropertyList) {
|
||||||
|
jndiProperties.put(prop.getName(), prop.getValue());
|
||||||
|
}
|
||||||
|
dataSource = DeviceManagementDAOUtil.lookupDataSource(jndiConfig.getJndiName(), jndiProperties);
|
||||||
|
} else {
|
||||||
|
dataSource = DeviceManagementDAOUtil.lookupDataSource(jndiConfig.getJndiName(), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dataSource;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,39 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.archival.dao;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Operations to permanently delete data from archived database tables
|
||||||
|
*/
|
||||||
|
public interface DataDeletionDAO {
|
||||||
|
|
||||||
|
int DEFAULT_RETENTION_PERIOD = 364;
|
||||||
|
|
||||||
|
void deleteOperationResponses() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void deleteNotifications() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void deleteCommandOperations() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void deleteProfileOperations() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void deleteEnrolmentMappings() throws ArchivalDAOException;
|
||||||
|
|
||||||
|
void deleteOperations() throws ArchivalDAOException;
|
||||||
|
}
|
@ -0,0 +1,471 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.archival.dao.impl;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.dao.*;
|
||||||
|
|
||||||
|
import java.sql.*;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ArchivalDAOImpl implements ArchivalDAO {
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(ArchivalDAOImpl.class);
|
||||||
|
|
||||||
|
private int retentionPeriod;
|
||||||
|
private int batchSize = ArchivalDAO.DEFAULT_BATCH_SIZE;
|
||||||
|
private Timestamp currentTimestamp;
|
||||||
|
|
||||||
|
|
||||||
|
public ArchivalDAOImpl(int retentionPeriod) {
|
||||||
|
this.retentionPeriod = retentionPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArchivalDAOImpl(int retentionPeriod, int batchSize) {
|
||||||
|
this.retentionPeriod = retentionPeriod;
|
||||||
|
this.batchSize = batchSize;
|
||||||
|
this.currentTimestamp = new Timestamp(new java.util.Date().getTime());
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Using batch size of " + this.batchSize + " with retention period " + this.retentionPeriod);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Integer> getAllOperations() throws ArchivalDAOException {
|
||||||
|
List<Integer> operationIds = new ArrayList<>();
|
||||||
|
Statement stmt = null;
|
||||||
|
ResultSet rs = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||||
|
String sql = "SELECT DISTINCT OPERATION_ID FROM DM_ENROLMENT_OP_MAPPING " +
|
||||||
|
"WHERE CREATED_TIMESTAMP BETWEEN DATE_SUB(NOW(), INTERVAL " +
|
||||||
|
this.retentionPeriod + " DAY) AND NOW()";
|
||||||
|
stmt = this.createMemoryEfficientStatement(conn);
|
||||||
|
rs = stmt.executeQuery(sql);
|
||||||
|
while (rs.next()) {
|
||||||
|
operationIds.add(rs.getInt("OPERATION_ID"));
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("An error occurred while getting a list operation Ids to archive", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||||
|
}
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(operationIds.size() + " operations found for the archival");
|
||||||
|
}
|
||||||
|
return operationIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Integer> getPendingAndInProgressOperations() throws ArchivalDAOException {
|
||||||
|
List<Integer> operationIds = new ArrayList<>();
|
||||||
|
Statement stmt = null;
|
||||||
|
ResultSet rs = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||||
|
String sql = "SELECT DISTINCT OPERATION_ID " +
|
||||||
|
" FROM DM_ENROLMENT_OP_MAPPING WHERE STATUS IN('PENDING', 'IN_PROGRESS') " +
|
||||||
|
" AND CREATED_TIMESTAMP BETWEEN DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod +" DAY) AND NOW()";
|
||||||
|
stmt = this.createMemoryEfficientStatement(conn);
|
||||||
|
rs = stmt.executeQuery(sql);
|
||||||
|
while (rs.next()) {
|
||||||
|
operationIds.add(rs.getInt("OPERATION_ID"));
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("An error occurred while getting a list operation Ids to archive", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||||
|
}
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(operationIds.size() + " PENDING and IN_PROFRESS operations found for the archival");
|
||||||
|
}
|
||||||
|
return operationIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void copyOperationIDsForArchival(List<Integer> operationIds) throws ArchivalDAOException {
|
||||||
|
PreparedStatement stmt = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||||
|
String sql = "INSERT INTO DM_ARCHIVED_OPERATIONS(ID,CREATED_TIMESTAMP) VALUES (?,NOW())";
|
||||||
|
stmt = conn.prepareStatement(sql);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
for (int i = 0; i < operationIds.size(); i++) {
|
||||||
|
stmt.setInt(1, operationIds.get(i));
|
||||||
|
stmt.addBatch();
|
||||||
|
|
||||||
|
if (++count % this.batchSize == 0) {
|
||||||
|
stmt.executeBatch();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stmt.executeBatch();
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(count + " Records copied to the temporary table.");
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error while copying operation Ids for archival", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void moveOperationResponses() throws ArchivalDAOException {
|
||||||
|
Statement stmt = null;
|
||||||
|
PreparedStatement stmt2 = null;
|
||||||
|
Statement stmt3 = null;
|
||||||
|
ResultSet rs = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||||
|
String sql = "SELECT * FROM DM_DEVICE_OPERATION_RESPONSE WHERE OPERATION_ID IN " +
|
||||||
|
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt = this.createMemoryEfficientStatement(conn);
|
||||||
|
rs = stmt.executeQuery(sql);
|
||||||
|
|
||||||
|
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
sql = "INSERT INTO DM_DEVICE_OPERATION_RESPONSE_ARCH VALUES(?, ?, ?, ?, ?,?,?)";
|
||||||
|
stmt2 = conn2.prepareStatement(sql);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while (rs.next()) {
|
||||||
|
stmt2.setInt(1, rs.getInt("ID"));
|
||||||
|
stmt2.setInt(2, rs.getInt("ENROLMENT_ID"));
|
||||||
|
stmt2.setInt(3, rs.getInt("OPERATION_ID"));
|
||||||
|
stmt2.setInt(4, rs.getInt("EN_OP_MAP_ID"));
|
||||||
|
stmt2.setBytes(5, rs.getBytes("OPERATION_RESPONSE"));
|
||||||
|
stmt2.setTimestamp(6, rs.getTimestamp("RECEIVED_TIMESTAMP"));
|
||||||
|
stmt2.setTimestamp(7, this.currentTimestamp);
|
||||||
|
stmt2.addBatch();
|
||||||
|
|
||||||
|
if (++count % batchSize == 0) {
|
||||||
|
stmt2.executeBatch();
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Executing batch " + count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stmt2.executeBatch();
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(count + " [OPERATION_RESPONSES] Records copied to the archival table. Starting deletion");
|
||||||
|
}
|
||||||
|
//try the deletion now
|
||||||
|
sql = "DELETE FROM DM_DEVICE_OPERATION_RESPONSE WHERE OPERATION_ID IN (" +
|
||||||
|
" SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt3 = conn.createStatement();
|
||||||
|
int affected = stmt3.executeUpdate(sql);
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(affected + " Rows deleted");
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while moving operations ", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void moveNotifications() throws ArchivalDAOException {
|
||||||
|
Statement stmt = null;
|
||||||
|
PreparedStatement stmt2 = null;
|
||||||
|
Statement stmt3 = null;
|
||||||
|
ResultSet rs = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||||
|
String sql = "SELECT * FROM DM_NOTIFICATION WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt = this.createMemoryEfficientStatement(conn);
|
||||||
|
rs = stmt.executeQuery(sql);
|
||||||
|
|
||||||
|
// ArchivalDestinationDAOFactory.beginTransaction();
|
||||||
|
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
|
||||||
|
sql = "INSERT INTO DM_NOTIFICATION_ARCH VALUES(?, ?, ?, ?, ?, ?, ?)";
|
||||||
|
stmt2 = conn2.prepareStatement(sql);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while (rs.next()) {
|
||||||
|
stmt2.setInt(1, rs.getInt("NOTIFICATION_ID"));
|
||||||
|
stmt2.setInt(2, rs.getInt("DEVICE_ID"));
|
||||||
|
stmt2.setInt(3, rs.getInt("OPERATION_ID"));
|
||||||
|
stmt2.setInt(4, rs.getInt("TENANT_ID"));
|
||||||
|
stmt2.setString(5, rs.getString("STATUS"));
|
||||||
|
stmt2.setString(6, rs.getString("DESCRIPTION"));
|
||||||
|
stmt2.setTimestamp(7, this.currentTimestamp);
|
||||||
|
stmt2.addBatch();
|
||||||
|
|
||||||
|
if (++count % batchSize == 0) {
|
||||||
|
stmt2.executeBatch();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stmt2.executeBatch();
|
||||||
|
// ArchivalDestinationDAOFactory.commitTransaction();
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(count + " [NOTIFICATIONS] Records copied to the archival table. Starting deletion");
|
||||||
|
}
|
||||||
|
sql = "DELETE FROM DM_NOTIFICATION" +
|
||||||
|
" WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt3 = conn.createStatement();
|
||||||
|
int affected = stmt3.executeUpdate(sql);
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(affected + " Rows deleted");
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while moving notifications ", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void moveCommandOperations() throws ArchivalDAOException {
|
||||||
|
Statement stmt = null;
|
||||||
|
PreparedStatement stmt2 = null;
|
||||||
|
Statement stmt3 = null;
|
||||||
|
ResultSet rs = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||||
|
String sql = "SELECT * FROM DM_COMMAND_OPERATION WHERE OPERATION_ID IN " +
|
||||||
|
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt = this.createMemoryEfficientStatement(conn);
|
||||||
|
rs = stmt.executeQuery(sql);
|
||||||
|
|
||||||
|
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
|
||||||
|
sql = "INSERT INTO DM_COMMAND_OPERATION_ARCH VALUES(?,?,?)";
|
||||||
|
stmt2 = conn2.prepareStatement(sql);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while (rs.next()) {
|
||||||
|
stmt2.setInt(1, rs.getInt("OPERATION_ID"));
|
||||||
|
stmt2.setInt(2, rs.getInt("ENABLED"));
|
||||||
|
stmt2.setTimestamp(3, this.currentTimestamp);
|
||||||
|
stmt2.addBatch();
|
||||||
|
|
||||||
|
if (++count % batchSize == 0) {
|
||||||
|
stmt2.executeBatch();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stmt2.executeBatch();
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(count + " [COMMAND_OPERATION] Records copied to the archival table. Starting deletion");
|
||||||
|
}
|
||||||
|
sql = "DELETE FROM DM_COMMAND_OPERATION" +
|
||||||
|
" WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt3 = conn.createStatement();
|
||||||
|
int affected = stmt3.executeUpdate(sql);
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(affected + " Rows deleted");
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while moving command operations", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void moveProfileOperations() throws ArchivalDAOException {
|
||||||
|
Statement stmt = null;
|
||||||
|
PreparedStatement stmt2 = null;
|
||||||
|
Statement stmt3 = null;
|
||||||
|
ResultSet rs = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||||
|
String sql = "SELECT * FROM DM_PROFILE_OPERATION WHERE OPERATION_ID IN " +
|
||||||
|
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt = this.createMemoryEfficientStatement(conn);
|
||||||
|
rs = stmt.executeQuery(sql);
|
||||||
|
|
||||||
|
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
|
||||||
|
sql = "INSERT INTO DM_PROFILE_OPERATION_ARCH VALUES(?, ?, ?, ?)";
|
||||||
|
stmt2 = conn2.prepareStatement(sql);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while (rs.next()) {
|
||||||
|
stmt2.setInt(1, rs.getInt("OPERATION_ID"));
|
||||||
|
stmt2.setInt(2, rs.getInt("ENABLED"));
|
||||||
|
stmt2.setBytes(3, rs.getBytes("OPERATION_DETAILS"));
|
||||||
|
stmt2.setTimestamp(4,this.currentTimestamp );
|
||||||
|
stmt2.addBatch();
|
||||||
|
|
||||||
|
if (++count % batchSize == 0) {
|
||||||
|
stmt2.executeBatch();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stmt2.executeBatch();
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(count + " [PROFILE_OPERATION] Records copied to the archival table. Starting deletion");
|
||||||
|
}
|
||||||
|
sql = "DELETE FROM DM_PROFILE_OPERATION" +
|
||||||
|
" WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt3 = conn.createStatement();
|
||||||
|
int affected = stmt3.executeUpdate(sql);
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(affected + " Rows deleted");
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while moving profile operations", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void moveEnrolmentMappings() throws ArchivalDAOException {
|
||||||
|
Statement stmt = null;
|
||||||
|
PreparedStatement stmt2 = null;
|
||||||
|
Statement stmt3 = null;
|
||||||
|
ResultSet rs = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||||
|
String sql = "SELECT * FROM DM_ENROLMENT_OP_MAPPING WHERE OPERATION_ID IN " +
|
||||||
|
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt = this.createMemoryEfficientStatement(conn);
|
||||||
|
rs = stmt.executeQuery(sql);
|
||||||
|
|
||||||
|
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
|
||||||
|
sql = "INSERT INTO DM_ENROLMENT_OP_MAPPING_ARCH VALUES(?, ?, ?, ?, ?, ?, ?,?)";
|
||||||
|
stmt2 = conn2.prepareStatement(sql);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while (rs.next()) {
|
||||||
|
stmt2.setInt(1, rs.getInt("ID"));
|
||||||
|
stmt2.setInt(2, rs.getInt("ENROLMENT_ID"));
|
||||||
|
stmt2.setInt(3, rs.getInt("OPERATION_ID"));
|
||||||
|
stmt2.setString(4, rs.getString("STATUS"));
|
||||||
|
stmt2.setString(5, rs.getString("PUSH_NOTIFICATION_STATUS"));
|
||||||
|
stmt2.setInt(6, rs.getInt("CREATED_TIMESTAMP"));
|
||||||
|
stmt2.setInt(7, rs.getInt("UPDATED_TIMESTAMP"));
|
||||||
|
stmt2.setTimestamp(8, this.currentTimestamp);
|
||||||
|
stmt2.addBatch();
|
||||||
|
|
||||||
|
if (++count % batchSize == 0) {
|
||||||
|
stmt2.executeBatch();
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Executing batch " + count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stmt2.executeBatch();
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(count + " [ENROLMENT_OP_MAPPING] Records copied to the archival table. Starting deletion");
|
||||||
|
}
|
||||||
|
sql = "DELETE FROM DM_ENROLMENT_OP_MAPPING WHERE OPERATION_ID IN (" +
|
||||||
|
"SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt3 = conn.createStatement();
|
||||||
|
int affected = stmt3.executeUpdate(sql);
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(affected + " Rows deleted");
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while moving enrolment mappings", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void moveOperations() throws ArchivalDAOException {
|
||||||
|
Statement stmt = null;
|
||||||
|
PreparedStatement stmt2 = null;
|
||||||
|
Statement stmt3 = null;
|
||||||
|
ResultSet rs = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||||
|
String sql = "SELECT * FROM DM_OPERATION WHERE ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt = this.createMemoryEfficientStatement(conn);
|
||||||
|
rs = stmt.executeQuery(sql);
|
||||||
|
|
||||||
|
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
sql = "INSERT INTO DM_OPERATION_ARCH VALUES(?, ?, ?, ?, ?, ?)";
|
||||||
|
stmt2 = conn2.prepareStatement(sql);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while (rs.next()) {
|
||||||
|
stmt2.setInt(1, rs.getInt("ID"));
|
||||||
|
stmt2.setString(2, rs.getString("TYPE"));
|
||||||
|
stmt2.setTimestamp(3, rs.getTimestamp("CREATED_TIMESTAMP"));
|
||||||
|
stmt2.setTimestamp(4, rs.getTimestamp("RECEIVED_TIMESTAMP"));
|
||||||
|
stmt2.setString(5, rs.getString("OPERATION_CODE"));
|
||||||
|
stmt2.setTimestamp(6, this.currentTimestamp);
|
||||||
|
stmt2.addBatch();
|
||||||
|
|
||||||
|
if (++count % batchSize == 0) {
|
||||||
|
stmt2.executeBatch();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stmt2.executeBatch();
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(count + " [OPERATIONS] Records copied to the archival table. Starting deletion");
|
||||||
|
}
|
||||||
|
sql = "DELETE FROM DM_OPERATION WHERE ID IN (" +
|
||||||
|
"SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||||
|
stmt3 = conn.createStatement();
|
||||||
|
int affected = stmt3.executeUpdate(sql);
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(affected + " Rows deleted");
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while moving operations", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void truncateOperationIDsForArchival() throws ArchivalDAOException {
|
||||||
|
PreparedStatement stmt = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||||
|
conn.setAutoCommit(false);
|
||||||
|
String sql = "TRUNCATE DM_ARCHIVED_OPERATIONS";
|
||||||
|
stmt = conn.prepareStatement(sql);
|
||||||
|
stmt.addBatch();
|
||||||
|
stmt.executeBatch();
|
||||||
|
conn.commit();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while truncating operation Ids", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Statement createMemoryEfficientStatement(Connection conn) throws ArchivalDAOException, SQLException {
|
||||||
|
Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||||
|
stmt.setFetchSize(Integer.MIN_VALUE);
|
||||||
|
return stmt;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,163 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.archival.dao.impl;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDAOException;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDAOUtil;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDestinationDAOFactory;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.dao.DataDeletionDAO;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
|
||||||
|
public class DataDeletionDAOImpl implements DataDeletionDAO {
|
||||||
|
private static Log log = LogFactory.getLog(DataDeletionDAOImpl.class);
|
||||||
|
|
||||||
|
private int retentionPeriod = DataDeletionDAO.DEFAULT_RETENTION_PERIOD;
|
||||||
|
|
||||||
|
public DataDeletionDAOImpl(int retentionPeriod) {
|
||||||
|
this.retentionPeriod = retentionPeriod;
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Using retention period as " + retentionPeriod);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteOperationResponses() throws ArchivalDAOException {
|
||||||
|
PreparedStatement stmt = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
conn.setAutoCommit(false);
|
||||||
|
String sql = "DELETE FROM DM_DEVICE_OPERATION_RESPONSE_ARCH " +
|
||||||
|
"WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||||
|
stmt = conn.prepareStatement(sql);
|
||||||
|
stmt.setInt(1, this.retentionPeriod);
|
||||||
|
stmt.addBatch();
|
||||||
|
stmt.executeBatch();
|
||||||
|
conn.commit();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while deleting operation responses", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteNotifications() throws ArchivalDAOException {
|
||||||
|
PreparedStatement stmt = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
conn.setAutoCommit(false);
|
||||||
|
String sql = "DELETE FROM DM_NOTIFICATION_ARCH" +
|
||||||
|
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||||
|
stmt = conn.prepareStatement(sql);
|
||||||
|
stmt.setInt(1, this.retentionPeriod);
|
||||||
|
stmt.addBatch();
|
||||||
|
stmt.executeBatch();
|
||||||
|
conn.commit();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while deleting notifications", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteCommandOperations() throws ArchivalDAOException {
|
||||||
|
PreparedStatement stmt = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
conn.setAutoCommit(false);
|
||||||
|
String sql = "DELETE FROM DM_COMMAND_OPERATION_ARCH" +
|
||||||
|
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||||
|
stmt = conn.prepareStatement(sql);
|
||||||
|
stmt.setInt(1, this.retentionPeriod);
|
||||||
|
stmt.addBatch();
|
||||||
|
stmt.executeBatch();
|
||||||
|
conn.commit();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while deleting command operations", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteProfileOperations() throws ArchivalDAOException {
|
||||||
|
PreparedStatement stmt = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
conn.setAutoCommit(false);
|
||||||
|
String sql = "DELETE FROM DM_PROFILE_OPERATION_ARCH" +
|
||||||
|
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||||
|
stmt = conn.prepareStatement(sql);
|
||||||
|
stmt.setInt(1, this.retentionPeriod);
|
||||||
|
stmt.addBatch();
|
||||||
|
stmt.executeBatch();
|
||||||
|
conn.commit();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while deleting profile operations", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteEnrolmentMappings() throws ArchivalDAOException {
|
||||||
|
PreparedStatement stmt = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
conn.setAutoCommit(false);
|
||||||
|
String sql = "DELETE FROM DM_ENROLMENT_OP_MAPPING_ARCH WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||||
|
stmt = conn.prepareStatement(sql);
|
||||||
|
stmt.setInt(1, this.retentionPeriod);
|
||||||
|
stmt.addBatch();
|
||||||
|
stmt.executeBatch();
|
||||||
|
conn.commit();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while deleting enrolment mappings", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteOperations() throws ArchivalDAOException {
|
||||||
|
PreparedStatement stmt = null;
|
||||||
|
try {
|
||||||
|
Connection conn = ArchivalDestinationDAOFactory.getConnection();
|
||||||
|
conn.setAutoCommit(false);
|
||||||
|
String sql = "DELETE FROM DM_OPERATION_ARCH WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||||
|
stmt = conn.prepareStatement(sql);
|
||||||
|
stmt.setInt(1, this.retentionPeriod);
|
||||||
|
stmt.addBatch();
|
||||||
|
stmt.executeBatch();
|
||||||
|
conn.commit();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new ArchivalDAOException("Error occurred while deleting operations", e);
|
||||||
|
} finally {
|
||||||
|
ArchivalDAOUtil.cleanupResources(stmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,53 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* you may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.config.archival;
|
||||||
|
|
||||||
|
import org.wso2.carbon.device.mgt.core.config.datasource.DataSourceConfig;
|
||||||
|
|
||||||
|
import javax.xml.bind.annotation.XmlElement;
|
||||||
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class represents the information related to data archival configurations.
|
||||||
|
*/
|
||||||
|
@XmlRootElement(name = "ArchivalConfiguration")
|
||||||
|
public class ArchivalConfiguration {
|
||||||
|
|
||||||
|
private DataSourceConfig dataSourceConfig;
|
||||||
|
private ArchivalTaskConfiguration archivalTaskConfiguration;
|
||||||
|
|
||||||
|
@XmlElement(name = "DataSourceConfiguration", required = true)
|
||||||
|
public DataSourceConfig getDataSourceConfig() {
|
||||||
|
return dataSourceConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDataSourceConfig(DataSourceConfig dataSourceConfig) {
|
||||||
|
this.dataSourceConfig = dataSourceConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
@XmlElement(name = "ArchivalTask", required = true)
|
||||||
|
public ArchivalTaskConfiguration getArchivalTaskConfiguration() {
|
||||||
|
return archivalTaskConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setArchivalTaskConfiguration(ArchivalTaskConfiguration archivalTaskConfiguration) {
|
||||||
|
this.archivalTaskConfiguration = archivalTaskConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,86 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* you may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.config.archival;
|
||||||
|
|
||||||
|
import javax.xml.bind.annotation.XmlElement;
|
||||||
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
|
|
||||||
|
@XmlRootElement(name = "ArchivalTask")
|
||||||
|
public class ArchivalTaskConfiguration {
|
||||||
|
private boolean enabled;
|
||||||
|
private String cronExpression;
|
||||||
|
private String taskClazz;
|
||||||
|
private int retentionPeriod;
|
||||||
|
private int batchSize;
|
||||||
|
private PurgingTaskConfiguration purgingTaskConfiguration;
|
||||||
|
|
||||||
|
@XmlElement(name = "Enabled", required = true)
|
||||||
|
public boolean isEnabled() {
|
||||||
|
return enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnabled(boolean enabled) {
|
||||||
|
this.enabled = enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
@XmlElement(name = "CronExpression", required = true)
|
||||||
|
public String getCronExpression() {
|
||||||
|
return cronExpression;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCronExpression(String cronExpression) {
|
||||||
|
this.cronExpression = cronExpression;
|
||||||
|
}
|
||||||
|
|
||||||
|
@XmlElement(name = "TaskClass", required = true)
|
||||||
|
public String getTaskClazz() {
|
||||||
|
return taskClazz;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTaskClazz(String taskClazz) {
|
||||||
|
this.taskClazz = taskClazz;
|
||||||
|
}
|
||||||
|
|
||||||
|
@XmlElement(name = "RetentionPeriod", required = true)
|
||||||
|
public int getRetentionPeriod() {
|
||||||
|
return retentionPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRetentionPeriod(int retentionPeriod) {
|
||||||
|
this.retentionPeriod = retentionPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
|
@XmlElement(name = "PurgingTask", required = true)
|
||||||
|
public PurgingTaskConfiguration getPurgingTaskConfiguration() {
|
||||||
|
return purgingTaskConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPurgingTaskConfiguration(PurgingTaskConfiguration purgingTaskConfiguration) {
|
||||||
|
this.purgingTaskConfiguration = purgingTaskConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
|
@XmlElement(name ="ExecutionBatchSize", required = true)
|
||||||
|
public int getBatchSize() {
|
||||||
|
return batchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBatchSize(int batchSize) {
|
||||||
|
this.batchSize = batchSize;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,66 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* you may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.config.archival;
|
||||||
|
|
||||||
|
import javax.xml.bind.annotation.XmlElement;
|
||||||
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
|
|
||||||
|
@XmlRootElement(name = "PurgingTask")
|
||||||
|
public class PurgingTaskConfiguration {
|
||||||
|
private boolean enabled;
|
||||||
|
private String cronExpression;
|
||||||
|
private String taskClazz;
|
||||||
|
private int retentionPeriod;
|
||||||
|
|
||||||
|
@XmlElement(name = "Enabled", required = true)
|
||||||
|
public boolean isEnabled() {
|
||||||
|
return enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnabled(boolean enabled) {
|
||||||
|
this.enabled = enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
@XmlElement(name = "CronExpression", required = true)
|
||||||
|
public String getCronExpression() {
|
||||||
|
return cronExpression;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCronExpression(String cronExpression) {
|
||||||
|
this.cronExpression = cronExpression;
|
||||||
|
}
|
||||||
|
|
||||||
|
@XmlElement(name = "TaskClass", required = true)
|
||||||
|
public String getTaskClazz() {
|
||||||
|
return taskClazz;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTaskClazz(String taskClazz) {
|
||||||
|
this.taskClazz = taskClazz;
|
||||||
|
}
|
||||||
|
|
||||||
|
@XmlElement(name = "RetentionPeriod", required = true)
|
||||||
|
public int getRetentionPeriod() {
|
||||||
|
return retentionPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRetentionPeriod(int retentionPeriod) {
|
||||||
|
this.retentionPeriod = retentionPeriod;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,89 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.wso2.carbon.device.mgt.core.internal;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.osgi.service.component.ComponentContext;
|
||||||
|
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
|
||||||
|
import org.wso2.carbon.device.mgt.core.task.ArchivalTaskManager;
|
||||||
|
import org.wso2.carbon.device.mgt.core.task.impl.ArchivalTaskManagerImpl;
|
||||||
|
import org.wso2.carbon.ntask.core.service.TaskService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @scr.component name="org.wso2.carbon.activity.data.archival" immediate="true"
|
||||||
|
* @scr.reference name="device.ntask.component"
|
||||||
|
* interface="org.wso2.carbon.ntask.core.service.TaskService"
|
||||||
|
* cardinality="1..1"
|
||||||
|
* policy="dynamic"
|
||||||
|
* bind="setTaskService"
|
||||||
|
* unbind="unsetTaskService"
|
||||||
|
*/
|
||||||
|
public class ActivityDataPurgingServiceComponent {
|
||||||
|
private static Log log = LogFactory.getLog(ActivityDataPurgingServiceComponent.class);
|
||||||
|
|
||||||
|
protected void activate(ComponentContext componentContext) {
|
||||||
|
try {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Initializing activity data archival task manager bundle.");
|
||||||
|
}
|
||||||
|
ArchivalTaskManager archivalTaskManager = new ArchivalTaskManagerImpl();
|
||||||
|
|
||||||
|
// This will start the data archival task
|
||||||
|
boolean purgingTaskEnabled =
|
||||||
|
DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getArchivalConfiguration()
|
||||||
|
.getArchivalTaskConfiguration().isEnabled();
|
||||||
|
|
||||||
|
if (purgingTaskEnabled) {
|
||||||
|
archivalTaskManager.scheduleArchivalTask();
|
||||||
|
} else {
|
||||||
|
log.warn("Data archival task has been disabled. It is recommended to enable archival task to prune the " +
|
||||||
|
"transactional databases tables time to time.");
|
||||||
|
}
|
||||||
|
// This will start the data deletion task.
|
||||||
|
boolean deletionTaskEnabled =
|
||||||
|
DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getArchivalConfiguration()
|
||||||
|
.getArchivalTaskConfiguration().getPurgingTaskConfiguration().isEnabled();
|
||||||
|
if (deletionTaskEnabled) {
|
||||||
|
archivalTaskManager.scheduleDeletionTask();
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
log.error("Error occurred while initializing activity data archival task manager service.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setTaskService(TaskService taskService) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Setting the task service.");
|
||||||
|
}
|
||||||
|
DeviceManagementDataHolder.getInstance().setTaskService(taskService);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void unsetTaskService(TaskService taskService) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Removing the task service.");
|
||||||
|
}
|
||||||
|
DeviceManagementDataHolder.getInstance().setTaskService(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
protected void deactivate(ComponentContext componentContext) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,47 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.task;
|
||||||
|
|
||||||
|
public class ArchivalTaskException extends Exception {
|
||||||
|
private String errorMessage;
|
||||||
|
|
||||||
|
public String getErrorMessage() {
|
||||||
|
return errorMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setErrorMessage(String errorMessage) {
|
||||||
|
this.errorMessage = errorMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArchivalTaskException(String message) {
|
||||||
|
super(message);
|
||||||
|
setErrorMessage(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArchivalTaskException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
setErrorMessage(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArchivalTaskException(String message, Throwable cause, boolean enableSuppression,
|
||||||
|
boolean writableStackTrace, String errorMessage) {
|
||||||
|
super(message, cause, enableSuppression, writableStackTrace);
|
||||||
|
setErrorMessage(message);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,27 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.task;
|
||||||
|
|
||||||
|
public interface ArchivalTaskManager {
|
||||||
|
|
||||||
|
void scheduleArchivalTask() throws ArchivalTaskException;
|
||||||
|
|
||||||
|
void scheduleDeletionTask() throws ArchivalTaskException;
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,87 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.task.impl;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.ArchivalException;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.ArchivalService;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.ArchivalServiceImpl;
|
||||||
|
import org.wso2.carbon.ntask.core.Task;
|
||||||
|
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
|
||||||
|
public class ArchivalTask implements Task {
|
||||||
|
private static Log log = LogFactory.getLog(ArchivalTask.class);
|
||||||
|
|
||||||
|
private ArchivalService archivalService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setProperties(Map<String, String> map) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
this.archivalService = new ArchivalServiceImpl();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute() {
|
||||||
|
log.info("Executing ArchivalTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
try {
|
||||||
|
archivalService.archiveTransactionalRecords();
|
||||||
|
} catch (ArchivalException e) {
|
||||||
|
log.error("An error occurred while running ArchivalTask", e);
|
||||||
|
}
|
||||||
|
long endTime = System.currentTimeMillis();
|
||||||
|
long difference = endTime - startTime;
|
||||||
|
log.info("ArchivalTask completed. Total execution time: " + getDurationBreakdown(difference));
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getDurationBreakdown(long millis) {
|
||||||
|
if (millis < 0) {
|
||||||
|
throw new IllegalArgumentException("Duration must be greater than zero!");
|
||||||
|
}
|
||||||
|
long days = TimeUnit.MILLISECONDS.toDays(millis);
|
||||||
|
millis -= TimeUnit.DAYS.toMillis(days);
|
||||||
|
long hours = TimeUnit.MILLISECONDS.toHours(millis);
|
||||||
|
millis -= TimeUnit.HOURS.toMillis(hours);
|
||||||
|
long minutes = TimeUnit.MILLISECONDS.toMinutes(millis);
|
||||||
|
millis -= TimeUnit.MINUTES.toMillis(minutes);
|
||||||
|
long seconds = TimeUnit.MILLISECONDS.toSeconds(millis);
|
||||||
|
|
||||||
|
StringBuilder sb = new StringBuilder(64);
|
||||||
|
sb.append(days);
|
||||||
|
sb.append(" Days ");
|
||||||
|
sb.append(hours);
|
||||||
|
sb.append(" Hours ");
|
||||||
|
sb.append(minutes);
|
||||||
|
sb.append(" Minutes ");
|
||||||
|
sb.append(seconds);
|
||||||
|
sb.append(" Seconds");
|
||||||
|
|
||||||
|
return (sb.toString());
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,130 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.task.impl;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
||||||
|
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
|
||||||
|
import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder;
|
||||||
|
import org.wso2.carbon.device.mgt.core.task.ArchivalTaskException;
|
||||||
|
import org.wso2.carbon.device.mgt.core.task.ArchivalTaskManager;
|
||||||
|
import org.wso2.carbon.ntask.common.TaskException;
|
||||||
|
import org.wso2.carbon.ntask.core.TaskInfo;
|
||||||
|
import org.wso2.carbon.ntask.core.TaskManager;
|
||||||
|
import org.wso2.carbon.ntask.core.service.TaskService;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class ArchivalTaskManagerImpl implements ArchivalTaskManager {
|
||||||
|
private static final String TASK_TYPE_ARCHIVAL = "DATA_ARCHIVAL";
|
||||||
|
private static final String TASK_TYPE_DELETION = "DATA_DELETION";
|
||||||
|
|
||||||
|
private static final String TASK_NAME_ARCHIVAL = "DATA_ARCHIVAL_TASK";
|
||||||
|
private static final String TASK_NAME_DELETION = "DATA_DELETION_TASK";
|
||||||
|
|
||||||
|
private static final String TENANT_ID = "TENANT_ID";
|
||||||
|
|
||||||
|
private static Log log = LogFactory.getLog(ArchivalTaskManagerImpl.class);
|
||||||
|
|
||||||
|
public void scheduleArchivalTask() throws ArchivalTaskException {
|
||||||
|
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
|
||||||
|
|
||||||
|
try {
|
||||||
|
TaskService taskService = DeviceManagementDataHolder.getInstance().getTaskService();
|
||||||
|
taskService.registerTaskType(TASK_TYPE_ARCHIVAL);
|
||||||
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Data archival task is started for the tenant id " + tenantId);
|
||||||
|
}
|
||||||
|
String taskClazz = DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
|
||||||
|
.getArchivalConfiguration().getArchivalTaskConfiguration().getTaskClazz();
|
||||||
|
String cronExpression = DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
|
||||||
|
.getArchivalConfiguration().getArchivalTaskConfiguration().getCronExpression();
|
||||||
|
|
||||||
|
TaskManager taskManager = taskService.getTaskManager(TASK_TYPE_ARCHIVAL);
|
||||||
|
|
||||||
|
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
|
||||||
|
triggerInfo.setCronExpression(cronExpression);
|
||||||
|
// triggerInfo.setIntervalMillis(60000);
|
||||||
|
triggerInfo.setRepeatCount(-1);
|
||||||
|
// triggerInfo.setRepeatCount(0);
|
||||||
|
triggerInfo.setDisallowConcurrentExecution(true);
|
||||||
|
|
||||||
|
Map<String, String> properties = new HashMap<>();
|
||||||
|
properties.put(TENANT_ID, String.valueOf(tenantId));
|
||||||
|
|
||||||
|
if (!taskManager.isTaskScheduled(TASK_NAME_ARCHIVAL)) {
|
||||||
|
TaskInfo taskInfo = new TaskInfo(TASK_NAME_ARCHIVAL, taskClazz, properties, triggerInfo);
|
||||||
|
taskManager.registerTask(taskInfo);
|
||||||
|
taskManager.rescheduleTask(taskInfo.getName());
|
||||||
|
} else {
|
||||||
|
throw new ArchivalTaskException("Data archival task is already started for this tenant " +
|
||||||
|
tenantId);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (TaskException e) {
|
||||||
|
throw new ArchivalTaskException("Error occurred while creating the task for tenant " + tenantId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void scheduleDeletionTask() throws ArchivalTaskException {
|
||||||
|
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
|
||||||
|
try {
|
||||||
|
TaskService taskService = DeviceManagementDataHolder.getInstance().getTaskService();
|
||||||
|
taskService.registerTaskType(TASK_TYPE_DELETION);
|
||||||
|
|
||||||
|
String taskClazz = DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
|
||||||
|
.getArchivalConfiguration().getArchivalTaskConfiguration()
|
||||||
|
.getPurgingTaskConfiguration().getTaskClazz();
|
||||||
|
String cronExpression = DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
|
||||||
|
.getArchivalConfiguration().getArchivalTaskConfiguration()
|
||||||
|
.getPurgingTaskConfiguration().getCronExpression();
|
||||||
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Data deletion task is started for the tenant id " + tenantId);
|
||||||
|
}
|
||||||
|
TaskManager taskManager = taskService.getTaskManager(TASK_TYPE_DELETION);
|
||||||
|
|
||||||
|
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
|
||||||
|
triggerInfo.setCronExpression(cronExpression);
|
||||||
|
triggerInfo.setRepeatCount(-1);
|
||||||
|
triggerInfo.setDisallowConcurrentExecution(true);
|
||||||
|
|
||||||
|
Map<String, String> properties = new HashMap<>();
|
||||||
|
properties.put(TENANT_ID, String.valueOf(tenantId));
|
||||||
|
|
||||||
|
if (!taskManager.isTaskScheduled(TASK_NAME_DELETION)) {
|
||||||
|
TaskInfo taskInfo = new TaskInfo(TASK_NAME_DELETION, taskClazz, properties, triggerInfo);
|
||||||
|
taskManager.registerTask(taskInfo);
|
||||||
|
taskManager.rescheduleTask(taskInfo.getName());
|
||||||
|
} else {
|
||||||
|
throw new ArchivalTaskException("Data deletion task is already started for this tenant " +
|
||||||
|
tenantId);
|
||||||
|
}
|
||||||
|
} catch (TaskException e) {
|
||||||
|
throw new ArchivalTaskException("Error occurred while creating the task for tenant " + tenantId, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,61 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||||
|
*
|
||||||
|
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||||
|
* Version 2.0 (the "License"); you may not use this file except
|
||||||
|
* in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.wso2.carbon.device.mgt.core.task.impl;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.ArchivalException;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.ArchivalService;
|
||||||
|
import org.wso2.carbon.device.mgt.core.archival.ArchivalServiceImpl;
|
||||||
|
import org.wso2.carbon.ntask.core.Task;
|
||||||
|
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class ArchivedDataDeletionTask implements Task {
|
||||||
|
|
||||||
|
private static Log log = LogFactory.getLog(ArchivedDataDeletionTask.class);
|
||||||
|
|
||||||
|
private ArchivalService archivalService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setProperties(Map<String, String> map) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
this.archivalService = new ArchivalServiceImpl();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute() {
|
||||||
|
log.info("Executing DataDeletionTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
|
||||||
|
long startTime = System.nanoTime();
|
||||||
|
try {
|
||||||
|
archivalService.deleteArchivedRecords();
|
||||||
|
} catch (ArchivalException e) {
|
||||||
|
log.error("An error occurred while executing DataDeletionTask", e);
|
||||||
|
}
|
||||||
|
long endTime = System.nanoTime();
|
||||||
|
long difference = (endTime - startTime) / 1000000 * 1000;
|
||||||
|
log.info("DataDeletionTask completed. Total execution time: " + difference + " seconds");
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,61 @@
|
|||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS DM_OPERATION_ARCH (
|
||||||
|
ID INTEGER NOT NULL,
|
||||||
|
TYPE VARCHAR(20) NOT NULL,
|
||||||
|
CREATED_TIMESTAMP TIMESTAMP NOT NULL,
|
||||||
|
RECEIVED_TIMESTAMP TIMESTAMP NULL,
|
||||||
|
OPERATION_CODE VARCHAR(50) NOT NULL,
|
||||||
|
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
|
||||||
|
PRIMARY KEY (ID)
|
||||||
|
)ENGINE = InnoDB;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS DM_ENROLMENT_OP_MAPPING_ARCH (
|
||||||
|
ID INTEGER NOT NULL,
|
||||||
|
ENROLMENT_ID INTEGER NOT NULL,
|
||||||
|
OPERATION_ID INTEGER NOT NULL,
|
||||||
|
STATUS VARCHAR(50) NULL,
|
||||||
|
PUSH_NOTIFICATION_STATUS VARCHAR(50) NULL,
|
||||||
|
CREATED_TIMESTAMP INTEGER NOT NULL,
|
||||||
|
UPDATED_TIMESTAMP INTEGER NOT NULL,
|
||||||
|
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
|
||||||
|
PRIMARY KEY (ID)
|
||||||
|
)ENGINE = InnoDB;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS DM_DEVICE_OPERATION_RESPONSE_ARCH (
|
||||||
|
ID INT(11) NOT NULL,
|
||||||
|
ENROLMENT_ID INTEGER NOT NULL,
|
||||||
|
OPERATION_ID INTEGER NOT NULL,
|
||||||
|
EN_OP_MAP_ID INTEGER NOT NULL,
|
||||||
|
OPERATION_RESPONSE LONGBLOB DEFAULT NULL,
|
||||||
|
RECEIVED_TIMESTAMP TIMESTAMP NULL,
|
||||||
|
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
|
||||||
|
PRIMARY KEY (ID)
|
||||||
|
)ENGINE = InnoDB;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS DM_NOTIFICATION_ARCH (
|
||||||
|
NOTIFICATION_ID INTEGER NOT NULL,
|
||||||
|
DEVICE_ID INTEGER NOT NULL,
|
||||||
|
OPERATION_ID INTEGER NOT NULL,
|
||||||
|
TENANT_ID INTEGER NOT NULL,
|
||||||
|
STATUS VARCHAR(10) NULL,
|
||||||
|
DESCRIPTION VARCHAR(1000) NULL,
|
||||||
|
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
|
||||||
|
PRIMARY KEY (NOTIFICATION_ID)
|
||||||
|
)ENGINE = InnoDB;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS DM_COMMAND_OPERATION_ARCH (
|
||||||
|
OPERATION_ID INTEGER NOT NULL,
|
||||||
|
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
|
||||||
|
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
|
||||||
|
PRIMARY KEY (OPERATION_ID)
|
||||||
|
)ENGINE = InnoDB;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS DM_PROFILE_OPERATION_ARCH (
|
||||||
|
OPERATION_ID INTEGER NOT NULL,
|
||||||
|
ENABLED INTEGER NOT NULL DEFAULT 0,
|
||||||
|
OPERATION_DETAILS BLOB DEFAULT NULL,
|
||||||
|
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
|
||||||
|
PRIMARY KEY (OPERATION_ID)
|
||||||
|
)ENGINE = InnoDB;
|
Loading…
Reference in new issue