From 77f7792bc791ed8376593c3b75f9da7fe659cd74 Mon Sep 17 00:00:00 2001 From: Milan Perera Date: Mon, 10 Sep 2018 10:31:10 +0530 Subject: [PATCH 1/2] Fix wso2/product-iots#1863 --- .../core/operation/mgt/OperationManagerImpl.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java index 5b7de442051..7ca0d0424f1 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java @@ -476,6 +476,19 @@ public class OperationManagerImpl implements OperationManager { } } + private void sendNotification(Operation operation, DeviceIdentifier deviceId) { + NotificationStrategy notificationStrategy = getNotificationStrategy(); + if (notificationStrategy != null) { + try { + notificationStrategy.execute(new NotificationContext(deviceId, operation)); + } catch (PushNotificationExecutionFailedException e) { + log.error("Error occurred while sending push notifications to " + + deviceId.getType() + " device carrying id '" + + deviceId + "'", e); + } + } + } + private List getActivityStatus(DeviceIDHolder deviceIdValidationResult, DeviceIDHolder deviceAuthResult, String deviceType) { List activityStatuses = new ArrayList<>(); From 576fa40cb00c2f562cabeedcc484189a626e9141 Mon Sep 17 00:00:00 2001 From: Milan Perera Date: Wed, 12 Sep 2018 10:59:03 +0530 Subject: [PATCH 2/2] Modify addOperation logic to improve the performance --- .../operation/mgt/OperationManagerImpl.java | 139 ++++++++---------- .../core/operation/mgt/dao/OperationDAO.java | 2 +- .../mgt/dao/impl/GenericOperationDAOImpl.java | 8 +- 3 files changed, 64 insertions(+), 85 deletions(-) diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java index 7ca0d0424f1..88d5e291f3d 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java @@ -157,8 +157,8 @@ public class OperationManagerImpl implements OperationManager { List validDeviceIds = deviceValidationResult.getValidDeviceIDList(); if (validDeviceIds.size() > 0) { DeviceIDHolder deviceAuthorizationResult = this.authorizeDevices(operation, validDeviceIds); - List authorizedDeviceList = deviceAuthorizationResult.getValidDeviceIDList(); - if (authorizedDeviceList.size() <= 0) { + List authorizedDeviceIds = deviceAuthorizationResult.getValidDeviceIDList(); + if (authorizedDeviceIds.size() <= 0) { log.warn("User : " + getUser() + " is not authorized to perform operations on given device-list."); Activity activity = new Activity(); //Send the operation statuses only for admin triggered operations @@ -186,21 +186,31 @@ public class OperationManagerImpl implements OperationManager { org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation operationDto = OperationDAOUtil.convertOperation(operation); int enrolmentId; - boolean hasExistingTaskOperation; - List pendingDeviceList = new ArrayList<>(); String operationCode = operationDto.getCode(); - for (DeviceIdentifier deviceId : authorizedDeviceList) { + List authorizedDevices = new ArrayList<>(); + List ignoredDevices = new ArrayList<>(); + for (DeviceIdentifier deviceId : authorizedDeviceIds) { Device device = getDevice(deviceId); - enrolmentId = device.getEnrolmentInfo().getId(); - hasExistingTaskOperation = operationDAO.updateTaskOperation(enrolmentId, operationCode); - if (hasExistingTaskOperation) { - pendingDeviceList.add(deviceId); + authorizedDevices.add(device); + } + + if (operationDto.getControl() == + org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.Control.NO_REPEAT) { + int existingOperationID; + for (Device device : authorizedDevices) { + enrolmentId = device.getEnrolmentInfo().getId(); + existingOperationID = operationDAO.getExistingOperationID(enrolmentId, operationCode); + if (existingOperationID > 0) { + ignoredDevices.add(device); + operation.setId(existingOperationID); + this.sendNotification(operation, device); + } } } - if (pendingDeviceList.size() > 0) { - if (authorizedDeviceList.size() == pendingDeviceList.size()) { + if (ignoredDevices.size() > 0) { + if (authorizedDevices.size() == ignoredDevices.size()) { if (log.isDebugEnabled()) { log.debug("All the devices contain a pending operation for the Operation Code: " + operationCode); @@ -212,87 +222,34 @@ public class OperationManagerImpl implements OperationManager { deviceType)); return activity; } else { - authorizedDeviceList.removeAll(pendingDeviceList); + authorizedDevices.removeAll(ignoredDevices); } } int operationId = this.lookupOperationDAO(operation).addOperation(operationDto); - boolean isNotRepeated = false; + boolean isScheduled = false; NotificationStrategy notificationStrategy = getNotificationStrategy(); // check whether device list is greater than batch size notification strategy has enable to send push // notification using scheduler task if (DeviceConfigurationManager.getInstance().getDeviceManagementConfig(). - getPushNotificationConfiguration().getSchedulerBatchSize() <= authorizedDeviceList.size() && + getPushNotificationConfiguration().getSchedulerBatchSize() <= authorizedDeviceIds.size() && notificationStrategy != null) { isScheduled = notificationStrategy.getConfig().isScheduled(); } - List devices = new ArrayList<>(); - if (org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.Control.NO_REPEAT == operationDto. - getControl()) { - isNotRepeated = true; - } - //TODO have to create a sql to load device details from deviceDAO using single query. - for (DeviceIdentifier deviceId : authorizedDeviceList) { - Device device = getDevice(deviceId); - devices.add(device); + for (Device device : authorizedDevices) { enrolmentId = device.getEnrolmentInfo().getId(); //Do not repeat the task operations - if (isScheduledOperation) { - hasExistingTaskOperation = operationDAO.updateTaskOperation(enrolmentId, operationCode); - if (!hasExistingTaskOperation) { - operationMappingDAO.addOperationMapping(operationId, enrolmentId, isScheduled); - } - } else if (isNotRepeated) { - operationDAO.updateEnrollmentOperationsStatus(enrolmentId, operationCode, - org.wso2.carbon.device.mgt.core.dto.operation.mgt. - Operation.Status.PENDING, - org.wso2.carbon.device.mgt.core.dto.operation.mgt. - Operation.Status.REPEATED); - operationMappingDAO.addOperationMapping(operationId, enrolmentId, isScheduled); - } else { - operationMappingDAO.addOperationMapping(operationId, enrolmentId, isScheduled); - } + operationMappingDAO.addOperationMapping(operationId, enrolmentId, isScheduled); } OperationManagementDAOFactory.commitTransaction(); - /* - If notification strategy has not enable to send push notification using scheduler task we will send - notification immediately. This is done in separate loop inorder to prevent overlap with DB insert - operations with the possible db update operations trigger followed by pending operation call. - Otherwise device may call pending operation while DB is locked for write and deadlock can occur. - */ - if (notificationStrategy != null && !isScheduled) { - for (Device device : devices) { - DeviceIdentifier deviceId = new DeviceIdentifier(device.getDeviceIdentifier(), device.getType()); - if (log.isDebugEnabled()) { - log.debug("Sending push notification to " + deviceId + " from add operation method."); - } - operation.setId(operationId); - operation.setActivityId(DeviceManagementConstants.OperationAttributes.ACTIVITY + operationId); - try { - notificationStrategy.execute(new NotificationContext(deviceId, operation)); - } catch (PushNotificationExecutionFailedException e) { - log.error("Error occurred while sending push notifications to " + deviceId.getType() + - " device carrying id '" + deviceId + "'", e); - /* - Reschedule if push notification failed. Doing db transactions in atomic way to prevent - deadlocks. - */ - enrolmentId = device.getEnrolmentInfo().getId(); - try { - operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon - .device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED); - OperationManagementDAOFactory.commitTransaction(); - } catch (OperationManagementDAOException ex) { - // Not throwing this exception in order to keep sending remaining notifications if any. - log.error("Error occurred while setting push notification status to SCHEDULED.", ex); - OperationManagementDAOFactory.rollbackTransaction(); - } - } + if (isScheduled) { + for (Device device : authorizedDevices) { + this.sendNotification(operation, device); } } @@ -415,7 +372,7 @@ public class OperationManagerImpl implements OperationManager { } private String addOperationMappings(List authorizedDeviceList, org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation operationDto, int operationId, boolean isScheduledOperation, boolean isNotRepeated, boolean isScheduled, List devices) throws OperationManagementException, OperationManagementDAOException { int enrolmentId; - boolean hasExistingTaskOperation;//TODO have to create a sql to load device details from deviceDAO using single query. + int existingTaskOperationId;//TODO have to create a sql to load device details from deviceDAO using single query. String operationCode = operationDto.getCode(); for (DeviceIdentifier deviceId : authorizedDeviceList) { Device device = getDevice(deviceId); @@ -423,8 +380,8 @@ public class OperationManagerImpl implements OperationManager { enrolmentId = device.getEnrolmentInfo().getId(); //Do not repeat the task operations if (isScheduledOperation) { - hasExistingTaskOperation = operationDAO.updateTaskOperation(enrolmentId, operationCode); - if (!hasExistingTaskOperation) { + existingTaskOperationId = operationDAO.getExistingOperationID(enrolmentId, operationCode); + if (existingTaskOperationId != -1) { operationMappingDAO.addOperationMapping(operationId, enrolmentId, isScheduled); } } else if (isNotRepeated) { @@ -476,15 +433,37 @@ public class OperationManagerImpl implements OperationManager { } } - private void sendNotification(Operation operation, DeviceIdentifier deviceId) { + private void sendNotification(Operation operation, Device device) { NotificationStrategy notificationStrategy = getNotificationStrategy(); + /* + * If notification strategy has not enable to send push notification using scheduler task we will send + * notification immediately. This is done in separate loop inorder to prevent overlap with DB insert + * operations with the possible db update operations trigger followed by pending operation call. + * Otherwise device may call pending operation while DB is locked for write and deadlock can occur. + */ if (notificationStrategy != null) { + if (log.isDebugEnabled()) { + log.debug("Sending push notification to " + device.getDeviceIdentifier() + " from add operation method."); + } + DeviceIdentifier deviceIdentifier = new DeviceIdentifier(device.getDeviceIdentifier(), device.getType()); try { - notificationStrategy.execute(new NotificationContext(deviceId, operation)); + notificationStrategy.execute(new NotificationContext(deviceIdentifier, operation)); } catch (PushNotificationExecutionFailedException e) { - log.error("Error occurred while sending push notifications to " + - deviceId.getType() + " device carrying id '" + - deviceId + "'", e); + log.error("Error occurred while sending push notifications to " + device.getType() + + " device carrying id '" + device.getDeviceIdentifier() + "'", e); + /* + * Reschedule if push notification failed. Doing db transactions in atomic way to prevent + * deadlocks. + */ + try { + operationMappingDAO.updateOperationMapping(operation.getId(), device.getEnrolmentInfo().getId(), org.wso2.carbon + .device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED); + OperationManagementDAOFactory.commitTransaction(); + } catch (OperationManagementDAOException ex) { + // Not throwing this exception in order to keep sending remaining notifications if any. + log.error("Error occurred while setting push notification status to SCHEDULED.", ex); + OperationManagementDAOFactory.rollbackTransaction(); + } } } } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationDAO.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationDAO.java index 641a91c9ed7..a40c0c5e8f4 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationDAO.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/OperationDAO.java @@ -56,7 +56,7 @@ public interface OperationDAO { void updateEnrollmentOperationsStatus(int enrolmentId, String operationCode, Operation.Status existingStatus, Operation.Status newStatus) throws OperationManagementDAOException; - boolean updateTaskOperation(int enrolmentId, String operationCode) throws OperationManagementDAOException; + int getExistingOperationID(int enrolmentId, String operationCode) throws OperationManagementDAOException; void addOperationResponse(int enrolmentId, int operationId, Object operationResponse) throws OperationManagementDAOException; diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java index cb543de8f3f..3ea9ba1f5e5 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/dao/impl/GenericOperationDAOImpl.java @@ -181,14 +181,14 @@ public class GenericOperationDAOImpl implements OperationDAO { } @Override - public boolean updateTaskOperation(int enrolmentId, String operationCode) + public int getExistingOperationID(int enrolmentId, String operationCode) throws OperationManagementDAOException { PreparedStatement stmt = null; ResultSet rs = null; - boolean result = false; + int result = -1; try { Connection connection = OperationManagementDAOFactory.getConnection(); - String query = "SELECT EOM.ID FROM DM_ENROLMENT_OP_MAPPING EOM INNER JOIN DM_OPERATION DM " + String query = "SELECT DM.ID FROM DM_ENROLMENT_OP_MAPPING EOM INNER JOIN DM_OPERATION DM " + "ON DM.ID = EOM.OPERATION_ID WHERE EOM.ENROLMENT_ID = ? AND DM.OPERATION_CODE = ? AND " + "EOM.STATUS = ?"; stmt = connection.prepareStatement(query); @@ -198,7 +198,7 @@ public class GenericOperationDAOImpl implements OperationDAO { // This will return only one result always. rs = stmt.executeQuery(); if (rs.next()) { - result = true; + result = rs.getInt("ID"); } } catch (SQLException e) { throw new OperationManagementDAOException(