Merge pull request #977 from dunithd/master

Data archival task implementation to prune CDM transnational tables.
revert-70aa11f8
Charitha Goonetilleke 7 years ago committed by GitHub
commit c281374d95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,49 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival;
public class ArchivalException extends Exception {
private String errorMessage;
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
public ArchivalException(String message) {
super(message);
setErrorMessage(message);
}
public ArchivalException(String message, Throwable cause) {
super(message, cause);
setErrorMessage(message);
}
protected ArchivalException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
setErrorMessage(message);
}
}

@ -0,0 +1,26 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival;
public interface ArchivalService {
void archiveTransactionalRecords() throws ArchivalException;
void deleteArchivedRecords() throws ArchivalException;
}

@ -0,0 +1,220 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.TransactionManagementException;
import org.wso2.carbon.device.mgt.core.archival.dao.*;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class ArchivalServiceImpl implements ArchivalService {
private static Log log = LogFactory.getLog(ArchivalServiceImpl.class);
private ArchivalDAO archivalDAO;
private DataDeletionDAO dataDeletionDAO;
private static int ITERATION_COUNT = 10000;
private String[] NOT_IN_PROGRESS_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED"};
private String[] NOT_PENDING_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED", "IN_PROGRESS"};
private String[] NOT_PENDING_IN_PROGRESS_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED"};
public ArchivalServiceImpl() {
this.archivalDAO = ArchivalSourceDAOFactory.getDataPurgingDAO();
this.dataDeletionDAO = ArchivalDestinationDAOFactory.getDataDeletionDAO();
}
@Override
public void archiveTransactionalRecords() throws ArchivalException {
try {
ArchivalSourceDAOFactory.openConnection();
ArchivalDestinationDAOFactory.openConnection();
List<Integer> allOperations = archivalDAO.getAllOperations();
List<Integer> pendingAndIPOperations = archivalDAO.getPendingAndInProgressOperations();
log.info(allOperations.size() + " All Operations. " + pendingAndIPOperations.size() +
" P&IP Operations");
//Get the diff of operations
Set<Integer> setA = new HashSet<>(allOperations);
Set<Integer> setB = new HashSet<>(pendingAndIPOperations);
setA.removeAll(setB);
List<Integer> candidates = new ArrayList<>();
candidates.addAll(setA);
int total = candidates.size();
int batches = calculateNumberOfBatches(total);
int batchSize = ITERATION_COUNT;
if (log.isDebugEnabled()) {
log.debug(total + " Operations ready for archiving. " + batches + " iterations to be done.");
}
beginTransactions();
for (int i = 1; i <= batches; i++) {
int startIdx = batchSize * (i - 1);
int endIdx = batchSize * i;
if (i == batches) {
endIdx = startIdx + (total % batchSize);
}
if(log.isDebugEnabled()) {
log.debug("\n\n############ Iterating over batch " + i + "[" +
startIdx + "," + endIdx + "] #######");
}
List<Integer> subList = candidates.subList(startIdx, endIdx);
prepareTempTable(subList);
//Purge the largest table, DM_DEVICE_OPERATION_RESPONSE
if (log.isDebugEnabled()) {
log.debug("## Purging operation responses");
}
archivalDAO.moveOperationResponses();
//Purge the notifications table, DM_NOTIFICATION
if (log.isDebugEnabled()) {
log.debug("## Purging notifications");
}
archivalDAO.moveNotifications();
//Purge the command operations table, DM_COMMAND_OPERATION
if (log.isDebugEnabled()) {
log.debug("## Purging command operations");
}
archivalDAO.moveCommandOperations();
//Purge the profile operation table, DM_PROFILE_OPERATION
if (log.isDebugEnabled()) {
log.debug("## Purging profile operations");
}
archivalDAO.moveProfileOperations();
//Purge the enrolment mappings table, DM_ENROLMENT_OP_MAPPING
if (log.isDebugEnabled()) {
log.debug("## Purging enrolment mappings");
}
archivalDAO.moveEnrolmentMappings();
//Finally, purge the operations table, DM_OPERATION
if (log.isDebugEnabled()) {
log.debug("## Purging operations");
}
archivalDAO.moveOperations();
}
commitTransactions();
} catch (ArchivalDAOException e) {
rollbackTransactions();
throw new ArchivalException("An error occurred while data archival", e);
} catch (SQLException e) {
throw new ArchivalException("An error occurred while connecting to the archival database.", e);
} finally {
ArchivalSourceDAOFactory.closeConnection();
ArchivalDestinationDAOFactory.closeConnection();
}
}
private void prepareTempTable(List<Integer> subList) throws ArchivalDAOException {
//Clean up the DM_ARCHIVED_OPERATIONS table
if (log.isDebugEnabled()) {
log.debug("## Truncating the temporary table");
}
archivalDAO.truncateOperationIDsForArchival();
archivalDAO.copyOperationIDsForArchival(subList);
}
private void beginTransactions() throws ArchivalException {
try {
ArchivalSourceDAOFactory.beginTransaction();
ArchivalDestinationDAOFactory.beginTransaction();
} catch (TransactionManagementException e) {
log.error("An error occurred during starting transactions", e);
throw new ArchivalException("An error occurred during starting transactions", e);
}
}
private void commitTransactions() {
ArchivalSourceDAOFactory.commitTransaction();
ArchivalDestinationDAOFactory.commitTransaction();
}
private void rollbackTransactions() {
ArchivalSourceDAOFactory.rollbackTransaction();
ArchivalDestinationDAOFactory.rollbackTransaction();
}
private int calculateNumberOfBatches(int total) {
int batches = 0;
int batchSize = ITERATION_COUNT;
if ((total % batchSize) > 0) {
batches = (total / batchSize) + 1;
} else {
batches = total / batchSize;
}
return batches;
}
@Override
public void deleteArchivedRecords() throws ArchivalException {
try {
ArchivalDestinationDAOFactory.openConnection();
if (log.isDebugEnabled()) {
log.debug("## Deleting operation responses");
}
dataDeletionDAO.deleteOperationResponses();
if (log.isDebugEnabled()) {
log.debug("## Deleting notifications ");
}
dataDeletionDAO.deleteNotifications();
if (log.isDebugEnabled()) {
log.debug("## Deleting command operations");
}
dataDeletionDAO.deleteCommandOperations();
if (log.isDebugEnabled()) {
log.debug("## Deleting profile operations ");
}
dataDeletionDAO.deleteProfileOperations();
if (log.isDebugEnabled()) {
log.debug("## Deleting enrolment mappings ");
}
dataDeletionDAO.deleteEnrolmentMappings();
if (log.isDebugEnabled()) {
log.debug("## Deleting operations ");
}
dataDeletionDAO.deleteOperations();
} catch (SQLException e) {
throw new ArchivalException("An error occurred while initialising data source for archival", e);
} catch (ArchivalDAOException e) {
log.error("An error occurred while executing DataDeletionTask");
} finally {
ArchivalDestinationDAOFactory.closeConnection();
}
}
}

