Merge pull request #1250 from abhishekdesilva/master

Add archive pending operations configuration
4.x.x
Geeth 6 years ago committed by GitHub
commit a553e9aada
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -38,10 +38,14 @@ public class ArchivalServiceImpl implements ArchivalService {
private ArchivalDAO archivalDAO;
private DataDeletionDAO dataDeletionDAO;
private static int ITERATION_COUNT =
private static final int EXECUTION_BATCH_SIZE =
DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getArchivalConfiguration()
.getArchivalTaskConfiguration().getBatchSize();
private static final boolean ARCHIVE_PENDING_OPERATIONS =
DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getArchivalConfiguration()
.getArchivalTaskConfiguration().isArchivePendingOperations();
private String[] NOT_IN_PROGRESS_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED"};
private String[] NOT_PENDING_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED", "IN_PROGRESS"};
private String[] NOT_PENDING_IN_PROGRESS_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED"};
@ -54,7 +58,7 @@ public class ArchivalServiceImpl implements ArchivalService {
@Override
public void archiveTransactionalRecords() throws ArchivalException {
List<Integer> allOperations;
List<Integer> pendingAndIPOperations;
try {
ArchivalSourceDAOFactory.openConnection();
ArchivalDestinationDAOFactory.openConnection();
@ -67,7 +71,7 @@ public class ArchivalServiceImpl implements ArchivalService {
if (log.isDebugEnabled()) {
log.debug("Fetching All Pending Operations");
}
pendingAndIPOperations = archivalDAO.getPendingAndInProgressOperations();
} catch (ArchivalDAOException e) {
// rollbackTransactions();
@ -83,19 +87,35 @@ public class ArchivalServiceImpl implements ArchivalService {
ArchivalDestinationDAOFactory.closeConnection();
}
log.info(allOperations.size() + " All Operations. " + pendingAndIPOperations.size() +
" P&IP Operations");
//Get the diff of operations
Set<Integer> setA = new HashSet<>(allOperations);
Set<Integer> setB = new HashSet<>(pendingAndIPOperations);
setA.removeAll(setB);
List<Integer> candidates = allOperations;
log.info(allOperations.size() + " All Operations.");
List<Integer> candidates = new ArrayList<>();
candidates.addAll(setA);
if (!ARCHIVE_PENDING_OPERATIONS) {
try {
ArchivalSourceDAOFactory.openConnection();
ArchivalDestinationDAOFactory.openConnection();
List<Integer> pendingAndIPOperations = archivalDAO.getPendingAndInProgressOperations();
log.info(pendingAndIPOperations.size() +" P&IP Operations");
// Get the diff of operations
candidates.removeAll(pendingAndIPOperations);
} catch (ArchivalDAOException e) {
String msg = "Error occurred while retrieving the pending operations";
log.error(msg, e);
throw new ArchivalException(msg, e);
} catch (SQLException e) {
String msg = "An error occurred while connecting to the archival database";
log.error(msg, e);
throw new ArchivalException(msg, e);
} finally {
ArchivalSourceDAOFactory.closeConnection();
ArchivalDestinationDAOFactory.closeConnection();
}
}
int total = candidates.size();
int batches = calculateNumberOfBatches(total);
int batchSize = ITERATION_COUNT;
log.info(total + " Operations ready for archiving. " + batches + " iterations to be done.");
int batchSize = EXECUTION_BATCH_SIZE;
if (log.isDebugEnabled()) {
log.debug(total + " Operations ready for archiving. " + batches + " iterations to be done.");
log.debug(batchSize + " is the batch size");
@ -278,7 +298,7 @@ public class ArchivalServiceImpl implements ArchivalService {
private int calculateNumberOfBatches(int total) {
int batches = 0;
int batchSize = ITERATION_COUNT;
int batchSize = EXECUTION_BATCH_SIZE;
if ((total % batchSize) > 0) {
batches = (total / batchSize) + 1;
} else {

@ -55,15 +55,15 @@ public class ArchivalDAOImpl implements ArchivalDAO {
ResultSet rs = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT DISTINCT OPERATION_ID FROM DM_ENROLMENT_OP_MAPPING " +
"WHERE CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod + " DAY)";
String sql = "SELECT ID FROM DM_OPERATION WHERE CREATED_TIMESTAMP < (DATE_SUB(NOW(), INTERVAL "
+ this.retentionPeriod + " DAY))";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
if (log.isDebugEnabled()) {
log.debug("Selected Operation Ids from Enrolment OP Mapping");
}
while (rs.next()) {
operationIds.add(rs.getInt("OPERATION_ID"));
operationIds.add(rs.getInt("ID"));
}
} catch (SQLException e) {
String msg = "An error occurred while getting a list operation Ids to archive";
@ -86,9 +86,23 @@ public class ArchivalDAOImpl implements ArchivalDAO {
ResultSet rs = null;
try {
Connection conn = ArchivalSourceDAOFactory.getConnection();
String sql = "SELECT DISTINCT OPERATION_ID " +
" FROM DM_ENROLMENT_OP_MAPPING WHERE STATUS='PENDING' OR STATUS='IN_PROGRESS' " +
" AND CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod + " DAY)";
String sql = "(SELECT DISTINCT\n" +
" OPERATION_ID\n" +
" FROM\n" +
" DM_ENROLMENT_OP_MAPPING\n" +
" WHERE\n" +
" STATUS = 'PENDING'\n" +
" AND CREATED_TIMESTAMP < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL "
+ this.retentionPeriod + " DAY))) \n" +
" UNION ALL \n" +
"\t(SELECT DISTINCT\n" +
" OPERATION_ID\n" +
" FROM\n" +
" DM_ENROLMENT_OP_MAPPING\n" +
" WHERE\n" +
" STATUS = 'IN_PROGRESS'\n" +
" AND CREATED_TIMESTAMP < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL " +
"" + this.retentionPeriod + " DAY)))";
stmt = this.createMemoryEfficientStatement(conn);
rs = stmt.executeQuery(sql);
if (log.isDebugEnabled()) {

@ -29,7 +29,7 @@ public class ArchivalTaskConfiguration {
private int retentionPeriod;
private int batchSize;
private PurgingTaskConfiguration purgingTaskConfiguration;
private final int MULTIPLIER = -1;
private boolean archivePendingOperations;
@XmlElement(name = "Enabled", required = true)
public boolean isEnabled() {
@ -60,8 +60,7 @@ public class ArchivalTaskConfiguration {
@XmlElement(name = "RetentionPeriod", required = true)
public int getRetentionPeriod() {
// multiply by -1 to get the diff
return retentionPeriod * MULTIPLIER;
return retentionPeriod;
}
public void setRetentionPeriod(int retentionPeriod) {
@ -85,4 +84,13 @@ public class ArchivalTaskConfiguration {
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
@XmlElement(name ="ArchivePendingOperations")
public boolean isArchivePendingOperations() {
return archivePendingOperations;
}
public void setArchivePendingOperations(boolean archivePendingOperations) {
this.archivePendingOperations = archivePendingOperations;
}
}

Loading…
Cancel
Save