From 4ff9b326b7ddf9ec4a0ab8aebf23f1e42eb307d9 Mon Sep 17 00:00:00 2001 From: charitha Date: Thu, 7 May 2020 11:07:45 +0530 Subject: [PATCH] Fix concurrency issue in push notification execution (cherry picked from commit c5b7612b135ad685e1532f4993d20d7e8097e8e2) --- .../operation/mgt/OperationManagerImpl.java | 89 +++++++++++-------- .../task/PushNotificationSchedulerTask.java | 2 +- .../main/resources/dbscripts/cdm/mysql.sql | 8 +- .../resources/dbscripts/cdm/postgresql.sql | 9 +- 4 files changed, 53 insertions(+), 55 deletions(-) diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java index c948f6374f7..ca7659f1e39 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/mgt/OperationManagerImpl.java @@ -77,6 +77,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; /** * This class implements all the functionality exposed as part of the OperationManager. Any transaction initiated @@ -105,6 +107,8 @@ public class OperationManagerImpl implements OperationManager { private final Map lastUpdatedTimeStamps; private final ConcurrentMap operationsInitBy; + private final ThreadPoolExecutor notificationExecutor; + public OperationManagerImpl() { commandOperationDAO = OperationManagementDAOFactory.getCommandOperationDAO(); configOperationDAO = OperationManagementDAOFactory.getConfigOperationDAO(); @@ -117,6 +121,7 @@ public class OperationManagerImpl implements OperationManager { notificationStrategies = new HashMap<>(); lastUpdatedTimeStamps = new HashMap<>(); operationsInitBy = new ConcurrentHashMap<>(); + notificationExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); } public OperationManagerImpl(String deviceType, DeviceManagementService deviceManagementService) { @@ -422,50 +427,56 @@ public class OperationManagerImpl implements OperationManager { * Otherwise device may call pending operation while DB is locked for write and deadlock can occur. */ if (notificationStrategy != null) { - if (log.isDebugEnabled()) { - log.debug("Sending push notification to " + device.getDeviceIdentifier() + " from add operation method."); - } - DeviceIdentifier deviceIdentifier = new DeviceIdentifier(device.getDeviceIdentifier(), device.getType()); - try { - notificationStrategy.execute(new NotificationContext(deviceIdentifier, operation)); - } catch (PushNotificationExecutionFailedException e) { - log.error("Error occurred while sending push notifications to " + device.getType() + - " device carrying id '" + device.getDeviceIdentifier() + "'", e); - /* - * Reschedule if push notification failed. Doing db transactions in atomic way to prevent - * deadlocks. - */ - int failAttempts = 0; - while (true) { - try { - operationMappingDAO.updateOperationMapping(operation.getId(), device.getEnrolmentInfo().getId(), - org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED); - OperationManagementDAOFactory.commitTransaction(); - break; - } catch (OperationManagementDAOException ex) { - OperationManagementDAOFactory.rollbackTransaction(); - if (++failAttempts > 3) { - String msg = "Error occurred while setting push notification status to SCHEDULED. Operation ID: " + - operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() + - ", Device ID:" + device.getDeviceIdentifier(); - log.error(msg, e); - break; - } - log.warn("Unable to set push notification status to SCHEDULED. Operation ID: " + - operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() + - ", Device ID:" + device.getDeviceIdentifier() + ", Attempt: " + failAttempts + - ", Error: " + e.getMessage()); + int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId(); + notificationExecutor.execute(() -> { + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true); + if (log.isDebugEnabled()) { + log.debug("Sending push notification to " + device.getDeviceIdentifier() + " from add operation method."); + } + DeviceIdentifier deviceIdentifier = new DeviceIdentifier(device.getDeviceIdentifier(), device.getType()); + try { + notificationStrategy.execute(new NotificationContext(deviceIdentifier, operation)); + } catch (PushNotificationExecutionFailedException e) { + log.error("Error occurred while sending push notifications to " + device.getType() + + " device carrying id '" + device.getDeviceIdentifier() + "'", e); + /* + * Reschedule if push notification failed. Doing db transactions in atomic way to prevent + * deadlocks. + */ + int failAttempts = 0; + while (true) { try { - Thread.sleep(2000); - } catch (InterruptedException ignore) { + operationMappingDAO.updateOperationMapping(operation.getId(), device.getEnrolmentInfo().getId(), + org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED); + OperationManagementDAOFactory.commitTransaction(); break; + } catch (OperationManagementDAOException ex) { + OperationManagementDAOFactory.rollbackTransaction(); + if (++failAttempts > 3) { + String msg = "Error occurred while setting push notification status to SCHEDULED. Operation ID: " + + operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() + + ", Device ID:" + device.getDeviceIdentifier(); + log.error(msg, e); + break; + } + log.warn("Unable to set push notification status to SCHEDULED. Operation ID: " + + operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() + + ", Device ID:" + device.getDeviceIdentifier() + ", Attempt: " + failAttempts + + ", Error: " + e.getMessage()); + try { + Thread.sleep(2000); + } catch (InterruptedException ignore) { + break; + } } } + } catch (Exception e) { + log.error("Error occurred while sending notifications to " + device.getType() + + " device carrying id '" + device.getDeviceIdentifier() + "'", e); } - } catch (Exception e) { - log.error("Error occurred while sending notifications to " + device.getType() + - " device carrying id '" + device.getDeviceIdentifier() + "'", e); - } + PrivilegedCarbonContext.endTenantFlow(); + }); } } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java index 28bdf863b78..c9304b746be 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java @@ -46,7 +46,7 @@ import java.util.Map; */ public class PushNotificationSchedulerTask implements Runnable { - private static Log log = LogFactory.getLog(PushNotificationSchedulerTask.class); + private static final Log log = LogFactory.getLog(PushNotificationSchedulerTask.class); private final OperationDAO operationDAO = OperationManagementDAOFactory.getOperationDAO(); private final OperationMappingDAO operationMappingDAO = OperationManagementDAOFactory.getOperationMappingDAO(); private final DeviceManagementProviderService provider = DeviceManagementDataHolder.getInstance() diff --git a/features/device-mgt/org.wso2.carbon.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mysql.sql b/features/device-mgt/org.wso2.carbon.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mysql.sql index aa4b047c0bb..90c4e8a00dd 100644 --- a/features/device-mgt/org.wso2.carbon.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mysql.sql +++ b/features/device-mgt/org.wso2.carbon.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mysql.sql @@ -140,7 +140,7 @@ CREATE TABLE IF NOT EXISTS DM_ENROLMENT_OP_MAPPING ( DEVICE_ID INTEGER DEFAULT NULL, DEVICE_TYPE VARCHAR(300) NOT NULL, DEVICE_IDENTIFICATION VARCHAR(300) DEFAULT NULL, - TENANT_ID INTEGER DEFAULT 0; + TENANT_ID INTEGER DEFAULT 0, PRIMARY KEY (ID), KEY `fk_dm_device_operation_mapping_operation` (`OPERATION_ID`), KEY `IDX_DM_ENROLMENT_OP_MAPPING` (`ENROLMENT_ID`,`OPERATION_ID`), @@ -677,9 +677,3 @@ ORDER BY TENANT_ID, DEVICE_ID; -- END OF DASHBOARD RELATED VIEWS -- --- TEMP TABLE REQUIRED FOR DATA ARCHIVAL JOB -CREATE TABLE IF NOT EXISTS DM_ARCHIVED_OPERATIONS ( - ID INTEGER NOT NULL, - CREATED_TIMESTAMP TIMESTAMP NOT NULL, - PRIMARY KEY (ID) -)ENGINE = InnoDB; diff --git a/features/device-mgt/org.wso2.carbon.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/postgresql.sql b/features/device-mgt/org.wso2.carbon.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/postgresql.sql index eeda27d3c4d..2710ffd8be8 100644 --- a/features/device-mgt/org.wso2.carbon.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/postgresql.sql +++ b/features/device-mgt/org.wso2.carbon.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/postgresql.sql @@ -667,11 +667,4 @@ WHERE DM_DEVICE.ID = DM_DEVICE_DETAIL.DEVICE_ID ORDER BY TENANT_ID, DEVICE_ID; --- END OF DASHBOARD RELATED VIEWS -- - --- TEMP TABLE REQUIRED FOR DATA ARCHIVAL JOB -CREATE TABLE IF NOT EXISTS DM_ARCHIVED_OPERATIONS ( - ID INTEGER NOT NULL, - CREATED_TIMESTAMP TIMESTAMP(0) NOT NULL, - PRIMARY KEY (ID) -); \ No newline at end of file +-- END OF DASHBOARD RELATED VIEWS -- \ No newline at end of file