Rename push status to push notification status and add configurable initial delay for scheduler task

revert-70aa11f8
warunalakshitha 8 years ago
parent 1470abeb0b
commit e1e2b14b5f

@ -21,6 +21,7 @@ import org.wso2.carbon.device.mgt.common.*;
import org.wso2.carbon.device.mgt.common.push.notification.NotificationStrategy;
import java.util.List;
import java.util.Map;
/**
* This represents the Device Operation management functionality which should be implemented by

@ -90,6 +90,7 @@ public final class DeviceManagementConstants {
private PushNotifications() {
throw new AssertionError();
}
public static final int DEFAULT_SCHEDULER_TASK_INITIAL_DELAY = 60000;
public static final int DEFAULT_BATCH_DELAY_MILLS = 60000;
public static final int DEFAULT_BATCH_SIZE = 1000;
}

@ -28,35 +28,46 @@ import java.util.List;
@XmlRootElement(name = "PushNotificationConfiguration")
public class PushNotificationConfiguration {
private int SchedulerBatchSize;
private int SchedulerBatchDelayMills;
private boolean SchedulerTaskEnabled;
private int schedulerBatchSize;
private int schedulerBatchDelayMills;
private int schedulerTaskInitialDelay;
private boolean schedulerTaskEnabled;
private List<String> pushNotificationProviders;
@XmlElement(name = "SchedulerBatchSize", required = true)
public int getSchedulerBatchSize() {
return SchedulerBatchSize;
return schedulerBatchSize;
}
public void setSchedulerBatchSize(int SchedulerBatchSize) {
this.SchedulerBatchSize = SchedulerBatchSize;
public void setSchedulerBatchSize(int schedulerBatchSize) {
this.schedulerBatchSize = schedulerBatchSize;
}
@XmlElement(name = "SchedulerBatchDelayMills", required = true)
public int getSchedulerBatchDelayMills() {
return SchedulerBatchDelayMills;
return schedulerBatchDelayMills;
}
public void setSchedulerBatchDelayMills(int SchedulerBatchDelayMills) {
this.SchedulerBatchDelayMills = SchedulerBatchDelayMills;
public void setSchedulerBatchDelayMills(int schedulerBatchDelayMills) {
this.schedulerBatchDelayMills = schedulerBatchDelayMills;
}
@XmlElement(name = "SchedulerTaskInitialDelay", required = true)
public int getSchedulerTaskInitialDelay() {
return schedulerTaskInitialDelay;
}
public void setSchedulerTaskInitialDelay(int schedulerTaskInitialDelay) {
this.schedulerTaskInitialDelay = schedulerTaskInitialDelay;
}
@XmlElement(name = "SchedulerTaskEnabled", required = true)
public boolean isSchedulerTaskEnabled() {
return SchedulerTaskEnabled;
return schedulerTaskEnabled;
}
public void setSchedulerTaskEnabled(boolean schedulerTaskEnabled) {
SchedulerTaskEnabled = schedulerTaskEnabled;
this.schedulerTaskEnabled = schedulerTaskEnabled;
}
@XmlElementWrapper(name = "PushNotificationProviders", required = true)

@ -35,7 +35,7 @@ public class Operation implements Serializable {
REPEAT, NO_REPEAT, PAUSE_SEQUENCE, STOP_SEQUENCE
}
public enum PushStatus {
public enum PushNotificationStatus {
SCHEDULED, COMPLETED
}

@ -182,25 +182,37 @@ public class DeviceManagementServiceComponent {
* of Device Management Service component in order to avoid bundle start up order related complications */
notifyStartupListeners();
if (log.isDebugEnabled()) {
log.debug("Device management core bundle has been successfully initialized");
log.debug("Push notification batch enabled : " + config.getPushNotificationConfiguration()
.isSchedulerTaskEnabled());
}
// Start Push Notification Scheduler Task
if (config.getPushNotificationConfiguration().isSchedulerTaskEnabled()) {
if (config.getPushNotificationConfiguration().getSchedulerBatchSize() <= 0) {
log.error("Push notification batch size cannot be 0 or less than 0. Setting default batch size to:" +
" " + DeviceManagementConstants.PushNotifications.DEFAULT_BATCH_SIZE);
config.getPushNotificationConfiguration().setSchedulerBatchSize(DeviceManagementConstants.PushNotifications
.DEFAULT_BATCH_SIZE);
log.error("Push notification batch size cannot be 0 or less than 0. Setting default batch size " +
"to:" + DeviceManagementConstants.PushNotifications.DEFAULT_BATCH_SIZE);
config.getPushNotificationConfiguration().setSchedulerBatchSize(DeviceManagementConstants
.PushNotifications.DEFAULT_BATCH_SIZE);
}
if (config.getPushNotificationConfiguration().getSchedulerBatchDelayMills() <= 0) {
log.error("Push notification batch delay cannot be 0 or less than 0. Setting default batch delay " +
"milliseconds to" + DeviceManagementConstants.PushNotifications.DEFAULT_BATCH_DELAY_MILLS);
config.getPushNotificationConfiguration().setSchedulerBatchDelayMills(DeviceManagementConstants.PushNotifications
.DEFAULT_BATCH_DELAY_MILLS);
config.getPushNotificationConfiguration().setSchedulerBatchDelayMills(DeviceManagementConstants
.PushNotifications.DEFAULT_BATCH_DELAY_MILLS);
}
if (config.getPushNotificationConfiguration().getSchedulerTaskInitialDelay() < 0) {
log.error("Push notification initial delay cannot be less than 0. Setting default initial " +
"delay milliseconds to" + DeviceManagementConstants.PushNotifications
.DEFAULT_SCHEDULER_TASK_INITIAL_DELAY);
config.getPushNotificationConfiguration().setSchedulerTaskInitialDelay(DeviceManagementConstants
.PushNotifications.DEFAULT_SCHEDULER_TASK_INITIAL_DELAY);
}
ScheduledExecutorService pushNotificationExecutor = Executors.newSingleThreadScheduledExecutor();
pushNotificationExecutor.schedule(new PushNotificationSchedulerTask(), config.getPushNotificationConfiguration()
.getSchedulerBatchDelayMills(), TimeUnit.MILLISECONDS);
pushNotificationExecutor.scheduleWithFixedDelay(new PushNotificationSchedulerTask(), config
.getPushNotificationConfiguration().getSchedulerTaskInitialDelay(), config
.getPushNotificationConfiguration().getSchedulerBatchDelayMills(), TimeUnit.MILLISECONDS);
}
if (log.isDebugEnabled()) {
log.debug("Device management core bundle has been successfully initialized");
}
} catch (Throwable e) {
log.error("Error occurred while initializing device management core bundle", e);

@ -194,15 +194,13 @@ public class OperationManagerImpl implements OperationManager {
log.debug("Sending push notification to " + deviceId + " from add operation method.");
}
notificationStrategy.execute(new NotificationContext(deviceId, operation));
operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon
.device.mgt.core.dto.operation.mgt.Operation.PushStatus.COMPLETED);
operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.COMPLETED);
} catch (PushNotificationExecutionFailedException e) {
log.error("Error occurred while sending push notifications to " +
deviceId.getType() + " device carrying id '" +
deviceId + "'", e);
// Reschedule if push notification failed.
operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon
.device.mgt.core.dto.operation.mgt.Operation.PushStatus.SCHEDULED);
operationMappingDAO.updateOperationMapping(operationId, enrolmentId, org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.PushNotificationStatus.SCHEDULED);
}
}
}

