Fix concurrency issue in push notification execution

(cherry picked from commit c5b7612b135ad685e1532f4993d20d7e8097e8e2)
reporting
Charitha Goonetilleke 5 years ago
parent 9a6fae5696
commit 4ff9b326b7

@ -77,6 +77,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* This class implements all the functionality exposed as part of the OperationManager. Any transaction initiated * This class implements all the functionality exposed as part of the OperationManager. Any transaction initiated
@ -105,6 +107,8 @@ public class OperationManagerImpl implements OperationManager {
private final Map<Integer, Long> lastUpdatedTimeStamps; private final Map<Integer, Long> lastUpdatedTimeStamps;
private final ConcurrentMap<Integer, String> operationsInitBy; private final ConcurrentMap<Integer, String> operationsInitBy;
private final ThreadPoolExecutor notificationExecutor;
public OperationManagerImpl() { public OperationManagerImpl() {
commandOperationDAO = OperationManagementDAOFactory.getCommandOperationDAO(); commandOperationDAO = OperationManagementDAOFactory.getCommandOperationDAO();
configOperationDAO = OperationManagementDAOFactory.getConfigOperationDAO(); configOperationDAO = OperationManagementDAOFactory.getConfigOperationDAO();
@ -117,6 +121,7 @@ public class OperationManagerImpl implements OperationManager {
notificationStrategies = new HashMap<>(); notificationStrategies = new HashMap<>();
lastUpdatedTimeStamps = new HashMap<>(); lastUpdatedTimeStamps = new HashMap<>();
operationsInitBy = new ConcurrentHashMap<>(); operationsInitBy = new ConcurrentHashMap<>();
notificationExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
} }
public OperationManagerImpl(String deviceType, DeviceManagementService deviceManagementService) { public OperationManagerImpl(String deviceType, DeviceManagementService deviceManagementService) {
@ -422,50 +427,56 @@ public class OperationManagerImpl implements OperationManager {
* Otherwise device may call pending operation while DB is locked for write and deadlock can occur. * Otherwise device may call pending operation while DB is locked for write and deadlock can occur.
*/ */
if (notificationStrategy != null) { if (notificationStrategy != null) {
if (log.isDebugEnabled()) { int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
log.debug("Sending push notification to " + device.getDeviceIdentifier() + " from add operation method."); notificationExecutor.execute(() -> {
} PrivilegedCarbonContext.startTenantFlow();
DeviceIdentifier deviceIdentifier = new DeviceIdentifier(device.getDeviceIdentifier(), device.getType()); PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true);
try { if (log.isDebugEnabled()) {
notificationStrategy.execute(new NotificationContext(deviceIdentifier, operation)); log.debug("Sending push notification to " + device.getDeviceIdentifier() + " from add operation method.");
} catch (PushNotificationExecutionFailedException e) { }
log.error("Error occurred while sending push notifications to " + device.getType() + DeviceIdentifier deviceIdentifier = new DeviceIdentifier(device.getDeviceIdentifier(), device.getType());
" device carrying id '" + device.getDeviceIdentifier() + "'", e); try {
/* notificationStrategy.execute(new NotificationContext(deviceIdentifier, operation));
* Reschedule if push notification failed. Doing db transactions in atomic way to prevent } catch (PushNotificationExecutionFailedException e) {
* deadlocks. log.error("Error occurred while sending push notifications to " + device.getType() +
*/ " device carrying id '" + device.getDeviceIdentifier() + "'", e);
int failAttempts = 0; /*
while (true) { * Reschedule if push notification failed. Doing db transactions in atomic way to prevent
try { * deadlocks.
operationMappingDAO.updateOperationMapping(operation.getId(), device.getEnrolmentInfo().getId(), */
org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED); int failAttempts = 0;
OperationManagementDAOFactory.commitTransaction(); while (true) {
break;
} catch (OperationManagementDAOException ex) {
OperationManagementDAOFactory.rollbackTransaction();
if (++failAttempts > 3) {
String msg = "Error occurred while setting push notification status to SCHEDULED. Operation ID: " +
operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() +
", Device ID:" + device.getDeviceIdentifier();
log.error(msg, e);
break;
}
log.warn("Unable to set push notification status to SCHEDULED. Operation ID: " +
operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() +
", Device ID:" + device.getDeviceIdentifier() + ", Attempt: " + failAttempts +
", Error: " + e.getMessage());
try { try {
Thread.sleep(2000); operationMappingDAO.updateOperationMapping(operation.getId(), device.getEnrolmentInfo().getId(),
} catch (InterruptedException ignore) { org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED);
OperationManagementDAOFactory.commitTransaction();
break; break;
} catch (OperationManagementDAOException ex) {
OperationManagementDAOFactory.rollbackTransaction();
if (++failAttempts > 3) {
String msg = "Error occurred while setting push notification status to SCHEDULED. Operation ID: " +
operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() +
", Device ID:" + device.getDeviceIdentifier();
log.error(msg, e);
break;
}
log.warn("Unable to set push notification status to SCHEDULED. Operation ID: " +
operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() +
", Device ID:" + device.getDeviceIdentifier() + ", Attempt: " + failAttempts +
", Error: " + e.getMessage());
try {
Thread.sleep(2000);
} catch (InterruptedException ignore) {
break;
}
} }
} }
} catch (Exception e) {
log.error("Error occurred while sending notifications to " + device.getType() +
" device carrying id '" + device.getDeviceIdentifier() + "'", e);
} }
} catch (Exception e) { PrivilegedCarbonContext.endTenantFlow();
log.error("Error occurred while sending notifications to " + device.getType() + });
" device carrying id '" + device.getDeviceIdentifier() + "'", e);
}
} }
} }