@ -0,0 +1,50 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.dao;
import java.util.List;
/**
* Operations to move data from DM database to archival database
*/
public interface ArchivalDAO {
int DEFAULT_BATCH_SIZE = 1000;
List<Integer> getAllOperations() throws ArchivalDAOException;
List<Integer> getPendingAndInProgressOperations() throws ArchivalDAOException;
void copyOperationIDsForArchival(List<Integer> operationIds) throws ArchivalDAOException;
void moveOperationResponses() throws ArchivalDAOException;
void moveNotifications() throws ArchivalDAOException;
void moveCommandOperations() throws ArchivalDAOException;
void moveProfileOperations() throws ArchivalDAOException;
void moveEnrolmentMappings() throws ArchivalDAOException;
void moveOperations() throws ArchivalDAOException;
void truncateOperationIDsForArchival() throws ArchivalDAOException;
}

@ -0,0 +1,50 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.core.archival.dao;
public class ArchivalDAOException extends Exception {
private String errorMessage;
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
public ArchivalDAOException(String message) {
super(message);
setErrorMessage(message);
}
public ArchivalDAOException(String message, Throwable cause) {
super(message, cause);
setErrorMessage(message);
}
protected ArchivalDAOException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
setErrorMessage(message);
}
}

@ -0,0 +1,60 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.core.archival.dao;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class ArchivalDAOUtil {
private static final Log log = LogFactory.getLog(ArchivalDAOUtil.class);
public static void cleanupResources(Statement stmt, ResultSet rs) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
log.warn("Error occurred while closing the result set", e);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
log.warn("Error occurred while closing the statement", e);
}
}
}
public static void cleanupResources(Statement stmt) {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
log.warn("Error occurred while closing the statement", e);
}
}
}
}

@ -0,0 +1,167 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.dao;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.IllegalTransactionStateException;
import org.wso2.carbon.device.mgt.common.TransactionManagementException;
import org.wso2.carbon.device.mgt.core.archival.dao.impl.DataDeletionDAOImpl;
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
import org.wso2.carbon.device.mgt.core.config.datasource.DataSourceConfig;
import org.wso2.carbon.device.mgt.core.config.datasource.JNDILookupDefinition;
import org.wso2.carbon.device.mgt.core.dao.util.DeviceManagementDAOUtil;
import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationManagementDAOFactory;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Hashtable;
import java.util.List;
public class ArchivalDestinationDAOFactory {
private static final Log log = LogFactory.getLog(OperationManagementDAOFactory.class);
private static DataSource dataSource;
private static String databaseEngine;
private static int retentionPeriod;
private static ThreadLocal<Connection> currentConnection = new ThreadLocal<Connection>();
public static DataDeletionDAO getDataDeletionDAO() {
return new DataDeletionDAOImpl(DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
.getArchivalConfiguration().getArchivalTaskConfiguration()
.getPurgingTaskConfiguration().getRetentionPeriod());
}
public static void init(DataSource dtSource) {
dataSource = dtSource;
try {
databaseEngine = dataSource.getConnection().getMetaData().getDatabaseProductName();
} catch (SQLException e) {
log.error("Error occurred while retrieving config.datasource connection", e);
}
}
public static void init(DataSourceConfig config) {
dataSource = resolveDataSource(config);
try {
databaseEngine = dataSource.getConnection().getMetaData().getDatabaseProductName();
} catch (SQLException e) {
log.error("Error occurred while retrieving config.datasource connection", e);
}
}
public static void beginTransaction() throws TransactionManagementException {
try {
Connection conn = dataSource.getConnection();
conn.setAutoCommit(false);
currentConnection.set(conn);
} catch (SQLException e) {
throw new TransactionManagementException(
"Error occurred while retrieving config.datasource connection", e);
}
}
public static void openConnection() throws SQLException {
currentConnection.set(dataSource.getConnection());
}
public static Connection getConnection() throws SQLException {
if (currentConnection.get() == null) {
throw new IllegalTransactionStateException("No connection is associated with the current transaction. " +
"This might have ideally caused by not properly initiating the transaction via " +
"'beginTransaction'/'openConnection' methods");
}
return currentConnection.get();
}
public static void closeConnection() {
Connection con = currentConnection.get();
if (con != null) {
try {
con.close();
} catch (SQLException e) {
log.error("Error occurred while close the connection");
}
currentConnection.remove();
}
}
public static void commitTransaction() {
try {
Connection conn = currentConnection.get();
if (conn != null) {
conn.commit();
} else {
if (log.isDebugEnabled()) {
log.debug("Datasource connection associated with the current thread is null, hence commit " +
"has not been attempted");
}
}
} catch (SQLException e) {
log.error("Error occurred while committing the transaction", e);
}
}
public static void rollbackTransaction() {
try {
Connection conn = currentConnection.get();
if (conn != null) {
conn.rollback();
} else {
if (log.isDebugEnabled()) {
log.debug("Datasource connection associated with the current thread is null, hence rollback " +
"has not been attempted");
}
}
} catch (SQLException e) {
log.error("Error occurred while roll-backing the transaction", e);
}
}
/**
* Resolve data source from the data source definition
*
* @param config data source configuration
* @return data source resolved from the data source definition
*/
private static DataSource resolveDataSource(DataSourceConfig config) {
DataSource dataSource = null;
if (config == null) {
throw new RuntimeException("Device Management Repository data source configuration is null and " +
"thus, is not initialized");
}
JNDILookupDefinition jndiConfig = config.getJndiLookupDefinition();
if (jndiConfig != null) {
if (log.isDebugEnabled()) {
log.debug("Initializing Device Management Repository data source using the JNDI Lookup Definition");
}
List<JNDILookupDefinition.JNDIProperty> jndiPropertyList = jndiConfig.getJndiProperties();
if (jndiPropertyList != null) {
Hashtable<Object, Object> jndiProperties = new Hashtable<Object, Object>();
for (JNDILookupDefinition.JNDIProperty prop : jndiPropertyList) {
jndiProperties.put(prop.getName(), prop.getValue());
}
dataSource = DeviceManagementDAOUtil.lookupDataSource(jndiConfig.getJndiName(), jndiProperties);
} else {
dataSource = DeviceManagementDAOUtil.lookupDataSource(jndiConfig.getJndiName(), null);
}
}
return dataSource;
}
}

