|
|
@ -77,6 +77,8 @@ import java.util.List;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* This class implements all the functionality exposed as part of the OperationManager. Any transaction initiated
|
|
|
|
* This class implements all the functionality exposed as part of the OperationManager. Any transaction initiated
|
|
|
@ -105,6 +107,8 @@ public class OperationManagerImpl implements OperationManager {
|
|
|
|
private final Map<Integer, Long> lastUpdatedTimeStamps;
|
|
|
|
private final Map<Integer, Long> lastUpdatedTimeStamps;
|
|
|
|
private final ConcurrentMap<Integer, String> operationsInitBy;
|
|
|
|
private final ConcurrentMap<Integer, String> operationsInitBy;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final ThreadPoolExecutor notificationExecutor;
|
|
|
|
|
|
|
|
|
|
|
|
public OperationManagerImpl() {
|
|
|
|
public OperationManagerImpl() {
|
|
|
|
commandOperationDAO = OperationManagementDAOFactory.getCommandOperationDAO();
|
|
|
|
commandOperationDAO = OperationManagementDAOFactory.getCommandOperationDAO();
|
|
|
|
configOperationDAO = OperationManagementDAOFactory.getConfigOperationDAO();
|
|
|
|
configOperationDAO = OperationManagementDAOFactory.getConfigOperationDAO();
|
|
|
@ -117,6 +121,7 @@ public class OperationManagerImpl implements OperationManager {
|
|
|
|
notificationStrategies = new HashMap<>();
|
|
|
|
notificationStrategies = new HashMap<>();
|
|
|
|
lastUpdatedTimeStamps = new HashMap<>();
|
|
|
|
lastUpdatedTimeStamps = new HashMap<>();
|
|
|
|
operationsInitBy = new ConcurrentHashMap<>();
|
|
|
|
operationsInitBy = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
notificationExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public OperationManagerImpl(String deviceType, DeviceManagementService deviceManagementService) {
|
|
|
|
public OperationManagerImpl(String deviceType, DeviceManagementService deviceManagementService) {
|
|
|
@ -422,50 +427,56 @@ public class OperationManagerImpl implements OperationManager {
|
|
|
|
* Otherwise device may call pending operation while DB is locked for write and deadlock can occur.
|
|
|
|
* Otherwise device may call pending operation while DB is locked for write and deadlock can occur.
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
if (notificationStrategy != null) {
|
|
|
|
if (notificationStrategy != null) {
|
|
|
|
if (log.isDebugEnabled()) {
|
|
|
|
int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
|
|
|
|
log.debug("Sending push notification to " + device.getDeviceIdentifier() + " from add operation method.");
|
|
|
|
notificationExecutor.execute(() -> {
|
|
|
|
}
|
|
|
|
PrivilegedCarbonContext.startTenantFlow();
|
|
|
|
DeviceIdentifier deviceIdentifier = new DeviceIdentifier(device.getDeviceIdentifier(), device.getType());
|
|
|
|
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true);
|
|
|
|
try {
|
|
|
|
if (log.isDebugEnabled()) {
|
|
|
|
notificationStrategy.execute(new NotificationContext(deviceIdentifier, operation));
|
|
|
|
log.debug("Sending push notification to " + device.getDeviceIdentifier() + " from add operation method.");
|
|
|
|
} catch (PushNotificationExecutionFailedException e) {
|
|
|
|
}
|
|
|
|
log.error("Error occurred while sending push notifications to " + device.getType() +
|
|
|
|
DeviceIdentifier deviceIdentifier = new DeviceIdentifier(device.getDeviceIdentifier(), device.getType());
|
|
|
|
" device carrying id '" + device.getDeviceIdentifier() + "'", e);
|
|
|
|
try {
|
|
|
|
/*
|
|
|
|
notificationStrategy.execute(new NotificationContext(deviceIdentifier, operation));
|
|
|
|
* Reschedule if push notification failed. Doing db transactions in atomic way to prevent
|
|
|
|
} catch (PushNotificationExecutionFailedException e) {
|
|
|
|
* deadlocks.
|
|
|
|
log.error("Error occurred while sending push notifications to " + device.getType() +
|
|
|
|
*/
|
|
|
|
" device carrying id '" + device.getDeviceIdentifier() + "'", e);
|
|
|
|
int failAttempts = 0;
|
|
|
|
/*
|
|
|
|
while (true) {
|
|
|
|
* Reschedule if push notification failed. Doing db transactions in atomic way to prevent
|
|
|
|
try {
|
|
|
|
* deadlocks.
|
|
|
|
operationMappingDAO.updateOperationMapping(operation.getId(), device.getEnrolmentInfo().getId(),
|
|
|
|
*/
|
|
|
|
org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED);
|
|
|
|
int failAttempts = 0;
|
|
|
|
OperationManagementDAOFactory.commitTransaction();
|
|
|
|
while (true) {
|
|
|
|
break;
|
|
|
|
|
|
|
|
} catch (OperationManagementDAOException ex) {
|
|
|
|
|
|
|
|
OperationManagementDAOFactory.rollbackTransaction();
|
|
|
|
|
|
|
|
if (++failAttempts > 3) {
|
|
|
|
|
|
|
|
String msg = "Error occurred while setting push notification status to SCHEDULED. Operation ID: " +
|
|
|
|
|
|
|
|
operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() +
|
|
|
|
|
|
|
|
", Device ID:" + device.getDeviceIdentifier();
|
|
|
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
log.warn("Unable to set push notification status to SCHEDULED. Operation ID: " +
|
|
|
|
|
|
|
|
operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() +
|
|
|
|
|
|
|
|
", Device ID:" + device.getDeviceIdentifier() + ", Attempt: " + failAttempts +
|
|
|
|
|
|
|
|
", Error: " + e.getMessage());
|
|
|
|
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
Thread.sleep(2000);
|
|
|
|
operationMappingDAO.updateOperationMapping(operation.getId(), device.getEnrolmentInfo().getId(),
|
|
|
|
} catch (InterruptedException ignore) {
|
|
|
|
org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED);
|
|
|
|
|
|
|
|
OperationManagementDAOFactory.commitTransaction();
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
|
|
|
|
} catch (OperationManagementDAOException ex) {
|
|
|
|
|
|
|
|
OperationManagementDAOFactory.rollbackTransaction();
|
|
|
|
|
|
|
|
if (++failAttempts > 3) {
|
|
|
|
|
|
|
|
String msg = "Error occurred while setting push notification status to SCHEDULED. Operation ID: " +
|
|
|
|
|
|
|
|
operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() +
|
|
|
|
|
|
|
|
", Device ID:" + device.getDeviceIdentifier();
|
|
|
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
log.warn("Unable to set push notification status to SCHEDULED. Operation ID: " +
|
|
|
|
|
|
|
|
operation.getId() + ", Enrollment ID: " + device.getEnrolmentInfo().getId() +
|
|
|
|
|
|
|
|
", Device ID:" + device.getDeviceIdentifier() + ", Attempt: " + failAttempts +
|
|
|
|
|
|
|
|
", Error: " + e.getMessage());
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
Thread.sleep(2000);
|
|
|
|
|
|
|
|
} catch (InterruptedException ignore) {
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
|
|
log.error("Error occurred while sending notifications to " + device.getType() +
|
|
|
|
|
|
|
|
" device carrying id '" + device.getDeviceIdentifier() + "'", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} catch (Exception e) {
|
|
|
|
PrivilegedCarbonContext.endTenantFlow();
|
|
|
|
log.error("Error occurred while sending notifications to " + device.getType() +
|
|
|
|
});
|
|
|
|
" device carrying id '" + device.getDeviceIdentifier() + "'", e);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|