Merge pull request #1168 from charithag/master

Fixed https://github.com/wso2/product-iots/issues/1630
merge-requests/1/head
Charitha Goonetilleke 7 years ago committed by GitHub
commit 3b7fea538e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -155,6 +155,7 @@ public class OperationManagerImpl implements OperationManager {
boolean hasExistingTaskOperation;
int enrolmentId;
List<Device> devices = new ArrayList<>();
if (org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.Control.NO_REPEAT == operationDto.
getControl()) {
isNotRepeated = true;
@ -164,6 +165,7 @@ public class OperationManagerImpl implements OperationManager {
String operationCode = operationDto.getCode();
for (DeviceIdentifier deviceId : authorizedDeviceList) {
Device device = getDevice(deviceId);
devices.add(device);
enrolmentId = device.getEnrolmentInfo().getId();
//Do not repeat the task operations
if (isScheduledOperation) {
@ -181,30 +183,46 @@ public class OperationManagerImpl implements OperationManager {
} else {
operationMappingDAO.addOperationMapping(operationId, enrolmentId, isScheduled);
}
/*
If notification strategy has not enable to send push notification using scheduler task
we will send notification immediately
*/
if (notificationStrategy != null && !isScheduled) {
}
OperationManagementDAOFactory.commitTransaction();
/*
If notification strategy has not enable to send push notification using scheduler task we will send
notification immediately. This is done in separate loop inorder to prevent overlap with DB insert
operations with the possible db update operations trigger followed by pending operation call.
Otherwise device may call pending operation while DB is locked for write and deadlock can occur.
*/
if (notificationStrategy != null && !isScheduled) {
for (Device device : devices) {
DeviceIdentifier deviceId = new DeviceIdentifier(device.getDeviceIdentifier(), device.getType());
if (log.isDebugEnabled()) {
log.debug("Sending push notification to " + deviceId + " from add operation method.");
}
operation.setId(operationId);
operation.setActivityId(DeviceManagementConstants.OperationAttributes.ACTIVITY + operationId);
try {
if (log.isDebugEnabled()) {
log.debug("Sending push notification to " + deviceId + " from add operation method.");
}
operation.setId(operationId);
operation.setActivityId(DeviceManagementConstants.OperationAttributes.ACTIVITY + operationId);
notificationStrategy.execute(new NotificationContext(deviceId, operation));
operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.COMPLETED);
} catch (PushNotificationExecutionFailedException e) {
log.error("Error occurred while sending push notifications to " +
deviceId.getType() + " device carrying id '" +
deviceId + "'", e);
// Reschedule if push notification failed.
operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED);
log.error("Error occurred while sending push notifications to " + deviceId.getType() +
" device carrying id '" + deviceId + "'", e);
/*
Reschedule if push notification failed. Doing db transactions in atomic way to prevent
deadlocks.
*/
enrolmentId = device.getEnrolmentInfo().getId();
try {
operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon
.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED);
OperationManagementDAOFactory.commitTransaction();
} catch (OperationManagementDAOException ex) {
// Not throwing this exception in order to keep sending remaining notifications if any.
log.error("Error occurred while setting push notification status to SCHEDULED.", ex);
OperationManagementDAOFactory.rollbackTransaction();
}
}
}
}
OperationManagementDAOFactory.commitTransaction();
Activity activity = new Activity();
activity.setActivityId(DeviceManagementConstants.OperationAttributes.ACTIVITY + operationId);
activity.setCode(operationCode);

@ -104,8 +104,10 @@ public class OperationManagementTests extends BaseDeviceManagementTest {
@Test
public void addCommandOperation() throws DeviceManagementException, OperationManagementException,
InvalidDeviceException {
this.commandActivity = this.operationMgtService.addOperation(
InvalidDeviceException {
NotificationStrategy notificationStrategy = new TestNotificationStrategy(true);
OperationManager operationManager = new OperationManagerImpl(DEVICE_TYPE, notificationStrategy);
this.commandActivity = operationManager.addOperation(
getOperation(new CommandOperation(), Operation.Type.COMMAND, COMMAND_OPERATON_CODE),
this.deviceIds);
validateOperationResponse(this.commandActivity, ActivityStatus.Status.PENDING);

@ -26,6 +26,12 @@ import java.util.HashMap;
public class TestNotificationStrategy implements NotificationStrategy {
private PushNotificationConfig pushNotificationConfig;
private boolean setToThrowException = false;
public TestNotificationStrategy(boolean setToThrowException){
this.setToThrowException = setToThrowException;
this.pushNotificationConfig = new PushNotificationConfig("TEST", true, new HashMap<>());
}
public TestNotificationStrategy(){
this.pushNotificationConfig = new PushNotificationConfig("TEST", true, new HashMap<>());
@ -38,7 +44,9 @@ public class TestNotificationStrategy implements NotificationStrategy {
@Override
public void execute(NotificationContext ctx) throws PushNotificationExecutionFailedException {
if (setToThrowException) {
throw new PushNotificationExecutionFailedException("Generated exception");
}
}
@Override

Loading…
Cancel
Save