@ -0,0 +1,170 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.dao;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.IllegalTransactionStateException;
import org.wso2.carbon.device.mgt.common.TransactionManagementException;
import org.wso2.carbon.device.mgt.core.archival.dao.impl.ArchivalDAOImpl;
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
import org.wso2.carbon.device.mgt.core.config.archival.ArchivalTaskConfiguration;
import org.wso2.carbon.device.mgt.core.config.datasource.DataSourceConfig;
import org.wso2.carbon.device.mgt.core.config.datasource.JNDILookupDefinition;
import org.wso2.carbon.device.mgt.core.dao.util.DeviceManagementDAOUtil;
import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationManagementDAOFactory;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Hashtable;
import java.util.List;
public class ArchivalSourceDAOFactory {
private static final Log log = LogFactory.getLog(OperationManagementDAOFactory.class);
private static DataSource dataSource;
private static String databaseEngine;
private static ThreadLocal<Connection> currentConnection = new ThreadLocal<Connection>();
public static ArchivalDAO getDataPurgingDAO() {
ArchivalTaskConfiguration configuration = DeviceConfigurationManager.getInstance()
.getDeviceManagementConfig()
.getArchivalConfiguration()
.getArchivalTaskConfiguration();
return new ArchivalDAOImpl(configuration.getRetentionPeriod(), configuration.getBatchSize());
}
public static void init(DataSource dtSource) {
dataSource = dtSource;
try {
databaseEngine = dataSource.getConnection().getMetaData().getDatabaseProductName();
} catch (SQLException e) {
log.error("Error occurred while retrieving config.datasource connection", e);
}
}
public static void init(DataSourceConfig config) {
dataSource = resolveDataSource(config);
try {
databaseEngine = dataSource.getConnection().getMetaData().getDatabaseProductName();
} catch (SQLException e) {
log.error("Error occurred while retrieving config.datasource connection", e);
}
}
public static void beginTransaction() throws TransactionManagementException {
try {
Connection conn = dataSource.getConnection();
conn.setAutoCommit(false);
currentConnection.set(conn);
} catch (SQLException e) {
throw new TransactionManagementException(
"Error occurred while retrieving config.datasource connection", e);
}
}
public static void openConnection() throws SQLException {
currentConnection.set(dataSource.getConnection());
}
public static Connection getConnection() throws SQLException {
if (currentConnection.get() == null) {
throw new IllegalTransactionStateException("No connection is associated with the current transaction. " +
"This might have ideally caused by not properly initiating the transaction via " +
"'beginTransaction'/'openConnection' methods");
}
return currentConnection.get();
}
public static void closeConnection() {
Connection con = currentConnection.get();
if (con != null) {
try {
con.close();
} catch (SQLException e) {
log.error("Error occurred while close the connection");
}
currentConnection.remove();
}
}
public static void commitTransaction() {
try {
Connection conn = currentConnection.get();
if (conn != null) {
conn.commit();
} else {
if (log.isDebugEnabled()) {
log.debug("Datasource connection associated with the current thread is null, hence commit " +
"has not been attempted");
}
}
} catch (SQLException e) {
log.error("Error occurred while committing the transaction", e);
}
}
public static void rollbackTransaction() {
try {
Connection conn = currentConnection.get();
if (conn != null) {
conn.rollback();
} else {
if (log.isDebugEnabled()) {
log.debug("Datasource connection associated with the current thread is null, hence rollback " +
"has not been attempted");
}
}
} catch (SQLException e) {
log.error("Error occurred while roll-backing the transaction", e);
}
}
/**
* Resolve data source from the data source definition
*
* @param config data source configuration
* @return data source resolved from the data source definition
*/
private static DataSource resolveDataSource(DataSourceConfig config) {
DataSource dataSource = null;
if (config == null) {
throw new RuntimeException("Device Management Repository data source configuration is null and " +
"thus, is not initialized");
}
JNDILookupDefinition jndiConfig = config.getJndiLookupDefinition();
if (jndiConfig != null) {
if (log.isDebugEnabled()) {
log.debug("Initializing Device Management Repository data source using the JNDI Lookup Definition");
}
List<JNDILookupDefinition.JNDIProperty> jndiPropertyList = jndiConfig.getJndiProperties();
if (jndiPropertyList != null) {
Hashtable<Object, Object> jndiProperties = new Hashtable<Object, Object>();
for (JNDILookupDefinition.JNDIProperty prop : jndiPropertyList) {
jndiProperties.put(prop.getName(), prop.getValue());
}
dataSource = DeviceManagementDAOUtil.lookupDataSource(jndiConfig.getJndiName(), jndiProperties);
} else {
dataSource = DeviceManagementDAOUtil.lookupDataSource(jndiConfig.getJndiName(), null);
}
}
return dataSource;
}
}

@ -0,0 +1,39 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.dao;
/**
* Operations to permanently delete data from archived database tables
*/
public interface DataDeletionDAO {
int DEFAULT_RETENTION_PERIOD = 364;
void deleteOperationResponses() throws ArchivalDAOException;
void deleteNotifications() throws ArchivalDAOException;
void deleteCommandOperations() throws ArchivalDAOException;
void deleteProfileOperations() throws ArchivalDAOException;
void deleteEnrolmentMappings() throws ArchivalDAOException;
void deleteOperations() throws ArchivalDAOException;
}