@ -46,7 +46,7 @@ import java.util.Map;
*/ */
public class PushNotificationSchedulerTask implements Runnable { public class PushNotificationSchedulerTask implements Runnable {
private static Log log = LogFactory.getLog(PushNotificationSchedulerTask.class); private static final Log log = LogFactory.getLog(PushNotificationSchedulerTask.class);
private final OperationDAO operationDAO = OperationManagementDAOFactory.getOperationDAO(); private final OperationDAO operationDAO = OperationManagementDAOFactory.getOperationDAO();
private final OperationMappingDAO operationMappingDAO = OperationManagementDAOFactory.getOperationMappingDAO(); private final OperationMappingDAO operationMappingDAO = OperationManagementDAOFactory.getOperationMappingDAO();
private final DeviceManagementProviderService provider = DeviceManagementDataHolder.getInstance() private final DeviceManagementProviderService provider = DeviceManagementDataHolder.getInstance()

@ -140,7 +140,7 @@ CREATE TABLE IF NOT EXISTS DM_ENROLMENT_OP_MAPPING (
DEVICE_ID INTEGER DEFAULT NULL, DEVICE_ID INTEGER DEFAULT NULL,
DEVICE_TYPE VARCHAR(300) NOT NULL, DEVICE_TYPE VARCHAR(300) NOT NULL,
DEVICE_IDENTIFICATION VARCHAR(300) DEFAULT NULL, DEVICE_IDENTIFICATION VARCHAR(300) DEFAULT NULL,
TENANT_ID INTEGER DEFAULT 0; TENANT_ID INTEGER DEFAULT 0,
PRIMARY KEY (ID), PRIMARY KEY (ID),
KEY `fk_dm_device_operation_mapping_operation` (`OPERATION_ID`), KEY `fk_dm_device_operation_mapping_operation` (`OPERATION_ID`),
KEY `IDX_DM_ENROLMENT_OP_MAPPING` (`ENROLMENT_ID`,`OPERATION_ID`), KEY `IDX_DM_ENROLMENT_OP_MAPPING` (`ENROLMENT_ID`,`OPERATION_ID`),
@ -677,9 +677,3 @@ ORDER BY TENANT_ID, DEVICE_ID;
-- END OF DASHBOARD RELATED VIEWS -- -- END OF DASHBOARD RELATED VIEWS --
-- TEMP TABLE REQUIRED FOR DATA ARCHIVAL JOB
CREATE TABLE IF NOT EXISTS DM_ARCHIVED_OPERATIONS (
ID INTEGER NOT NULL,
CREATED_TIMESTAMP TIMESTAMP NOT NULL,
PRIMARY KEY (ID)
)ENGINE = InnoDB;

@ -667,11 +667,4 @@ WHERE
DM_DEVICE.ID = DM_DEVICE_DETAIL.DEVICE_ID DM_DEVICE.ID = DM_DEVICE_DETAIL.DEVICE_ID
ORDER BY TENANT_ID, DEVICE_ID; ORDER BY TENANT_ID, DEVICE_ID;
-- END OF DASHBOARD RELATED VIEWS -- -- END OF DASHBOARD RELATED VIEWS --
-- TEMP TABLE REQUIRED FOR DATA ARCHIVAL JOB
CREATE TABLE IF NOT EXISTS DM_ARCHIVED_OPERATIONS (
ID INTEGER NOT NULL,
CREATED_TIMESTAMP TIMESTAMP(0) NOT NULL,
PRIMARY KEY (ID)
);
Loading…
Cancel
Save