From e1e2b14b5f2a328e0fdf20c6ff03b218eeba8c2c Mon Sep 17 00:00:00 2001 From: warunalakshitha Date: Mon, 1 May 2017 20:44:30 +0530 Subject: [PATCH] Rename push status to push notification status and add configurable initial delay for scheduler task --- .../operation/mgt/OperationManager.java | 1 + .../mgt/core/DeviceManagementConstants.java | 1 + .../PushNotificationConfiguration.java | 33 +++-- .../mgt/core/dto/operation/mgt/Operation.java | 2 +- .../DeviceManagementServiceComponent.java | 30 +++-- .../operation/mgt/OperationManagerImpl.java | 6 +- .../core/operation/mgt/OperationMapping.java | 10 +- .../core/operation/mgt/dao/OperationDAO.java | 6 +- .../mgt/dao/OperationMappingDAO.java | 2 +- .../mgt/dao/impl/GenericOperationDAOImpl.java | 11 +- .../mgt/dao/impl/OperationMappingDAOImpl.java | 12 +- .../operation/OracleOperationDAOImpl.java | 8 +- .../operation/SQLServerOperationDAOImpl.java | 8 +- .../task/PushNotificationSchedulerTask.java | 124 +++++++++--------- 14 files changed, 139 insertions(+), 115 deletions(-) diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/operation/mgt/OperationManager.java b/components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/operation/mgt/OperationManager.java index 8c8818f736..ce548628b9 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/operation/mgt/OperationManager.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.common/src/main/java/org/wso2/carbon/device/mgt/common/operation/mgt/OperationManager.java @@ -21,6 +21,7 @@ import org.wso2.carbon.device.mgt.common.*; import org.wso2.carbon.device.mgt.common.push.notification.NotificationStrategy; import java.util.List; +import java.util.Map; /** * This represents the Device Operation management functionality which should be implemented by diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/DeviceManagementConstants.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/DeviceManagementConstants.java index 7e5e184358..15df1e4435 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/DeviceManagementConstants.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/DeviceManagementConstants.java @@ -90,6 +90,7 @@ public final class DeviceManagementConstants { private PushNotifications() { throw new AssertionError(); } + public static final int DEFAULT_SCHEDULER_TASK_INITIAL_DELAY = 60000; public static final int DEFAULT_BATCH_DELAY_MILLS = 60000; public static final int DEFAULT_BATCH_SIZE = 1000; } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/config/push/notification/PushNotificationConfiguration.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/config/push/notification/PushNotificationConfiguration.java index 6872affcf8..58ec787668 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/config/push/notification/PushNotificationConfiguration.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/config/push/notification/PushNotificationConfiguration.java @@ -28,35 +28,46 @@ import java.util.List; @XmlRootElement(name = "PushNotificationConfiguration") public class PushNotificationConfiguration { - private int SchedulerBatchSize; - private int SchedulerBatchDelayMills; - private boolean SchedulerTaskEnabled; + private int schedulerBatchSize; + private int schedulerBatchDelayMills; + private int schedulerTaskInitialDelay; + private boolean schedulerTaskEnabled; private List pushNotificationProviders; @XmlElement(name = "SchedulerBatchSize", required = true) public int getSchedulerBatchSize() { - return SchedulerBatchSize; + return schedulerBatchSize; } - public void setSchedulerBatchSize(int SchedulerBatchSize) { - this.SchedulerBatchSize = SchedulerBatchSize; + public void setSchedulerBatchSize(int schedulerBatchSize) { + this.schedulerBatchSize = schedulerBatchSize; } @XmlElement(name = "SchedulerBatchDelayMills", required = true) public int getSchedulerBatchDelayMills() { - return SchedulerBatchDelayMills; + return schedulerBatchDelayMills; } - public void setSchedulerBatchDelayMills(int SchedulerBatchDelayMills) { - this.SchedulerBatchDelayMills = SchedulerBatchDelayMills; + public void setSchedulerBatchDelayMills(int schedulerBatchDelayMills) { + this.schedulerBatchDelayMills = schedulerBatchDelayMills; } + + @XmlElement(name = "SchedulerTaskInitialDelay", required = true) + public int getSchedulerTaskInitialDelay() { + return schedulerTaskInitialDelay; + } + + public void setSchedulerTaskInitialDelay(int schedulerTaskInitialDelay) { + this.schedulerTaskInitialDelay = schedulerTaskInitialDelay; + } + @XmlElement(name = "SchedulerTaskEnabled", required = true) public boolean isSchedulerTaskEnabled() { - return SchedulerTaskEnabled; + return schedulerTaskEnabled; } public void setSchedulerTaskEnabled(boolean schedulerTaskEnabled) { - SchedulerTaskEnabled = schedulerTaskEnabled; + this.schedulerTaskEnabled = schedulerTaskEnabled; } @XmlElementWrapper(name = "PushNotificationProviders", required = true) diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dto/operation/mgt/Operation.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dto/operation/mgt/Operation.java index c3acb18076..3b5322bbf7 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dto/operation/mgt/Operation.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/dto/operation/mgt/Operation.java @@ -35,7 +35,7 @@ public class Operation implements Serializable { REPEAT, NO_REPEAT, PAUSE_SEQUENCE, STOP_SEQUENCE } - public enum PushStatus { + public enum PushNotificationStatus { SCHEDULED, COMPLETED } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/internal/DeviceManagementServiceComponent.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/internal/DeviceManagementServiceComponent.java index deec497e12..0c1a62471d 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/internal/DeviceManagementServiceComponent.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/internal/DeviceManagementServiceComponent.java @@ -182,25 +182,37 @@ public class DeviceManagementServiceComponent { * of Device Management Service component in order to avoid bundle start up order related complications */ notifyStartupListeners(); if (log.isDebugEnabled()) { - log.debug("Device management core bundle has been successfully initialized"); + log.debug("Push notification batch enabled : " + config.getPushNotificationConfiguration() + .isSchedulerTaskEnabled()); } // Start Push Notification Scheduler Task if (config.getPushNotificationConfiguration().isSchedulerTaskEnabled()) { if (config.getPushNotificationConfiguration().getSchedulerBatchSize() <= 0) { - log.error("Push notification batch size cannot be 0 or less than 0. Setting default batch size to:" + - " " + DeviceManagementConstants.PushNotifications.DEFAULT_BATCH_SIZE); - config.getPushNotificationConfiguration().setSchedulerBatchSize(DeviceManagementConstants.PushNotifications - .DEFAULT_BATCH_SIZE); + log.error("Push notification batch size cannot be 0 or less than 0. Setting default batch size " + + "to:" + DeviceManagementConstants.PushNotifications.DEFAULT_BATCH_SIZE); + config.getPushNotificationConfiguration().setSchedulerBatchSize(DeviceManagementConstants + .PushNotifications.DEFAULT_BATCH_SIZE); } if (config.getPushNotificationConfiguration().getSchedulerBatchDelayMills() <= 0) { log.error("Push notification batch delay cannot be 0 or less than 0. Setting default batch delay " + "milliseconds to" + DeviceManagementConstants.PushNotifications.DEFAULT_BATCH_DELAY_MILLS); - config.getPushNotificationConfiguration().setSchedulerBatchDelayMills(DeviceManagementConstants.PushNotifications - .DEFAULT_BATCH_DELAY_MILLS); + config.getPushNotificationConfiguration().setSchedulerBatchDelayMills(DeviceManagementConstants + .PushNotifications.DEFAULT_BATCH_DELAY_MILLS); + } + if (config.getPushNotificationConfiguration().getSchedulerTaskInitialDelay() < 0) { + log.error("Push notification initial delay cannot be less than 0. Setting default initial " + + "delay milliseconds to" + DeviceManagementConstants.PushNotifications + .DEFAULT_SCHEDULER_TASK_INITIAL_DELAY); + config.getPushNotificationConfiguration().setSchedulerTaskInitialDelay(DeviceManagementConstants + .PushNotifications.DEFAULT_SCHEDULER_TASK_INITIAL_DELAY); } ScheduledExecutorService pushNotificationExecutor = Executors.newSingleThreadScheduledExecutor(); - pushNotificationExecutor.schedule(new PushNotificationSchedulerTask(), config.getPushNotificationConfiguration() - .getSchedulerBatchDelayMills(), TimeUnit.MILLISECONDS); + pushNotificationExecutor.scheduleWithFixedDelay(new PushNotificationSchedulerTask(), config + .getPushNotificationConfiguration().getSchedulerTaskInitialDelay(), config + .getPushNotificationConfiguration().getSchedulerBatchDelayMills(), TimeUnit.MILLISECONDS); + } + if (log.isDebugEnabled()) { + log.debug("Device management core bundle has been successfully initialized"); } } catch (Throwable e) { log.error("Error occurred while initializing device management core bundle", e); diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java index 362bf0722e..41530bfb48 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java @@ -194,15 +194,13 @@ public class OperationManagerImpl implements OperationManager { log.debug("Sending push notification to " + deviceId + " from add operation method."); } notificationStrategy.execute(new NotificationContext(deviceId, operation)); - operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon - .device.mgt.core.dto.operation.mgt.Operation.PushStatus.COMPLETED); + operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.COMPLETED); } catch (PushNotificationExecutionFailedException e) { log.error("Error occurred while sending push notifications to " + deviceId.getType() + " device carrying id '" + deviceId + "'", e); // Reschedule if push notification failed. - operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon - .device.mgt.core.dto.operation.mgt.Operation.PushStatus.SCHEDULED); + operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED); } } } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationMapping.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationMapping.java index 6775ebd92a..6f42a78d15 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationMapping.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationMapping.java @@ -29,7 +29,7 @@ public class OperationMapping { private int operationId; private int tenantId; private Operation.Status status; - private Operation.PushStatus pushStatus; + private Operation.PushNotificationStatus pushNotificationStatus; public int getOperationId() { return operationId; @@ -63,11 +63,11 @@ public class OperationMapping { this.status = status; } - public Operation.PushStatus getPushStatus() { - return pushStatus; + public Operation.PushNotificationStatus getPushNotificationStatus() { + return pushNotificationStatus; } - public void setPushStatus(Operation.PushStatus pushStatus) { - this.pushStatus = pushStatus; + public void setPushNotificationStatus(Operation.PushNotificationStatus pushNotificationStatus) { + this.pushNotificationStatus = pushNotificationStatus; } } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationDAO.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationDAO.java index 79da36c052..5e3848cf9b 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationDAO.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationDAO.java @@ -85,12 +85,12 @@ public interface OperationDAO { /** * This method provides operation mappings for given status * @param opStatus Operation status - * @param pushStatus Push notification Status + * @param pushNotificationStatus Push notification Status * @param limit Limit for no devices * @return Tenant based operation mappings list * @throws OperationManagementDAOException */ - Map> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushStatus pushStatus, - int limit) throws OperationManagementDAOException;; + Map> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushNotificationStatus pushNotificationStatus, + int limit) throws OperationManagementDAOException; } \ No newline at end of file diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationMappingDAO.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationMappingDAO.java index 003aad11f5..72d02ec9bc 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationMappingDAO.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationMappingDAO.java @@ -29,7 +29,7 @@ public interface OperationMappingDAO { void removeOperationMapping(int operationId, Integer deviceId) throws OperationManagementDAOException; - void updateOperationMapping(int operationId, Integer deviceId, Operation.PushStatus pushStatus) throws + void updateOperationMapping(int operationId, Integer deviceId, Operation.PushNotificationStatus pushNotificationStatus) throws OperationManagementDAOException; void updateOperationMapping(List operationMappingList) throws OperationManagementDAOException; diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java index f29c3ed05d..3e50f49800 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java @@ -1095,21 +1095,22 @@ public class GenericOperationDAOImpl implements OperationDAO { } @Override - public Map> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushStatus pushStatus, + public Map> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushNotificationStatus pushNotificationStatus, int limit) throws OperationManagementDAOException { PreparedStatement stmt = null; ResultSet rs = null; + Connection conn; OperationMapping operationMapping; Map> operationMappingsTenantMap = new HashMap<>(); try { - Connection conn = OperationManagementDAOFactory.getConnection(); + conn = OperationManagementDAOFactory.getConnection(); String sql = "SELECT op.ENROLMENT_ID, op.OPERATION_ID, dt.NAME ,d.TENANT_ID FROM DM_DEVICE d, " + - "DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = '?' AND " + - "op.PUSH_NOTIFICATION_STATUS = '?' AND d.DEVICE_TYPE_ID = dt.ID AND d.ID=op.ENROLMENT_ID ORDER BY " + + "DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = ? AND " + + "op.PUSH_NOTIFICATION_STATUS = ? AND d.DEVICE_TYPE_ID = dt.ID AND d.ID=op.ENROLMENT_ID ORDER BY " + "op.OPERATION_ID LIMIT ?"; stmt = conn.prepareStatement(sql); stmt.setString(1, opStatus.toString()); - stmt.setString(2, pushStatus.toString()); + stmt.setString(2, pushNotificationStatus.toString()); stmt.setInt(3, limit); rs = stmt.executeQuery(); while (rs.next()) { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/OperationMappingDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/OperationMappingDAOImpl.java index 867353aa8d..0846813fdf 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/OperationMappingDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/OperationMappingDAOImpl.java @@ -46,9 +46,9 @@ public class OperationMappingDAOImpl implements OperationMappingDAO { stmt.setInt(2, operationId); stmt.setString(3, Operation.Status.PENDING.toString()); if (isScheduled) { - stmt.setString(4, Operation.PushStatus.SCHEDULED.toString()); + stmt.setString(4, Operation.PushNotificationStatus.SCHEDULED.toString()); } else { - stmt.setString(4, Operation.PushStatus.COMPLETED.toString()); + stmt.setString(4, Operation.PushNotificationStatus.COMPLETED.toString()); } stmt.setLong(5, time); stmt.setLong(6, time); @@ -79,14 +79,14 @@ public class OperationMappingDAOImpl implements OperationMappingDAO { } @Override - public void updateOperationMapping(int operationId, Integer deviceId, Operation.PushStatus pushStatus) throws OperationManagementDAOException { + public void updateOperationMapping(int operationId, Integer deviceId, Operation.PushNotificationStatus pushNotificationStatus) throws OperationManagementDAOException { PreparedStatement stmt = null; try { Connection conn = OperationManagementDAOFactory.getConnection(); String sql = "UPDATE DM_ENROLMENT_OP_MAPPING SET PUSH_NOTIFICATION_STATUS = ? WHERE ENROLMENT_ID = ? and " + "OPERATION_ID = ?"; stmt = conn.prepareStatement(sql); - stmt.setString(1, pushStatus.toString()); + stmt.setString(1, pushNotificationStatus.toString()); stmt.setInt(2, deviceId); stmt.setInt(3, operationId); stmt.executeUpdate(); @@ -108,7 +108,7 @@ public class OperationMappingDAOImpl implements OperationMappingDAO { stmt = conn.prepareStatement(sql); if (conn.getMetaData().supportsBatchUpdates()) { for (OperationMapping operationMapping : operationMappingList) { - stmt.setString(1, operationMapping.getPushStatus().toString()); + stmt.setString(1, operationMapping.getPushNotificationStatus().toString()); stmt.setInt(2, Integer.parseInt(operationMapping.getDeviceIdentifier().getId())); stmt.setInt(3, operationMapping.getOperationId()); stmt.addBatch(); @@ -116,7 +116,7 @@ public class OperationMappingDAOImpl implements OperationMappingDAO { stmt.executeBatch(); } else { for (OperationMapping operationMapping : operationMappingList) { - stmt.setString(1, operationMapping.getPushStatus().toString()); + stmt.setString(1, operationMapping.getPushNotificationStatus().toString()); stmt.setInt(2, Integer.parseInt(operationMapping.getDeviceIdentifier().getId())); stmt.setInt(3, operationMapping.getOperationId()); stmt.executeUpdate(); diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/operation/OracleOperationDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/operation/OracleOperationDAOImpl.java index 19f604fa03..321a13e46c 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/operation/OracleOperationDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/operation/OracleOperationDAOImpl.java @@ -368,7 +368,7 @@ public class OracleOperationDAOImpl extends GenericOperationDAOImpl { @Override - public Map> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushStatus pushStatus, + public Map> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushNotificationStatus pushNotificationStatus, int limit) throws OperationManagementDAOException { PreparedStatement stmt = null; ResultSet rs = null; @@ -377,13 +377,13 @@ public class OracleOperationDAOImpl extends GenericOperationDAOImpl { try { Connection conn = OperationManagementDAOFactory.getConnection(); String sql = "SELECT op.ENROLMENT_ID, op.OPERATION_ID, dt.NAME ,d.TENANT_ID FROM DM_DEVICE d, " + - "DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = '?' AND op" + - ".PUSH_NOTIFICATION_STATUS = '?' AND d.DEVICE_TYPE_ID = dt.ID AND d.ID=op.ENROLMENT_ID AND ROWNUM" + + "DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = ? AND op" + + ".PUSH_NOTIFICATION_STATUS = ? AND d.DEVICE_TYPE_ID = dt.ID AND d.ID=op.ENROLMENT_ID AND ROWNUM" + " <= ? ORDER BY op.OPERATION_ID"; stmt = conn.prepareStatement(sql); stmt.setString(1, opStatus.toString()); - stmt.setString(2, pushStatus.toString()); + stmt.setString(2, pushNotificationStatus.toString()); stmt.setInt(3, limit); rs = stmt.executeQuery(); while (rs.next()) { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/operation/SQLServerOperationDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/operation/SQLServerOperationDAOImpl.java index 6a69c5d4af..d19a5b151b 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/operation/SQLServerOperationDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/operation/SQLServerOperationDAOImpl.java @@ -269,7 +269,7 @@ public class SQLServerOperationDAOImpl extends GenericOperationDAOImpl { } @Override - public Map> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushStatus pushStatus, + public Map> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushNotificationStatus pushNotificationStatus, int limit) throws OperationManagementDAOException { PreparedStatement stmt = null; ResultSet rs = null; @@ -278,12 +278,12 @@ public class SQLServerOperationDAOImpl extends GenericOperationDAOImpl { try { Connection conn = OperationManagementDAOFactory.getConnection(); String sql = "SELECT op.ENROLMENT_ID, op.OPERATION_ID, dt.NAME ,d.TENANT_ID FROM DM_DEVICE d, " + - "DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = '?' AND op" + - ".PUSH_NOTIFICATION_STATUS = '?' AND d.DEVICE_TYPE_ID = dt.ID " + + "DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = ? AND op" + + ".PUSH_NOTIFICATION_STATUS = ? AND d.DEVICE_TYPE_ID = dt.ID " + "AND d.ID=op.ENROLMENT_ID ORDER BY op.OPERATION_ID OFFSET ? ROWS FETCH NEXT ? ROWS ONLY"; stmt = conn.prepareStatement(sql); stmt.setString(1, opStatus.toString()); - stmt.setString(2, pushStatus.toString()); + stmt.setString(2, pushNotificationStatus.toString()); stmt.setInt(3, limit); rs = stmt.executeQuery(); while (rs.next()) { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java index a87ff4868c..3ac4126f30 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java @@ -27,7 +27,6 @@ import org.wso2.carbon.device.mgt.common.push.notification.NotificationContext; import org.wso2.carbon.device.mgt.common.push.notification.NotificationStrategy; import org.wso2.carbon.device.mgt.common.push.notification.PushNotificationExecutionFailedException; import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager; -import org.wso2.carbon.device.mgt.core.dao.DeviceManagementDAOFactory; import org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation; import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder; import org.wso2.carbon.device.mgt.core.operation.mgt.OperationMapping; @@ -37,7 +36,6 @@ import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationManagementDAOF import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationMappingDAO; import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService; -import java.sql.SQLException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -56,72 +54,74 @@ public class PushNotificationSchedulerTask implements Runnable { @Override public void run() { - Map> operationMappingsTenantMap = new HashMap<>(); - List operationsCompletedList = new LinkedList<>(); - if (log.isDebugEnabled()) { - log.debug("Push notification job started"); - } try { - //Get next available operation list per device batch - DeviceManagementDAOFactory.openConnection(); - operationMappingsTenantMap = operationDAO.getOperationMappingsByStatus(Operation.Status - .PENDING, Operation.PushStatus.SCHEDULED, DeviceConfigurationManager.getInstance() - .getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize()); - } catch (SQLException e) { - log.error("Error occurred while opening a connection to the data source", e); - } catch (OperationManagementDAOException e) { - log.error("Unable to retrieve scheduled pending operations for task.", e); - } finally { - DeviceManagementDAOFactory.closeConnection(); - } - // Sending push notification to each device - for (List operationMappings : operationMappingsTenantMap.values()) { - for (OperationMapping operationMapping : operationMappings) { - try { - if (log.isDebugEnabled()) { - log.debug("Sending push notification for operationId :" + operationMapping.getOperationId() + - "to deviceId : " + operationMapping.getDeviceIdentifier().getId()); - } - // Set tenant id and domain - PrivilegedCarbonContext.startTenantFlow(); - PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(operationMapping.getTenantId(), true); - // Get notification strategy for given device type - NotificationStrategy notificationStrategy = provider.getNotificationStrategyByDeviceType - (operationMapping.getDeviceIdentifier().getType()); - // Send the push notification on given strategy - notificationStrategy.execute(new NotificationContext(operationMapping.getDeviceIdentifier(), - provider.getOperation(operationMapping.getDeviceIdentifier().getType(), operationMapping - .getOperationId()))); - operationMapping.setPushStatus(Operation.PushStatus.COMPLETED); - operationsCompletedList.add(operationMapping); - } catch (DeviceManagementException e) { - log.error("Error occurred while getting notification strategy for operation mapping " + - operationMapping.getDeviceIdentifier().getType(), e); - } catch (OperationManagementException e) { - log.error("Unable to get the operation for operation " + operationMapping.getOperationId(), e); - } catch (PushNotificationExecutionFailedException e) { - log.error("Error occurred while sending push notification to operation: " + operationMapping - .getOperationId(), e); - } finally { - PrivilegedCarbonContext.endTenantFlow(); - } + Map> operationMappingsTenantMap = new HashMap<>(); + List operationsCompletedList = new LinkedList<>(); + if (log.isDebugEnabled()) { + log.debug("Push notification job started"); } - } - // Update push notification status to competed for operations which already sent - if (operationsCompletedList.size() > 0) { try { - OperationManagementDAOFactory.beginTransaction(); - operationMappingDAO.updateOperationMapping(operationsCompletedList); - OperationManagementDAOFactory.commitTransaction(); - } catch (TransactionManagementException | OperationManagementDAOException e) { - OperationManagementDAOFactory.rollbackTransaction(); - log.error("Error occurred while updating operation mappings for sent notifications ", e); + //Get next available operation list per device batch + OperationManagementDAOFactory.openConnection(); + operationMappingsTenantMap = operationDAO.getOperationMappingsByStatus(Operation.Status + .PENDING, Operation.PushNotificationStatus.SCHEDULED, DeviceConfigurationManager.getInstance() + .getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize()); + } catch (OperationManagementDAOException e) { + log.error("Unable to retrieve scheduled pending operations for task.", e); } finally { OperationManagementDAOFactory.closeConnection(); } - } - if (log.isDebugEnabled()) { - log.debug("Push notification job running completed."); + // Sending push notification to each device + for (List operationMappings : operationMappingsTenantMap.values()) { + for (OperationMapping operationMapping : operationMappings) { + try { + if (log.isDebugEnabled()) { + log.debug("Sending push notification for operationId :" + operationMapping.getOperationId() + + "to deviceId : " + operationMapping.getDeviceIdentifier().getId()); + } + // Set tenant id and domain + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(operationMapping.getTenantId(), true); + // Get notification strategy for given device type + NotificationStrategy notificationStrategy = provider.getNotificationStrategyByDeviceType + (operationMapping.getDeviceIdentifier().getType()); + // Send the push notification on given strategy + notificationStrategy.execute(new NotificationContext(operationMapping.getDeviceIdentifier(), + provider.getOperation(operationMapping.getDeviceIdentifier().getType(), operationMapping + .getOperationId()))); + operationMapping.setPushNotificationStatus(Operation.PushNotificationStatus.COMPLETED); + operationsCompletedList.add(operationMapping); + } catch (DeviceManagementException e) { + log.error("Error occurred while getting notification strategy for operation mapping " + + operationMapping.getDeviceIdentifier().getType(), e); + } catch (OperationManagementException e) { + log.error("Unable to get the operation for operation " + operationMapping.getOperationId(), e); + } catch (PushNotificationExecutionFailedException e) { + log.error("Error occurred while sending push notification to operation: " + operationMapping + .getOperationId(), e); + } finally { + PrivilegedCarbonContext.endTenantFlow(); + } + } + } + // Update push notification status to competed for operations which already sent + if (operationsCompletedList.size() > 0) { + try { + OperationManagementDAOFactory.beginTransaction(); + operationMappingDAO.updateOperationMapping(operationsCompletedList); + OperationManagementDAOFactory.commitTransaction(); + } catch (TransactionManagementException | OperationManagementDAOException e) { + OperationManagementDAOFactory.rollbackTransaction(); + log.error("Error occurred while updating operation mappings for sent notifications ", e); + } finally { + OperationManagementDAOFactory.closeConnection(); + } + } + if (log.isDebugEnabled()) { + log.debug("Push notification job running completed."); + } + } catch (Throwable cause) { + log.error("PushNotificationSchedulerTask failed due to " + cause); } } }