@ -0,0 +1,471 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.dao.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.core.archival.dao.*;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class ArchivalDAOImpl implements ArchivalDAO {
private static final Log log = LogFactory.getLog(ArchivalDAOImpl.class);
private int retentionPeriod;
private int batchSize = ArchivalDAO.DEFAULT_BATCH_SIZE;
private Timestamp currentTimestamp;
public ArchivalDAOImpl(int retentionPeriod) {
this.retentionPeriod = retentionPeriod;
}
public ArchivalDAOImpl(int retentionPeriod, int batchSize) {
this.retentionPeriod = retentionPeriod;
this.batchSize = batchSize;
this.currentTimestamp = new Timestamp(new java.util.Date().getTime());
if (log.isDebugEnabled()) {
log.debug("Using batch size of " + this.batchSize + " with retention period " + this.retentionPeriod);
}
}
@Override
public List<Integer> getAllOperations() throws ArchivalDAOException {
List<Integer> operationIds = new ArrayList<>();
Statement stmt = null;
ResultSet rs = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT DISTINCT OPERATION_ID FROM DM_ENROLMENT_OP_MAPPING " +
"WHERE CREATED_TIMESTAMP BETWEEN DATE_SUB(NOW(), INTERVAL " +
this.retentionPeriod + " DAY) AND NOW()";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
while (rs.next()) {
operationIds.add(rs.getInt("OPERATION_ID"));
}
} catch (SQLException e) {
throw new ArchivalDAOException("An error occurred while getting a list operation Ids to archive", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
}
if (log.isDebugEnabled()) {
log.debug(operationIds.size() + " operations found for the archival");
}
return operationIds;
}
@Override
public List<Integer> getPendingAndInProgressOperations() throws ArchivalDAOException {
List<Integer> operationIds = new ArrayList<>();
Statement stmt = null;
ResultSet rs = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT DISTINCT OPERATION_ID " +
" FROM DM_ENROLMENT_OP_MAPPING WHERE STATUS IN('PENDING', 'IN_PROGRESS') " +
" AND CREATED_TIMESTAMP BETWEEN DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod +" DAY) AND NOW()";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
while (rs.next()) {
operationIds.add(rs.getInt("OPERATION_ID"));
}
} catch (SQLException e) {
throw new ArchivalDAOException("An error occurred while getting a list operation Ids to archive", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
}
if (log.isDebugEnabled()) {
log.debug(operationIds.size() + " PENDING and IN_PROFRESS operations found for the archival");
}
return operationIds;
}
@Override
public void copyOperationIDsForArchival(List<Integer> operationIds) throws ArchivalDAOException {
PreparedStatement stmt = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "INSERT INTO DM_ARCHIVED_OPERATIONS(ID,CREATED_TIMESTAMP) VALUES (?,NOW())";
stmt = conn.prepareStatement(sql);
int count = 0;
for (int i = 0; i < operationIds.size(); i++) {
stmt.setInt(1, operationIds.get(i));
stmt.addBatch();
if (++count % this.batchSize == 0) {
stmt.executeBatch();
}
}
stmt.executeBatch();
if (log.isDebugEnabled()) {
log.debug(count + " Records copied to the temporary table.");
}
} catch (SQLException e) {
throw new ArchivalDAOException("Error while copying operation Ids for archival", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt);
}
}
@Override
public void moveOperationResponses() throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
ResultSet rs = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT * FROM DM_DEVICE_OPERATION_RESPONSE WHERE OPERATION_ID IN " +
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
sql = "INSERT INTO DM_DEVICE_OPERATION_RESPONSE_ARCH VALUES(?, ?, ?, ?, ?,?,?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
while (rs.next()) {
stmt2.setInt(1, rs.getInt("ID"));
stmt2.setInt(2, rs.getInt("ENROLMENT_ID"));
stmt2.setInt(3, rs.getInt("OPERATION_ID"));
stmt2.setInt(4, rs.getInt("EN_OP_MAP_ID"));
stmt2.setBytes(5, rs.getBytes("OPERATION_RESPONSE"));
stmt2.setTimestamp(6, rs.getTimestamp("RECEIVED_TIMESTAMP"));
stmt2.setTimestamp(7, this.currentTimestamp);
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug("Executing batch " + count);
}
}
}
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug(count + " [OPERATION_RESPONSES] Records copied to the archival table. Starting deletion");
}
//try the deletion now
sql = "DELETE FROM DM_DEVICE_OPERATION_RESPONSE WHERE OPERATION_ID IN (" +
" SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while moving operations ", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
}
@Override
public void moveNotifications() throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
ResultSet rs = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT * FROM DM_NOTIFICATION WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
// ArchivalDestinationDAOFactory.beginTransaction();
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
sql = "INSERT INTO DM_NOTIFICATION_ARCH VALUES(?, ?, ?, ?, ?, ?, ?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
while (rs.next()) {
stmt2.setInt(1, rs.getInt("NOTIFICATION_ID"));
stmt2.setInt(2, rs.getInt("DEVICE_ID"));
stmt2.setInt(3, rs.getInt("OPERATION_ID"));
stmt2.setInt(4, rs.getInt("TENANT_ID"));
stmt2.setString(5, rs.getString("STATUS"));
stmt2.setString(6, rs.getString("DESCRIPTION"));
stmt2.setTimestamp(7, this.currentTimestamp);
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
}
}
stmt2.executeBatch();
// ArchivalDestinationDAOFactory.commitTransaction();
if (log.isDebugEnabled()) {
log.debug(count + " [NOTIFICATIONS] Records copied to the archival table. Starting deletion");
}
sql = "DELETE FROM DM_NOTIFICATION" +
" WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while moving notifications ", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
}
@Override
public void moveCommandOperations() throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
ResultSet rs = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT * FROM DM_COMMAND_OPERATION WHERE OPERATION_ID IN " +
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
sql = "INSERT INTO DM_COMMAND_OPERATION_ARCH VALUES(?,?,?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
while (rs.next()) {
stmt2.setInt(1, rs.getInt("OPERATION_ID"));
stmt2.setInt(2, rs.getInt("ENABLED"));
stmt2.setTimestamp(3, this.currentTimestamp);
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
}
}
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug(count + " [COMMAND_OPERATION] Records copied to the archival table. Starting deletion");
}
sql = "DELETE FROM DM_COMMAND_OPERATION" +
" WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while moving command operations", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
}
@Override
public void moveProfileOperations() throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
ResultSet rs = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT * FROM DM_PROFILE_OPERATION WHERE OPERATION_ID IN " +
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
sql = "INSERT INTO DM_PROFILE_OPERATION_ARCH VALUES(?, ?, ?, ?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
while (rs.next()) {
stmt2.setInt(1, rs.getInt("OPERATION_ID"));
stmt2.setInt(2, rs.getInt("ENABLED"));
stmt2.setBytes(3, rs.getBytes("OPERATION_DETAILS"));
stmt2.setTimestamp(4,this.currentTimestamp );
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
}
}
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug(count + " [PROFILE_OPERATION] Records copied to the archival table. Starting deletion");
}
sql = "DELETE FROM DM_PROFILE_OPERATION" +
" WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while moving profile operations", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
}
@Override
public void moveEnrolmentMappings() throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
ResultSet rs = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT * FROM DM_ENROLMENT_OP_MAPPING WHERE OPERATION_ID IN " +
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
sql = "INSERT INTO DM_ENROLMENT_OP_MAPPING_ARCH VALUES(?, ?, ?, ?, ?, ?, ?,?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
while (rs.next()) {
stmt2.setInt(1, rs.getInt("ID"));
stmt2.setInt(2, rs.getInt("ENROLMENT_ID"));
stmt2.setInt(3, rs.getInt("OPERATION_ID"));
stmt2.setString(4, rs.getString("STATUS"));
stmt2.setString(5, rs.getString("PUSH_NOTIFICATION_STATUS"));
stmt2.setInt(6, rs.getInt("CREATED_TIMESTAMP"));
stmt2.setInt(7, rs.getInt("UPDATED_TIMESTAMP"));
stmt2.setTimestamp(8, this.currentTimestamp);
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug("Executing batch " + count);
}
}
}
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug(count + " [ENROLMENT_OP_MAPPING] Records copied to the archival table. Starting deletion");
}
sql = "DELETE FROM DM_ENROLMENT_OP_MAPPING WHERE OPERATION_ID IN (" +
"SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while moving enrolment mappings", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
}
@Override
public void moveOperations() throws ArchivalDAOException {
Statement stmt = null;
PreparedStatement stmt2 = null;
Statement stmt3 = null;
ResultSet rs = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT * FROM DM_OPERATION WHERE ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
sql = "INSERT INTO DM_OPERATION_ARCH VALUES(?, ?, ?, ?, ?, ?)";
stmt2 = conn2.prepareStatement(sql);
int count = 0;
while (rs.next()) {
stmt2.setInt(1, rs.getInt("ID"));
stmt2.setString(2, rs.getString("TYPE"));
stmt2.setTimestamp(3, rs.getTimestamp("CREATED_TIMESTAMP"));
stmt2.setTimestamp(4, rs.getTimestamp("RECEIVED_TIMESTAMP"));
stmt2.setString(5, rs.getString("OPERATION_CODE"));
stmt2.setTimestamp(6, this.currentTimestamp);
stmt2.addBatch();
if (++count % batchSize == 0) {
stmt2.executeBatch();
}
}
stmt2.executeBatch();
if (log.isDebugEnabled()) {
log.debug(count + " [OPERATIONS] Records copied to the archival table. Starting deletion");
}
sql = "DELETE FROM DM_OPERATION WHERE ID IN (" +
"SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
stmt3 = conn.createStatement();
int affected = stmt3.executeUpdate(sql);
if (log.isDebugEnabled()) {
log.debug(affected + " Rows deleted");
}
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while moving operations", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt, rs);
ArchivalDAOUtil.cleanupResources(stmt2);
ArchivalDAOUtil.cleanupResources(stmt3);
}
}
@Override
public void truncateOperationIDsForArchival() throws ArchivalDAOException {
PreparedStatement stmt = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
conn.setAutoCommit(false);
String sql = "TRUNCATE DM_ARCHIVED_OPERATIONS";
stmt = conn.prepareStatement(sql);
stmt.addBatch();
stmt.executeBatch();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while truncating operation Ids", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt);
}
}
private Statement createMemoryEfficientStatement(Connection conn) throws ArchivalDAOException, SQLException {
Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(Integer.MIN_VALUE);
return stmt;
}
}

