Merge pull request #1259 from milanperera/master-new

Fix wso2/product-iots#1863
revert-70aa11f8
Milan Perera 6 years ago committed by GitHub
commit 39aa9c5c88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -157,8 +157,8 @@ public class OperationManagerImpl implements OperationManager {
List<DeviceIdentifier> validDeviceIds = deviceValidationResult.getValidDeviceIDList();
if (validDeviceIds.size() > 0) {
DeviceIDHolder deviceAuthorizationResult = this.authorizeDevices(operation, validDeviceIds);
List<DeviceIdentifier> authorizedDeviceList = deviceAuthorizationResult.getValidDeviceIDList();
if (authorizedDeviceList.size() <= 0) {
List<DeviceIdentifier> authorizedDeviceIds = deviceAuthorizationResult.getValidDeviceIDList();
if (authorizedDeviceIds.size() <= 0) {
log.warn("User : " + getUser() + " is not authorized to perform operations on given device-list.");
Activity activity = new Activity();
//Send the operation statuses only for admin triggered operations
@ -186,21 +186,31 @@ public class OperationManagerImpl implements OperationManager {
org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation operationDto =
OperationDAOUtil.convertOperation(operation);
int enrolmentId;
boolean hasExistingTaskOperation;
List<DeviceIdentifier> pendingDeviceList = new ArrayList<>();
String operationCode = operationDto.getCode();
for (DeviceIdentifier deviceId : authorizedDeviceList) {
List<Device> authorizedDevices = new ArrayList<>();
List<Device> ignoredDevices = new ArrayList<>();
for (DeviceIdentifier deviceId : authorizedDeviceIds) {
Device device = getDevice(deviceId);
enrolmentId = device.getEnrolmentInfo().getId();
hasExistingTaskOperation = operationDAO.updateTaskOperation(enrolmentId, operationCode);
if (hasExistingTaskOperation) {
pendingDeviceList.add(deviceId);
authorizedDevices.add(device);
}
if (operationDto.getControl() ==
org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.Control.NO_REPEAT) {
int existingOperationID;
for (Device device : authorizedDevices) {
enrolmentId = device.getEnrolmentInfo().getId();
existingOperationID = operationDAO.getExistingOperationID(enrolmentId, operationCode);
if (existingOperationID > 0) {
ignoredDevices.add(device);
operation.setId(existingOperationID);
this.sendNotification(operation, device);
}
}
}
if (pendingDeviceList.size() > 0) {
if (authorizedDeviceList.size() == pendingDeviceList.size()) {
if (ignoredDevices.size() > 0) {
if (authorizedDevices.size() == ignoredDevices.size()) {
if (log.isDebugEnabled()) {
log.debug("All the devices contain a pending operation for the Operation Code: "
+ operationCode);
@ -212,87 +222,34 @@ public class OperationManagerImpl implements OperationManager {
deviceType));
return activity;
} else {
authorizedDeviceList.removeAll(pendingDeviceList);
authorizedDevices.removeAll(ignoredDevices);
}
}
int operationId = this.lookupOperationDAO(operation).addOperation(operationDto);
boolean isNotRepeated = false;
boolean isScheduled = false;
NotificationStrategy notificationStrategy = getNotificationStrategy();
// check whether device list is greater than batch size notification strategy has enable to send push
// notification using scheduler task
if (DeviceConfigurationManager.getInstance().getDeviceManagementConfig().
getPushNotificationConfiguration().getSchedulerBatchSize() <= authorizedDeviceList.size() &&
getPushNotificationConfiguration().getSchedulerBatchSize() <= authorizedDeviceIds.size() &&
notificationStrategy != null) {
isScheduled = notificationStrategy.getConfig().isScheduled();
}
List<Device> devices = new ArrayList<>();
if (org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.Control.NO_REPEAT == operationDto.
getControl()) {
isNotRepeated = true;
}
//TODO have to create a sql to load device details from deviceDAO using single query.
for (DeviceIdentifier deviceId : authorizedDeviceList) {
Device device = getDevice(deviceId);
devices.add(device);
for (Device device : authorizedDevices) {
enrolmentId = device.getEnrolmentInfo().getId();
//Do not repeat the task operations
if (isScheduledOperation) {
hasExistingTaskOperation = operationDAO.updateTaskOperation(enrolmentId, operationCode);
if (!hasExistingTaskOperation) {
operationMappingDAO.addOperationMapping(operationId, enrolmentId, isScheduled);
}
} else if (isNotRepeated) {
operationDAO.updateEnrollmentOperationsStatus(enrolmentId, operationCode,
org.wso2.carbon.device.mgt.core.dto.operation.mgt.
Operation.Status.PENDING,
org.wso2.carbon.device.mgt.core.dto.operation.mgt.
Operation.Status.REPEATED);
operationMappingDAO.addOperationMapping(operationId, enrolmentId, isScheduled);
} else {
operationMappingDAO.addOperationMapping(operationId, enrolmentId, isScheduled);
}
operationMappingDAO.addOperationMapping(operationId, enrolmentId, isScheduled);
}
OperationManagementDAOFactory.commitTransaction();
/*
If notification strategy has not enable to send push notification using scheduler task we will send
notification immediately. This is done in separate loop inorder to prevent overlap with DB insert
operations with the possible db update operations trigger followed by pending operation call.
Otherwise device may call pending operation while DB is locked for write and deadlock can occur.
*/
if (notificationStrategy != null && !isScheduled) {
for (Device device : devices) {
DeviceIdentifier deviceId = new DeviceIdentifier(device.getDeviceIdentifier(), device.getType());
if (log.isDebugEnabled()) {
log.debug("Sending push notification to " + deviceId + " from add operation method.");
}
operation.setId(operationId);
operation.setActivityId(DeviceManagementConstants.OperationAttributes.ACTIVITY + operationId);
try {
notificationStrategy.execute(new NotificationContext(deviceId, operation));
} catch (PushNotificationExecutionFailedException e) {
log.error("Error occurred while sending push notifications to " + deviceId.getType() +
" device carrying id '" + deviceId + "'", e);
/*
Reschedule if push notification failed. Doing db transactions in atomic way to prevent
deadlocks.
*/
enrolmentId = device.getEnrolmentInfo().getId();
try {
operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon
.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED);
OperationManagementDAOFactory.commitTransaction();
} catch (OperationManagementDAOException ex) {
// Not throwing this exception in order to keep sending remaining notifications if any.
log.error("Error occurred while setting push notification status to SCHEDULED.", ex);
OperationManagementDAOFactory.rollbackTransaction();
}
}
if (isScheduled) {
for (Device device : authorizedDevices) {
this.sendNotification(operation, device);
}
}
@ -415,7 +372,7 @@ public class OperationManagerImpl implements OperationManager {
}
private String addOperationMappings(List<DeviceIdentifier> authorizedDeviceList, org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation operationDto, int operationId, boolean isScheduledOperation, boolean isNotRepeated, boolean isScheduled, List<Device> devices) throws OperationManagementException, OperationManagementDAOException {
int enrolmentId;
boolean hasExistingTaskOperation;//TODO have to create a sql to load device details from deviceDAO using single query.
int existingTaskOperationId;//TODO have to create a sql to load device details from deviceDAO using single query.
String operationCode = operationDto.getCode();
for (DeviceIdentifier deviceId : authorizedDeviceList) {
Device device = getDevice(deviceId);
@ -423,8 +380,8 @@ public class OperationManagerImpl implements OperationManager {
enrolmentId = device.getEnrolmentInfo().getId();
//Do not repeat the task operations
if (isScheduledOperation) {
hasExistingTaskOperation = operationDAO.updateTaskOperation(enrolmentId, operationCode);
if (!hasExistingTaskOperation) {
existingTaskOperationId = operationDAO.getExistingOperationID(enrolmentId, operationCode);
if (existingTaskOperationId != -1) {
operationMappingDAO.addOperationMapping(operationId, enrolmentId, isScheduled);
}
} else if (isNotRepeated) {
@ -476,6 +433,41 @@ public class OperationManagerImpl implements OperationManager {
}
}
private void sendNotification(Operation operation, Device device) {
NotificationStrategy notificationStrategy = getNotificationStrategy();
/*
* If notification strategy has not enable to send push notification using scheduler task we will send
* notification immediately. This is done in separate loop inorder to prevent overlap with DB insert
* operations with the possible db update operations trigger followed by pending operation call.
* Otherwise device may call pending operation while DB is locked for write and deadlock can occur.
*/
if (notificationStrategy != null) {
if (log.isDebugEnabled()) {
log.debug("Sending push notification to " + device.getDeviceIdentifier() + " from add operation method.");
}
DeviceIdentifier deviceIdentifier = new DeviceIdentifier(device.getDeviceIdentifier(), device.getType());
try {
notificationStrategy.execute(new NotificationContext(deviceIdentifier, operation));
} catch (PushNotificationExecutionFailedException e) {
log.error("Error occurred while sending push notifications to " + device.getType() +
" device carrying id '" + device.getDeviceIdentifier() + "'", e);
/*
* Reschedule if push notification failed. Doing db transactions in atomic way to prevent
* deadlocks.
*/
try {
operationMappingDAO.updateOperationMapping(operation.getId(), device.getEnrolmentInfo().getId(), org.wso2.carbon
.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED);
OperationManagementDAOFactory.commitTransaction();
} catch (OperationManagementDAOException ex) {
// Not throwing this exception in order to keep sending remaining notifications if any.
log.error("Error occurred while setting push notification status to SCHEDULED.", ex);
OperationManagementDAOFactory.rollbackTransaction();
}
}
}
}
private List<ActivityStatus> getActivityStatus(DeviceIDHolder deviceIdValidationResult, DeviceIDHolder deviceAuthResult,
String deviceType) {
List<ActivityStatus> activityStatuses = new ArrayList<>();

@ -56,7 +56,7 @@ public interface OperationDAO {
void updateEnrollmentOperationsStatus(int enrolmentId, String operationCode, Operation.Status existingStatus,
Operation.Status newStatus) throws OperationManagementDAOException;
boolean updateTaskOperation(int enrolmentId, String operationCode) throws OperationManagementDAOException;
int getExistingOperationID(int enrolmentId, String operationCode) throws OperationManagementDAOException;
void addOperationResponse(int enrolmentId, int operationId, Object operationResponse)
throws OperationManagementDAOException;

@ -181,14 +181,14 @@ public class GenericOperationDAOImpl implements OperationDAO {
}
@Override
public boolean updateTaskOperation(int enrolmentId, String operationCode)
public int getExistingOperationID(int enrolmentId, String operationCode)
throws OperationManagementDAOException {
PreparedStatement stmt = null;
ResultSet rs = null;
boolean result = false;
int result = -1;
try {
Connection connection = OperationManagementDAOFactory.getConnection();
String query = "SELECT EOM.ID FROM DM_ENROLMENT_OP_MAPPING EOM INNER JOIN DM_OPERATION DM "
String query = "SELECT DM.ID FROM DM_ENROLMENT_OP_MAPPING EOM INNER JOIN DM_OPERATION DM "
+ "ON DM.ID = EOM.OPERATION_ID WHERE EOM.ENROLMENT_ID = ? AND DM.OPERATION_CODE = ? AND "
+ "EOM.STATUS = ?";
stmt = connection.prepareStatement(query);
@ -198,7 +198,7 @@ public class GenericOperationDAOImpl implements OperationDAO {
// This will return only one result always.
rs = stmt.executeQuery();
if (rs.next()) {
result = true;
result = rs.getInt("ID");
}
} catch (SQLException e) {
throw new OperationManagementDAOException(

Loading…
Cancel
Save