@ -29,7 +29,7 @@ public class OperationMapping {
private int operationId;
private int tenantId;
private Operation.Status status;
private Operation.PushStatus pushStatus;
private Operation.PushNotificationStatus pushNotificationStatus;
public int getOperationId() {
return operationId;
@ -63,11 +63,11 @@ public class OperationMapping {
this.status = status;
}
public Operation.PushStatus getPushStatus() {
return pushStatus;
public Operation.PushNotificationStatus getPushNotificationStatus() {
return pushNotificationStatus;
}
public void setPushStatus(Operation.PushStatus pushStatus) {
this.pushStatus = pushStatus;
public void setPushNotificationStatus(Operation.PushNotificationStatus pushNotificationStatus) {
this.pushNotificationStatus = pushNotificationStatus;
}
}

@ -85,12 +85,12 @@ public interface OperationDAO {
/**
* This method provides operation mappings for given status
* @param opStatus Operation status
* @param pushStatus Push notification Status
* @param pushNotificationStatus Push notification Status
* @param limit Limit for no devices
* @return Tenant based operation mappings list
* @throws OperationManagementDAOException
*/
Map<Integer, List<OperationMapping>> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushStatus pushStatus,
int limit) throws OperationManagementDAOException;;
Map<Integer, List<OperationMapping>> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushNotificationStatus pushNotificationStatus,
int limit) throws OperationManagementDAOException;
}

@ -29,7 +29,7 @@ public interface OperationMappingDAO {
void removeOperationMapping(int operationId, Integer deviceId) throws OperationManagementDAOException;
void updateOperationMapping(int operationId, Integer deviceId, Operation.PushStatus pushStatus) throws
void updateOperationMapping(int operationId, Integer deviceId, Operation.PushNotificationStatus pushNotificationStatus) throws
OperationManagementDAOException;
void updateOperationMapping(List<OperationMapping> operationMappingList) throws
OperationManagementDAOException;

@ -1095,21 +1095,22 @@ public class GenericOperationDAOImpl implements OperationDAO {
}
@Override
public Map<Integer, List<OperationMapping>> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushStatus pushStatus,
public Map<Integer, List<OperationMapping>> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushNotificationStatus pushNotificationStatus,
int limit) throws OperationManagementDAOException {
PreparedStatement stmt = null;
ResultSet rs = null;
Connection conn;
OperationMapping operationMapping;
Map<Integer, List<OperationMapping>> operationMappingsTenantMap = new HashMap<>();
try {
Connection conn = OperationManagementDAOFactory.getConnection();
conn = OperationManagementDAOFactory.getConnection();
String sql = "SELECT op.ENROLMENT_ID, op.OPERATION_ID, dt.NAME ,d.TENANT_ID FROM DM_DEVICE d, " +
"DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = '?' AND " +
"op.PUSH_NOTIFICATION_STATUS = '?' AND d.DEVICE_TYPE_ID = dt.ID AND d.ID=op.ENROLMENT_ID ORDER BY " +
"DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = ? AND " +
"op.PUSH_NOTIFICATION_STATUS = ? AND d.DEVICE_TYPE_ID = dt.ID AND d.ID=op.ENROLMENT_ID ORDER BY " +
"op.OPERATION_ID LIMIT ?";
stmt = conn.prepareStatement(sql);
stmt.setString(1, opStatus.toString());
stmt.setString(2, pushStatus.toString());
stmt.setString(2, pushNotificationStatus.toString());
stmt.setInt(3, limit);
rs = stmt.executeQuery();
while (rs.next()) {

@ -46,9 +46,9 @@ public class OperationMappingDAOImpl implements OperationMappingDAO {
stmt.setInt(2, operationId);
stmt.setString(3, Operation.Status.PENDING.toString());
if (isScheduled) {
stmt.setString(4, Operation.PushStatus.SCHEDULED.toString());
stmt.setString(4, Operation.PushNotificationStatus.SCHEDULED.toString());
} else {
stmt.setString(4, Operation.PushStatus.COMPLETED.toString());
stmt.setString(4, Operation.PushNotificationStatus.COMPLETED.toString());
}
stmt.setLong(5, time);
stmt.setLong(6, time);
@ -79,14 +79,14 @@ public class OperationMappingDAOImpl implements OperationMappingDAO {
}
@Override
public void updateOperationMapping(int operationId, Integer deviceId, Operation.PushStatus pushStatus) throws OperationManagementDAOException {
public void updateOperationMapping(int operationId, Integer deviceId, Operation.PushNotificationStatus pushNotificationStatus) throws OperationManagementDAOException {
PreparedStatement stmt = null;
try {
Connection conn = OperationManagementDAOFactory.getConnection();
String sql = "UPDATE DM_ENROLMENT_OP_MAPPING SET PUSH_NOTIFICATION_STATUS = ? WHERE ENROLMENT_ID = ? and " +
"OPERATION_ID = ?";
stmt = conn.prepareStatement(sql);
stmt.setString(1, pushStatus.toString());
stmt.setString(1, pushNotificationStatus.toString());
stmt.setInt(2, deviceId);
stmt.setInt(3, operationId);
stmt.executeUpdate();
@ -108,7 +108,7 @@ public class OperationMappingDAOImpl implements OperationMappingDAO {
stmt = conn.prepareStatement(sql);
if (conn.getMetaData().supportsBatchUpdates()) {
for (OperationMapping operationMapping : operationMappingList) {
stmt.setString(1, operationMapping.getPushStatus().toString());
stmt.setString(1, operationMapping.getPushNotificationStatus().toString());
stmt.setInt(2, Integer.parseInt(operationMapping.getDeviceIdentifier().getId()));
stmt.setInt(3, operationMapping.getOperationId());
stmt.addBatch();
@ -116,7 +116,7 @@ public class OperationMappingDAOImpl implements OperationMappingDAO {
stmt.executeBatch();
} else {
for (OperationMapping operationMapping : operationMappingList) {
stmt.setString(1, operationMapping.getPushStatus().toString());
stmt.setString(1, operationMapping.getPushNotificationStatus().toString());
stmt.setInt(2, Integer.parseInt(operationMapping.getDeviceIdentifier().getId()));
stmt.setInt(3, operationMapping.getOperationId());
stmt.executeUpdate();

@ -368,7 +368,7 @@ public class OracleOperationDAOImpl extends GenericOperationDAOImpl {
@Override
public Map<Integer, List<OperationMapping>> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushStatus pushStatus,
public Map<Integer, List<OperationMapping>> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushNotificationStatus pushNotificationStatus,
int limit) throws OperationManagementDAOException {
PreparedStatement stmt = null;
ResultSet rs = null;
@ -377,13 +377,13 @@ public class OracleOperationDAOImpl extends GenericOperationDAOImpl {
try {
Connection conn = OperationManagementDAOFactory.getConnection();
String sql = "SELECT op.ENROLMENT_ID, op.OPERATION_ID, dt.NAME ,d.TENANT_ID FROM DM_DEVICE d, " +
"DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = '?' AND op" +
".PUSH_NOTIFICATION_STATUS = '?' AND d.DEVICE_TYPE_ID = dt.ID AND d.ID=op.ENROLMENT_ID AND ROWNUM" +
"DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = ? AND op" +
".PUSH_NOTIFICATION_STATUS = ? AND d.DEVICE_TYPE_ID = dt.ID AND d.ID=op.ENROLMENT_ID AND ROWNUM" +
" <= ? ORDER BY op.OPERATION_ID";
stmt = conn.prepareStatement(sql);
stmt.setString(1, opStatus.toString());
stmt.setString(2, pushStatus.toString());
stmt.setString(2, pushNotificationStatus.toString());
stmt.setInt(3, limit);
rs = stmt.executeQuery();
while (rs.next()) {

@ -269,7 +269,7 @@ public class SQLServerOperationDAOImpl extends GenericOperationDAOImpl {
}
@Override
public Map<Integer, List<OperationMapping>> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushStatus pushStatus,
public Map<Integer, List<OperationMapping>> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushNotificationStatus pushNotificationStatus,
int limit) throws OperationManagementDAOException {
PreparedStatement stmt = null;
ResultSet rs = null;
@ -278,12 +278,12 @@ public class SQLServerOperationDAOImpl extends GenericOperationDAOImpl {
try {
Connection conn = OperationManagementDAOFactory.getConnection();
String sql = "SELECT op.ENROLMENT_ID, op.OPERATION_ID, dt.NAME ,d.TENANT_ID FROM DM_DEVICE d, " +
"DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = '?' AND op" +
".PUSH_NOTIFICATION_STATUS = '?' AND d.DEVICE_TYPE_ID = dt.ID " +
"DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = ? AND op" +
".PUSH_NOTIFICATION_STATUS = ? AND d.DEVICE_TYPE_ID = dt.ID " +
"AND d.ID=op.ENROLMENT_ID ORDER BY op.OPERATION_ID OFFSET ? ROWS FETCH NEXT ? ROWS ONLY";
stmt = conn.prepareStatement(sql);
stmt.setString(1, opStatus.toString());
stmt.setString(2, pushStatus.toString());
stmt.setString(2, pushNotificationStatus.toString());
stmt.setInt(3, limit);
rs = stmt.executeQuery();
while (rs.next()) {

@ -27,7 +27,6 @@ import org.wso2.carbon.device.mgt.common.push.notification.NotificationContext;
import org.wso2.carbon.device.mgt.common.push.notification.NotificationStrategy;
import org.wso2.carbon.device.mgt.common.push.notification.PushNotificationExecutionFailedException;
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
import org.wso2.carbon.device.mgt.core.dao.DeviceManagementDAOFactory;
import org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation;
import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder;
import org.wso2.carbon.device.mgt.core.operation.mgt.OperationMapping;
@ -37,7 +36,6 @@ import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationManagementDAOF
import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationMappingDAO;
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -56,6 +54,7 @@ public class PushNotificationSchedulerTask implements Runnable {
@Override
public void run() {
try {
Map<Integer, List<OperationMapping>> operationMappingsTenantMap = new HashMap<>();
List<OperationMapping> operationsCompletedList = new LinkedList<>();
if (log.isDebugEnabled()) {
@ -63,16 +62,14 @@ public class PushNotificationSchedulerTask implements Runnable {
}
try {
//Get next available operation list per device batch
DeviceManagementDAOFactory.openConnection();
OperationManagementDAOFactory.openConnection();
operationMappingsTenantMap = operationDAO.getOperationMappingsByStatus(Operation.Status
.PENDING, Operation.PushStatus.SCHEDULED, DeviceConfigurationManager.getInstance()
.PENDING, Operation.PushNotificationStatus.SCHEDULED, DeviceConfigurationManager.getInstance()
.getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize());
} catch (SQLException e) {
log.error("Error occurred while opening a connection to the data source", e);
} catch (OperationManagementDAOException e) {
log.error("Unable to retrieve scheduled pending operations for task.", e);
} finally {
DeviceManagementDAOFactory.closeConnection();
OperationManagementDAOFactory.closeConnection();
}
// Sending push notification to each device
for (List<OperationMapping> operationMappings : operationMappingsTenantMap.values()) {
@ -92,7 +89,7 @@ public class PushNotificationSchedulerTask implements Runnable {
notificationStrategy.execute(new NotificationContext(operationMapping.getDeviceIdentifier(),
provider.getOperation(operationMapping.getDeviceIdentifier().getType(), operationMapping
.getOperationId())));
operationMapping.setPushStatus(Operation.PushStatus.COMPLETED);
operationMapping.setPushNotificationStatus(Operation.PushNotificationStatus.COMPLETED);
operationsCompletedList.add(operationMapping);
} catch (DeviceManagementException e) {
log.error("Error occurred while getting notification strategy for operation mapping " +
@ -123,5 +120,8 @@ public class PushNotificationSchedulerTask implements Runnable {
if (log.isDebugEnabled()) {
log.debug("Push notification job running completed.");
}
} catch (Throwable cause) {
log.error("PushNotificationSchedulerTask failed due to " + cause);
}
}
}

Loading…
Cancel
Save