@ -0,0 +1,163 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.archival.dao.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDAOException;
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDAOUtil;
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDestinationDAOFactory;
import org.wso2.carbon.device.mgt.core.archival.dao.DataDeletionDAO;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class DataDeletionDAOImpl implements DataDeletionDAO {
private static Log log = LogFactory.getLog(DataDeletionDAOImpl.class);
private int retentionPeriod = DataDeletionDAO.DEFAULT_RETENTION_PERIOD;
public DataDeletionDAOImpl(int retentionPeriod) {
this.retentionPeriod = retentionPeriod;
if (log.isDebugEnabled()) {
log.debug("Using retention period as " + retentionPeriod);
}
}
@Override
public void deleteOperationResponses() throws ArchivalDAOException {
PreparedStatement stmt = null;
try {
Connection conn = ArchivalDestinationDAOFactory.getConnection();
conn.setAutoCommit(false);
String sql = "DELETE FROM DM_DEVICE_OPERATION_RESPONSE_ARCH " +
"WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
stmt.addBatch();
stmt.executeBatch();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting operation responses", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt);
}
}
@Override
public void deleteNotifications() throws ArchivalDAOException {
PreparedStatement stmt = null;
try {
Connection conn = ArchivalDestinationDAOFactory.getConnection();
conn.setAutoCommit(false);
String sql = "DELETE FROM DM_NOTIFICATION_ARCH" +
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
stmt.addBatch();
stmt.executeBatch();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting notifications", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt);
}
}
@Override
public void deleteCommandOperations() throws ArchivalDAOException {
PreparedStatement stmt = null;
try {
Connection conn = ArchivalDestinationDAOFactory.getConnection();
conn.setAutoCommit(false);
String sql = "DELETE FROM DM_COMMAND_OPERATION_ARCH" +
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
stmt.addBatch();
stmt.executeBatch();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting command operations", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt);
}
}
@Override
public void deleteProfileOperations() throws ArchivalDAOException {
PreparedStatement stmt = null;
try {
Connection conn = ArchivalDestinationDAOFactory.getConnection();
conn.setAutoCommit(false);
String sql = "DELETE FROM DM_PROFILE_OPERATION_ARCH" +
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
stmt.addBatch();
stmt.executeBatch();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting profile operations", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt);
}
}
@Override
public void deleteEnrolmentMappings() throws ArchivalDAOException {
PreparedStatement stmt = null;
try {
Connection conn = ArchivalDestinationDAOFactory.getConnection();
conn.setAutoCommit(false);
String sql = "DELETE FROM DM_ENROLMENT_OP_MAPPING_ARCH WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
stmt.addBatch();
stmt.executeBatch();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting enrolment mappings", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt);
}
}
@Override
public void deleteOperations() throws ArchivalDAOException {
PreparedStatement stmt = null;
try {
Connection conn = ArchivalDestinationDAOFactory.getConnection();
conn.setAutoCommit(false);
String sql = "DELETE FROM DM_OPERATION_ARCH WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, this.retentionPeriod);
stmt.addBatch();
stmt.executeBatch();
conn.commit();
} catch (SQLException e) {
throw new ArchivalDAOException("Error occurred while deleting operations", e);
} finally {
ArchivalDAOUtil.cleanupResources(stmt);
}
}
}

@ -17,6 +17,7 @@
*/ */
package org.wso2.carbon.device.mgt.core.config; package org.wso2.carbon.device.mgt.core.config;
import org.wso2.carbon.device.mgt.core.config.archival.ArchivalConfiguration;
import org.wso2.carbon.device.mgt.core.config.cache.CertificateCacheConfiguration; import org.wso2.carbon.device.mgt.core.config.cache.CertificateCacheConfiguration;
import org.wso2.carbon.device.mgt.core.config.geo.location.OperationAnalyticsConfiguration; import org.wso2.carbon.device.mgt.core.config.geo.location.OperationAnalyticsConfiguration;
import org.wso2.carbon.device.mgt.core.config.cache.DeviceCacheConfiguration; import org.wso2.carbon.device.mgt.core.config.cache.DeviceCacheConfiguration;
@ -54,6 +55,7 @@ public final class DeviceManagementConfig {
private OperationAnalyticsConfiguration operationAnalyticsConfiguration; private OperationAnalyticsConfiguration operationAnalyticsConfiguration;
private String defaultGroupsConfiguration; private String defaultGroupsConfiguration;
private RemoteSessionConfiguration remoteSessionConfiguration; private RemoteSessionConfiguration remoteSessionConfiguration;
private ArchivalConfiguration archivalConfiguration;
@XmlElement(name = "ManagementRepository", required = true) @XmlElement(name = "ManagementRepository", required = true)
@ -173,6 +175,15 @@ public final class DeviceManagementConfig {
public void setDefaultGroupsConfiguration(String defaultGroupsConfiguration) { public void setDefaultGroupsConfiguration(String defaultGroupsConfiguration) {
this.defaultGroupsConfiguration = defaultGroupsConfiguration; this.defaultGroupsConfiguration = defaultGroupsConfiguration;
} }
@XmlElement(name = "ArchivalConfiguration", required = true)
public ArchivalConfiguration getArchivalConfiguration() {
return archivalConfiguration;
}
public void setArchivalConfiguration(ArchivalConfiguration archivalConfiguration) {
this.archivalConfiguration = archivalConfiguration;
}
@XmlElement(name = "RemoteSessionConfiguration", required = true) @XmlElement(name = "RemoteSessionConfiguration", required = true)
public RemoteSessionConfiguration getRemoteSessionConfiguration() { public RemoteSessionConfiguration getRemoteSessionConfiguration() {
return remoteSessionConfiguration; return remoteSessionConfiguration;

@ -0,0 +1,53 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.config.archival;
import org.wso2.carbon.device.mgt.core.config.datasource.DataSourceConfig;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* This class represents the information related to data archival configurations.
*/
@XmlRootElement(name = "ArchivalConfiguration")
public class ArchivalConfiguration {
private DataSourceConfig dataSourceConfig;
private ArchivalTaskConfiguration archivalTaskConfiguration;
@XmlElement(name = "DataSourceConfiguration", required = true)
public DataSourceConfig getDataSourceConfig() {
return dataSourceConfig;
}
public void setDataSourceConfig(DataSourceConfig dataSourceConfig) {
this.dataSourceConfig = dataSourceConfig;
}
@XmlElement(name = "ArchivalTask", required = true)
public ArchivalTaskConfiguration getArchivalTaskConfiguration() {
return archivalTaskConfiguration;
}
public void setArchivalTaskConfiguration(ArchivalTaskConfiguration archivalTaskConfiguration) {
this.archivalTaskConfiguration = archivalTaskConfiguration;
}
}

@ -0,0 +1,86 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.config.archival;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "ArchivalTask")
public class ArchivalTaskConfiguration {
private boolean enabled;
private String cronExpression;
private String taskClazz;
private int retentionPeriod;
private int batchSize;
private PurgingTaskConfiguration purgingTaskConfiguration;
@XmlElement(name = "Enabled", required = true)
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
@XmlElement(name = "CronExpression", required = true)
public String getCronExpression() {
return cronExpression;
}
public void setCronExpression(String cronExpression) {
this.cronExpression = cronExpression;
}
@XmlElement(name = "TaskClass", required = true)
public String getTaskClazz() {
return taskClazz;
}
public void setTaskClazz(String taskClazz) {
this.taskClazz = taskClazz;
}
@XmlElement(name = "RetentionPeriod", required = true)
public int getRetentionPeriod() {
return retentionPeriod;
}
public void setRetentionPeriod(int retentionPeriod) {
this.retentionPeriod = retentionPeriod;
}
@XmlElement(name = "PurgingTask", required = true)
public PurgingTaskConfiguration getPurgingTaskConfiguration() {
return purgingTaskConfiguration;
}
public void setPurgingTaskConfiguration(PurgingTaskConfiguration purgingTaskConfiguration) {
this.purgingTaskConfiguration = purgingTaskConfiguration;
}
@XmlElement(name ="ExecutionBatchSize", required = true)
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
}

@ -0,0 +1,66 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.config.archival;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "PurgingTask")
public class PurgingTaskConfiguration {
private boolean enabled;
private String cronExpression;
private String taskClazz;
private int retentionPeriod;
@XmlElement(name = "Enabled", required = true)
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
@XmlElement(name = "CronExpression", required = true)
public String getCronExpression() {
return cronExpression;
}
public void setCronExpression(String cronExpression) {
this.cronExpression = cronExpression;
}
@XmlElement(name = "TaskClass", required = true)
public String getTaskClazz() {
return taskClazz;
}
public void setTaskClazz(String taskClazz) {
this.taskClazz = taskClazz;
}
@XmlElement(name = "RetentionPeriod", required = true)
public int getRetentionPeriod() {
return retentionPeriod;
}
public void setRetentionPeriod(int retentionPeriod) {
this.retentionPeriod = retentionPeriod;
}
}

@ -0,0 +1,89 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.internal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
import org.wso2.carbon.device.mgt.core.task.ArchivalTaskManager;
import org.wso2.carbon.device.mgt.core.task.impl.ArchivalTaskManagerImpl;
import org.wso2.carbon.ntask.core.service.TaskService;
/**
* @scr.component name="org.wso2.carbon.activity.data.archival" immediate="true"
* @scr.reference name="device.ntask.component"
* interface="org.wso2.carbon.ntask.core.service.TaskService"
* cardinality="1..1"
* policy="dynamic"
* bind="setTaskService"
* unbind="unsetTaskService"
*/
public class ActivityDataPurgingServiceComponent {
private static Log log = LogFactory.getLog(ActivityDataPurgingServiceComponent.class);
protected void activate(ComponentContext componentContext) {
try {
if (log.isDebugEnabled()) {
log.debug("Initializing activity data archival task manager bundle.");
}
ArchivalTaskManager archivalTaskManager = new ArchivalTaskManagerImpl();
// This will start the data archival task
boolean purgingTaskEnabled =
DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getArchivalConfiguration()
.getArchivalTaskConfiguration().isEnabled();
if (purgingTaskEnabled) {
archivalTaskManager.scheduleArchivalTask();
} else {
log.warn("Data archival task has been disabled. It is recommended to enable archival task to prune the " +
"transactional databases tables time to time.");
}
// This will start the data deletion task.
boolean deletionTaskEnabled =
DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getArchivalConfiguration()
.getArchivalTaskConfiguration().getPurgingTaskConfiguration().isEnabled();
if (deletionTaskEnabled) {
archivalTaskManager.scheduleDeletionTask();
}
} catch (Throwable e) {
log.error("Error occurred while initializing activity data archival task manager service.", e);
}
}
protected void setTaskService(TaskService taskService) {
if (log.isDebugEnabled()) {
log.debug("Setting the task service.");
}
DeviceManagementDataHolder.getInstance().setTaskService(taskService);
}
protected void unsetTaskService(TaskService taskService) {
if (log.isDebugEnabled()) {
log.debug("Removing the task service.");
}
DeviceManagementDataHolder.getInstance().setTaskService(null);
}
@SuppressWarnings("unused")
protected void deactivate(ComponentContext componentContext) {
}
}

@ -38,6 +38,8 @@ import org.wso2.carbon.device.mgt.core.app.mgt.ApplicationManagementProviderServ
import org.wso2.carbon.device.mgt.core.app.mgt.ApplicationManagerProviderServiceImpl; import org.wso2.carbon.device.mgt.core.app.mgt.ApplicationManagerProviderServiceImpl;
import org.wso2.carbon.device.mgt.core.app.mgt.config.AppManagementConfig; import org.wso2.carbon.device.mgt.core.app.mgt.config.AppManagementConfig;
import org.wso2.carbon.device.mgt.core.app.mgt.config.AppManagementConfigurationManager; import org.wso2.carbon.device.mgt.core.app.mgt.config.AppManagementConfigurationManager;
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDestinationDAOFactory;
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalSourceDAOFactory;
import org.wso2.carbon.device.mgt.core.authorization.DeviceAccessAuthorizationServiceImpl; import org.wso2.carbon.device.mgt.core.authorization.DeviceAccessAuthorizationServiceImpl;
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager; import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
import org.wso2.carbon.device.mgt.core.config.DeviceManagementConfig; import org.wso2.carbon.device.mgt.core.config.DeviceManagementConfig;
@ -167,6 +169,11 @@ public class DeviceManagementServiceComponent {
/* Initialize Operation Manager */ /* Initialize Operation Manager */
this.initOperationsManager(); this.initOperationsManager();
/* Initialising data archival configurations */
ArchivalSourceDAOFactory.init(dsConfig);
DataSourceConfig purgingDSConfig = config.getArchivalConfiguration().getDataSourceConfig();
ArchivalDestinationDAOFactory.init(purgingDSConfig);
PushNotificationProviderRepository pushNotificationRepo = new PushNotificationProviderRepository(); PushNotificationProviderRepository pushNotificationRepo = new PushNotificationProviderRepository();
List<String> pushNotificationProviders = config.getPushNotificationConfiguration() List<String> pushNotificationProviders = config.getPushNotificationConfiguration()
.getPushNotificationProviders(); .getPushNotificationProviders();

@ -0,0 +1,47 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.task;
public class ArchivalTaskException extends Exception {
private String errorMessage;
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
public ArchivalTaskException(String message) {
super(message);
setErrorMessage(message);
}
public ArchivalTaskException(String message, Throwable cause) {
super(message, cause);
setErrorMessage(message);
}
public ArchivalTaskException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace, String errorMessage) {
super(message, cause, enableSuppression, writableStackTrace);
setErrorMessage(message);
}
}

@ -0,0 +1,27 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.task;
public interface ArchivalTaskManager {
void scheduleArchivalTask() throws ArchivalTaskException;
void scheduleDeletionTask() throws ArchivalTaskException;
}

@ -0,0 +1,87 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.task.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.core.archival.ArchivalException;
import org.wso2.carbon.device.mgt.core.archival.ArchivalService;
import org.wso2.carbon.device.mgt.core.archival.ArchivalServiceImpl;
import org.wso2.carbon.ntask.core.Task;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class ArchivalTask implements Task {
private static Log log = LogFactory.getLog(ArchivalTask.class);
private ArchivalService archivalService;
@Override
public void setProperties(Map<String, String> map) {
}
@Override
public void init() {
this.archivalService = new ArchivalServiceImpl();
}
@Override
public void execute() {
log.info("Executing ArchivalTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
long startTime = System.currentTimeMillis();
try {
archivalService.archiveTransactionalRecords();
} catch (ArchivalException e) {
log.error("An error occurred while running ArchivalTask", e);
}
long endTime = System.currentTimeMillis();
long difference = endTime - startTime;
log.info("ArchivalTask completed. Total execution time: " + getDurationBreakdown(difference));
}
private String getDurationBreakdown(long millis) {
if (millis < 0) {
throw new IllegalArgumentException("Duration must be greater than zero!");
}
long days = TimeUnit.MILLISECONDS.toDays(millis);
millis -= TimeUnit.DAYS.toMillis(days);
long hours = TimeUnit.MILLISECONDS.toHours(millis);
millis -= TimeUnit.HOURS.toMillis(hours);
long minutes = TimeUnit.MILLISECONDS.toMinutes(millis);
millis -= TimeUnit.MINUTES.toMillis(minutes);
long seconds = TimeUnit.MILLISECONDS.toSeconds(millis);
StringBuilder sb = new StringBuilder(64);
sb.append(days);
sb.append(" Days ");
sb.append(hours);
sb.append(" Hours ");
sb.append(minutes);
sb.append(" Minutes ");
sb.append(seconds);
sb.append(" Seconds");
return (sb.toString());
}
}

@ -0,0 +1,128 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.task.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder;
import org.wso2.carbon.device.mgt.core.task.ArchivalTaskException;
import org.wso2.carbon.device.mgt.core.task.ArchivalTaskManager;
import org.wso2.carbon.ntask.common.TaskException;
import org.wso2.carbon.ntask.core.TaskInfo;
import org.wso2.carbon.ntask.core.TaskManager;
import org.wso2.carbon.ntask.core.service.TaskService;
import java.util.HashMap;
import java.util.Map;
public class ArchivalTaskManagerImpl implements ArchivalTaskManager {
private static final String TASK_TYPE_ARCHIVAL = "DATA_ARCHIVAL";
private static final String TASK_TYPE_DELETION = "DATA_DELETION";
private static final String TASK_NAME_ARCHIVAL = "DATA_ARCHIVAL_TASK";
private static final String TASK_NAME_DELETION = "DATA_DELETION_TASK";
private static final String TENANT_ID = "TENANT_ID";
private static Log log = LogFactory.getLog(ArchivalTaskManagerImpl.class);
public void scheduleArchivalTask() throws ArchivalTaskException {
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
try {
TaskService taskService = DeviceManagementDataHolder.getInstance().getTaskService();
taskService.registerTaskType(TASK_TYPE_ARCHIVAL);
if (log.isDebugEnabled()) {
log.debug("Data archival task is started for the tenant id " + tenantId);
}
String taskClazz = DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
.getArchivalConfiguration().getArchivalTaskConfiguration().getTaskClazz();
String cronExpression = DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
.getArchivalConfiguration().getArchivalTaskConfiguration().getCronExpression();
TaskManager taskManager = taskService.getTaskManager(TASK_TYPE_ARCHIVAL);
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
triggerInfo.setCronExpression(cronExpression);
triggerInfo.setRepeatCount(-1);
triggerInfo.setDisallowConcurrentExecution(true);
Map<String, String> properties = new HashMap<>();
properties.put(TENANT_ID, String.valueOf(tenantId));
if (!taskManager.isTaskScheduled(TASK_NAME_ARCHIVAL)) {
TaskInfo taskInfo = new TaskInfo(TASK_NAME_ARCHIVAL, taskClazz, properties, triggerInfo);
taskManager.registerTask(taskInfo);
taskManager.rescheduleTask(taskInfo.getName());
} else {
throw new ArchivalTaskException("Data archival task is already started for this tenant " +
tenantId);
}
} catch (TaskException e) {
throw new ArchivalTaskException("Error occurred while creating the task for tenant " + tenantId, e);
}
}
public void scheduleDeletionTask() throws ArchivalTaskException {
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
try {
TaskService taskService = DeviceManagementDataHolder.getInstance().getTaskService();
taskService.registerTaskType(TASK_TYPE_DELETION);
String taskClazz = DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
.getArchivalConfiguration().getArchivalTaskConfiguration()
.getPurgingTaskConfiguration().getTaskClazz();
String cronExpression = DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
.getArchivalConfiguration().getArchivalTaskConfiguration()
.getPurgingTaskConfiguration().getCronExpression();
if (log.isDebugEnabled()) {
log.debug("Data deletion task is started for the tenant id " + tenantId);
}
TaskManager taskManager = taskService.getTaskManager(TASK_TYPE_DELETION);
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
triggerInfo.setCronExpression(cronExpression);
triggerInfo.setRepeatCount(-1);
triggerInfo.setDisallowConcurrentExecution(true);
Map<String, String> properties = new HashMap<>();
properties.put(TENANT_ID, String.valueOf(tenantId));
if (!taskManager.isTaskScheduled(TASK_NAME_DELETION)) {
TaskInfo taskInfo = new TaskInfo(TASK_NAME_DELETION, taskClazz, properties, triggerInfo);
taskManager.registerTask(taskInfo);
taskManager.rescheduleTask(taskInfo.getName());
} else {
throw new ArchivalTaskException("Data deletion task is already started for this tenant " +
tenantId);
}
} catch (TaskException e) {
throw new ArchivalTaskException("Error occurred while creating the task for tenant " + tenantId, e);
}
}
}

@ -0,0 +1,61 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.task.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.core.archival.ArchivalException;
import org.wso2.carbon.device.mgt.core.archival.ArchivalService;
import org.wso2.carbon.device.mgt.core.archival.ArchivalServiceImpl;
import org.wso2.carbon.ntask.core.Task;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
public class ArchivedDataDeletionTask implements Task {
private static Log log = LogFactory.getLog(ArchivedDataDeletionTask.class);
private ArchivalService archivalService;
@Override
public void setProperties(Map<String, String> map) {
}
@Override
public void init() {
this.archivalService = new ArchivalServiceImpl();
}
@Override
public void execute() {
log.info("Executing DataDeletionTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
long startTime = System.nanoTime();
try {
archivalService.deleteArchivedRecords();
} catch (ArchivalException e) {
log.error("An error occurred while executing DataDeletionTask", e);
}
long endTime = System.nanoTime();
long difference = (endTime - startTime) / 1000000 * 1000;
log.info("DataDeletionTask completed. Total execution time: " + difference + " seconds");
}
}

@ -92,5 +92,30 @@
<PublishLocationOperationResponse>false</PublishLocationOperationResponse> <PublishLocationOperationResponse>false</PublishLocationOperationResponse>
</GeoLocationConfiguration> </GeoLocationConfiguration>
<DefaultGroupsConfiguration>BYOD,COPE</DefaultGroupsConfiguration> <DefaultGroupsConfiguration>BYOD,COPE</DefaultGroupsConfiguration>
<ArchivalConfiguration>
<DataSourceConfiguration>
<JndiLookupDefinition>
<Name>jdbc/DM_ARCHIVAL_DS</Name>
</JndiLookupDefinition>
</DataSourceConfiguration>
<ArchivalTask>
<Enabled>false</Enabled>
<TaskClass>org.wso2.carbon.device.mgt.core.task.impl.ArchivalTask</TaskClass>
<!-- Cron expression to run the task at specified time -->
<CronExpression>0 0 0 1/1 * ? *</CronExpression>
<!-- How many days of data should we keep in transactional tables? Must be in number of days -->
<RetentionPeriod>30</RetentionPeriod>
<ExecutionBatchSize>1000</ExecutionBatchSize>
<PurgingTask>
<Enabled>false</Enabled>
<TaskClass>org.wso2.carbon.device.mgt.core.task.impl.ArchivedDataDeletionTask</TaskClass>
<!-- Cron expression to run the task at specified time -->
<CronExpression>0 0 3 1/1 * ? *</CronExpression>
<!-- After this number of days, data will be permanantly deleted from archival tables.
Data retention period must be in number of DAYS -->
<RetentionPeriod>365</RetentionPeriod>
</PurgingTask>
</ArchivalTask>
</ArchivalConfiguration>
</DeviceMgtConfiguration> </DeviceMgtConfiguration>

@ -589,3 +589,10 @@ DM_DEVICE.ID = DM_DEVICE_DETAIL.DEVICE_ID
ORDER BY TENANT_ID, DEVICE_ID; ORDER BY TENANT_ID, DEVICE_ID;
-- END OF DASHBOARD RELATED VIEWS -- -- END OF DASHBOARD RELATED VIEWS --
-- TEMP TABLE REQUIRED FOR DATA ARCHIVAL JOB
CREATE TABLE IF NOT EXISTS DM_ARCHIVED_OPERATIONS (
ID INTEGER NOT NULL,
CREATED_TIMESTAMP TIMESTAMP NOT NULL,
PRIMARY KEY (ID)
)ENGINE = InnoDB;

@ -0,0 +1,61 @@
CREATE TABLE IF NOT EXISTS DM_OPERATION_ARCH (
ID INTEGER NOT NULL,
TYPE VARCHAR(20) NOT NULL,
CREATED_TIMESTAMP TIMESTAMP NOT NULL,
RECEIVED_TIMESTAMP TIMESTAMP NULL,
OPERATION_CODE VARCHAR(50) NOT NULL,
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (ID)
)ENGINE = InnoDB;
CREATE TABLE IF NOT EXISTS DM_ENROLMENT_OP_MAPPING_ARCH (
ID INTEGER NOT NULL,
ENROLMENT_ID INTEGER NOT NULL,
OPERATION_ID INTEGER NOT NULL,
STATUS VARCHAR(50) NULL,
PUSH_NOTIFICATION_STATUS VARCHAR(50) NULL,
CREATED_TIMESTAMP INTEGER NOT NULL,
UPDATED_TIMESTAMP INTEGER NOT NULL,
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (ID)
)ENGINE = InnoDB;
CREATE TABLE IF NOT EXISTS DM_DEVICE_OPERATION_RESPONSE_ARCH (
ID INT(11) NOT NULL,
ENROLMENT_ID INTEGER NOT NULL,
OPERATION_ID INTEGER NOT NULL,
EN_OP_MAP_ID INTEGER NOT NULL,
OPERATION_RESPONSE LONGBLOB DEFAULT NULL,
RECEIVED_TIMESTAMP TIMESTAMP NULL,
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (ID)
)ENGINE = InnoDB;
CREATE TABLE IF NOT EXISTS DM_NOTIFICATION_ARCH (
NOTIFICATION_ID INTEGER NOT NULL,
DEVICE_ID INTEGER NOT NULL,
OPERATION_ID INTEGER NOT NULL,
TENANT_ID INTEGER NOT NULL,
STATUS VARCHAR(10) NULL,
DESCRIPTION VARCHAR(1000) NULL,
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (NOTIFICATION_ID)
)ENGINE = InnoDB;
CREATE TABLE IF NOT EXISTS DM_COMMAND_OPERATION_ARCH (
OPERATION_ID INTEGER NOT NULL,
ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (OPERATION_ID)
)ENGINE = InnoDB;
CREATE TABLE IF NOT EXISTS DM_PROFILE_OPERATION_ARCH (
OPERATION_ID INTEGER NOT NULL,
ENABLED INTEGER NOT NULL DEFAULT 0,
OPERATION_DETAILS BLOB DEFAULT NULL,
ARCHIVED_AT TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (OPERATION_ID)
)ENGINE = InnoDB;
Loading…
Cancel
Save