Merge pull request 'improvements to heart beat, push notification task and operation timeout task' (#217) from amalka.subasinghe/device-mgt-core:master into master

Reviewed-on: community/device-mgt-core#217
Certificate-identifire
commit 6d3127a008

@ -105,6 +105,10 @@ public interface OperationDAO {
Map<Integer, List<OperationMapping>> getOperationMappingsByStatus(Operation.Status opStatus, Operation.PushNotificationStatus pushNotificationStatus,
int limit) throws OperationManagementDAOException;
Map<Integer, List<OperationMapping>> getAllocatedOperationMappingsByStatus(Operation.Status opStatus,
Operation.PushNotificationStatus pushNotificationStatus, int limit, int activeServerCount, int serverIndex)
throws OperationManagementDAOException;
List<Activity> getActivities(List<String> deviceTypes, String operationCode, long updatedSince, String operationStatus)
throws OperationManagementDAOException;

@ -2010,6 +2010,54 @@ public class GenericOperationDAOImpl implements OperationDAO {
return operationMappingsTenantMap;
}
@Override
public Map<Integer, List<OperationMapping>> getAllocatedOperationMappingsByStatus(Operation.Status opStatus,
Operation.PushNotificationStatus pushNotificationStatus, int limit, int activeServerCount, int serverIndex)
throws OperationManagementDAOException {
PreparedStatement stmt = null;
ResultSet rs = null;
Connection conn;
OperationMapping operationMapping;
Map<Integer, List<OperationMapping>> operationMappingsTenantMap = new HashMap<>();
try {
conn = OperationManagementDAOFactory.getConnection();
String sql = "SELECT op.ENROLMENT_ID, op.OPERATION_ID, d.DEVICE_IDENTIFICATION, dt.NAME as DEVICE_TYPE, " +
"d.TENANT_ID FROM DM_DEVICE d, DM_ENROLMENT_OP_MAPPING op, DM_DEVICE_TYPE dt WHERE op.STATUS = ?" +
" AND op.PUSH_NOTIFICATION_STATUS = ? AND d.DEVICE_TYPE_ID = dt.ID AND d.ID=op.ENROLMENT_ID AND MOD(d.ID, ?) = ? ORDER" +
" BY op.OPERATION_ID LIMIT ?";
stmt = conn.prepareStatement(sql);
stmt.setString(1, opStatus.toString());
stmt.setString(2, pushNotificationStatus.toString());
stmt.setInt(3, activeServerCount);
stmt.setInt(4, serverIndex);
stmt.setInt(5, limit);
rs = stmt.executeQuery();
while (rs.next()) {
int tenantID = rs.getInt("TENANT_ID");
List<OperationMapping> operationMappings = operationMappingsTenantMap.get(tenantID);
if (operationMappings == null) {
operationMappings = new LinkedList<>();
operationMappingsTenantMap.put(tenantID, operationMappings);
}
operationMapping = new OperationMapping();
operationMapping.setOperationId(rs.getInt("OPERATION_ID"));
DeviceIdentifier deviceIdentifier = new DeviceIdentifier();
deviceIdentifier.setId(rs.getString("DEVICE_IDENTIFICATION"));
deviceIdentifier.setType(rs.getString("DEVICE_TYPE"));
operationMapping.setDeviceIdentifier(deviceIdentifier);
operationMapping.setEnrollmentId(rs.getInt("ENROLMENT_ID"));
operationMapping.setTenantId(tenantID);
operationMappings.add(operationMapping);
}
} catch (SQLException e) {
throw new OperationManagementDAOException("SQL error while getting operation mappings from database. " +
e.getMessage(), e);
} finally {
OperationManagementDAOUtil.cleanupResources(stmt, rs);
}
return operationMappingsTenantMap;
}
public List<Activity> getActivities(List<String> deviceTypes, String operationCode, long updatedSince, String operationStatus)
throws OperationManagementDAOException {

@ -18,6 +18,7 @@
package io.entgra.device.mgt.core.device.mgt.core.operation.timeout.task.impl;
import com.google.gson.Gson;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException;
@ -36,7 +37,6 @@ import java.util.List;
public class OperationTimeoutTask extends DynamicPartitionedScheduleTask {
private static final Log log = LogFactory.getLog(OperationTimeoutTask.class);
@Override
protected void setup() {
@ -44,45 +44,60 @@ public class OperationTimeoutTask extends DynamicPartitionedScheduleTask {
@Override
protected void executeDynamicTask() {
String operationTimeoutTaskConfigStr = getProperty(
OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG);
Gson gson = new Gson();
OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class);
try {
long timeMillis = System.currentTimeMillis() - operationTimeoutConfig.getTimeout() * 60 * 1000;
List<String> deviceTypes = new ArrayList<>();
if (operationTimeoutConfig.getDeviceTypes().size() == 1 &&
"ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) {
try {
List<DeviceType> deviceTypeList = DeviceManagementDataHolder.getInstance()
.getDeviceManagementProvider().getDeviceTypes();
for (DeviceType deviceType : deviceTypeList) {
deviceTypes.add(deviceType.getName());
if (isQualifiedToExecuteTask()) { // this task will run only in one node when the deployment has multiple nodes
String operationTimeoutTaskConfigStr = getProperty(
OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG);
Gson gson = new Gson();
OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class);
try {
long timeMillis = System.currentTimeMillis() - (long) operationTimeoutConfig.getTimeout();
List<String> deviceTypes = new ArrayList<>();
if (operationTimeoutConfig.getDeviceTypes().size() == 1 &&
"ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) {
try {
List<DeviceType> deviceTypeList = DeviceManagementDataHolder.getInstance()
.getDeviceManagementProvider().getDeviceTypes();
for (DeviceType deviceType : deviceTypeList) {
deviceTypes.add(deviceType.getName());
}
} catch (DeviceManagementException e) {
log.error("Error occurred while reading device types", e);
}
} catch (DeviceManagementException e) {
log.error("Error occurred while reading device types", e);
} else {
deviceTypes = operationTimeoutConfig.getDeviceTypes();
}
} else {
deviceTypes = operationTimeoutConfig.getDeviceTypes();
}
List<Activity> activities = DeviceManagementDataHolder.getInstance().getOperationManager()
.getActivities(deviceTypes, operationTimeoutConfig.getCode(), timeMillis,
operationTimeoutConfig.getInitialStatus());
for (Activity activity : activities) {
for (ActivityStatus activityStatus : activity.getActivityStatus()) {
String operationId = activity.getActivityId().replace("ACTIVITY_", "");
Operation operation = DeviceManagementDataHolder.getInstance().getOperationManager()
.getOperation(Integer.parseInt(operationId));
operation.setStatus(Operation.Status.valueOf(operationTimeoutConfig.getNextStatus()));
DeviceManagementDataHolder.getInstance().getOperationManager()
.updateOperation(activityStatus.getDeviceIdentifier(), operation);
List<Activity> activities = DeviceManagementDataHolder.getInstance().getOperationManager()
.getActivities(deviceTypes, operationTimeoutConfig.getCode(), timeMillis,
operationTimeoutConfig.getInitialStatus());
for (Activity activity : activities) {
for (ActivityStatus activityStatus : activity.getActivityStatus()) {
String operationId = activity.getActivityId().replace("ACTIVITY_", "");
Operation operation = DeviceManagementDataHolder.getInstance().getOperationManager()
.getOperation(Integer.parseInt(operationId));
operation.setStatus(Operation.Status.valueOf(operationTimeoutConfig.getNextStatus()));
DeviceManagementDataHolder.getInstance().getOperationManager()
.updateOperation(activityStatus.getDeviceIdentifier(), operation);
}
}
}
} catch (OperationManagementException e) {
String msg = "Error occurred while retrieving operations.";
log.error(msg, e);
} catch (OperationManagementException e) {
String msg = "Error occurred while retrieving operations.";
log.error(msg, e);
}
}
}
private boolean isQualifiedToExecuteTask() {
if (isDynamicTaskEligible()) {
try {
return DeviceManagementDataHolder.getInstance().getHeartBeatService().isQualifiedToExecuteTask();
} catch (HeartBeatManagementException e) {
log.error("Error while checking is qualified to execute task", e);
}
} else {
return true;
}
return false;
}
}

@ -57,14 +57,14 @@ public class OperationTimeoutTaskManagerServiceImpl implements OperationTimeoutT
log.debug("Operation timeout task is started for the device type(s) : " + config.getDeviceTypes()
+ ", operation code : " + config.getInitialStatus());
log.debug(
"Operation timeout task is at frequency of : " + config.getTimeout() + " minutes");
"Operation timeout task is at frequency of : " + config.getTimeout() + " milliseconds");
}
TaskManager taskManager = taskService.getTaskManager(OPERATION_TIMEOUT_TASK);
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
//Convert to milli seconds
triggerInfo.setIntervalMillis(config.getTimeout() * 60 * 1000);
triggerInfo.setIntervalMillis(config.getTimeout());
triggerInfo.setRepeatCount(-1);
Gson gson = new Gson();
@ -125,7 +125,7 @@ public class OperationTimeoutTaskManagerServiceImpl implements OperationTimeoutT
if (taskManager.isTaskScheduled(taskName)) {
taskManager.deleteTask(taskName);
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
triggerInfo.setIntervalMillis(config.getTimeout() * 60 * 1000);
triggerInfo.setIntervalMillis(config.getTimeout());
triggerInfo.setRepeatCount(-1);
Map<String, String> properties = new HashMap<>();

@ -17,6 +17,9 @@
*/
package io.entgra.device.mgt.core.device.mgt.core.push.notification.mgt.task;
import io.entgra.device.mgt.core.device.mgt.common.ServerCtxInfo;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
@ -63,9 +66,27 @@ public class PushNotificationSchedulerTask implements Runnable {
try {
//Get next available operation list per device batch
OperationManagementDAOFactory.openConnection();
operationMappingsTenantMap = operationDAO.getOperationMappingsByStatus(Operation.Status
.PENDING, Operation.PushNotificationStatus.SCHEDULED, DeviceConfigurationManager.getInstance()
.getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize());
try {
if (DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled()) {
ServerCtxInfo serverCtxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo();
if (serverCtxInfo != null) {
operationMappingsTenantMap = operationDAO.getAllocatedOperationMappingsByStatus(Operation.Status
.PENDING, Operation.PushNotificationStatus.SCHEDULED, DeviceConfigurationManager.getInstance()
.getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize(),
serverCtxInfo.getActiveServerCount(), serverCtxInfo.getLocalServerHashIdx());
} else {
if (log.isDebugEnabled()) {
log.debug("Active server information not recorded yet.");
}
}
} else {
operationMappingsTenantMap = operationDAO.getOperationMappingsByStatus(Operation.Status
.PENDING, Operation.PushNotificationStatus.SCHEDULED, DeviceConfigurationManager.getInstance()
.getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize());
}
} catch (HeartBeatManagementException e) {
throw new RuntimeException(e);
}
} catch (OperationManagementDAOException e) {
log.error("Unable to retrieve scheduled pending operations for task.", e);
} finally {

@ -17,12 +17,6 @@
*/
package io.entgra.device.mgt.core.device.mgt.core.push.notification.mgt.task;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException;
import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.OperationManagementException;
import io.entgra.device.mgt.core.device.mgt.core.common.BaseDeviceManagementTest;
@ -37,6 +31,14 @@ import io.entgra.device.mgt.core.device.mgt.core.operation.mgt.dao.OperationMana
import io.entgra.device.mgt.core.device.mgt.core.operation.mgt.dao.OperationManagementDAOFactory;
import io.entgra.device.mgt.core.device.mgt.core.service.DeviceManagementProviderService;
import io.entgra.device.mgt.core.device.mgt.core.service.DeviceManagementProviderServiceImpl;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import java.sql.SQLException;
@ -52,13 +54,16 @@ public class PushNotificationSchedulerTaskTest extends BaseDeviceManagementTest
private PushNotificationSchedulerTask pushNotificationSchedulerTask;
private OperationDAO operationDAO;
private HeartBeatManagementService heartBeatManagementService;
@BeforeClass
public void init() throws DeviceManagementException, RegistryException {
DeviceConfigurationManager.getInstance().initConfig();
log.info("Initializing Push Notification Scheduler Test Class");
DeviceManagementServiceComponent.notifyStartupListeners();
this.deviceMgtProviderService = Mockito.mock(DeviceManagementProviderServiceImpl.class, Mockito.CALLS_REAL_METHODS);
this.heartBeatManagementService = Mockito.mock(HeartBeatManagementService.class, Mockito.CALLS_REAL_METHODS);
DeviceManagementDataHolder.getInstance().setDeviceManagementProvider(this.deviceMgtProviderService);
DeviceManagementDataHolder.getInstance().setHeartBeatService(this.heartBeatManagementService);
this.operationDAO = OperationManagementDAOFactory.getOperationDAO();
this.pushNotificationSchedulerTask = new PushNotificationSchedulerTask();
}
@ -69,6 +74,7 @@ public class PushNotificationSchedulerTaskTest extends BaseDeviceManagementTest
OperationManagementDAOException {
try {
log.info("Attempting to execute push notification task scheduler");
Mockito.when(this.heartBeatManagementService.isTaskPartitioningEnabled()).thenReturn(false);
Mockito.doReturn(new TestNotificationStrategy()).when(this.deviceMgtProviderService)
.getNotificationStrategyByDeviceType(Mockito.anyString());
Mockito.doReturn(new io.entgra.device.mgt.core.device.mgt.common.operation.mgt.Operation())
@ -81,6 +87,8 @@ public class PushNotificationSchedulerTaskTest extends BaseDeviceManagementTest
.getPushNotificationConfiguration().getSchedulerBatchSize());
Assert.assertEquals(operationMappingsTenantMap.size(), 0);
log.info("Push notification task execution complete.");
} catch (HeartBeatManagementException e) {
throw new RuntimeException(e);
} finally {
OperationManagementDAOFactory.closeConnection();
}

@ -70,6 +70,7 @@
!io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.internal,
io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.*
</Export-Package>
<DynamicImport-Package>*</DynamicImport-Package>
</instructions>
</configuration>
</plugin>

@ -28,6 +28,7 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBea
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.core.ServerStartupObserver;
import org.wso2.carbon.ndatasource.core.DataSourceService;
import java.util.List;
@ -73,7 +74,9 @@ public class HeartBeatBeaconComponent {
clusterFormationChangedNotifierRepository);
//Setting up executors to notify heart beat status */
HeartBeatExecutor.setUpNotifiers(HeartBeatBeaconUtils.getServerDetails());
HeartBeatExecutor heartBeatExecutor = new HeartBeatExecutor();
componentContext.getBundleContext().registerService(
ServerStartupObserver.class.getName(), heartBeatExecutor, null);
}

@ -26,13 +26,16 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.dto.ServerContex
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.core.ServerStartupObserver;
import java.io.IOException;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class HeartBeatExecutor {
public class HeartBeatExecutor implements ServerStartupObserver {
private static Log log = LogFactory.getLog(HeartBeatExecutor.class);
private static final int DEFAULT__NOTIFIER_INTERVAL = 5;
@ -43,6 +46,20 @@ public class HeartBeatExecutor {
CONFIG = HeartBeatBeaconConfig.getInstance();
}
@Override
public void completingServerStartup() {
}
@Override
public void completedServerStartup() {
try {
setUpNotifiers(HeartBeatBeaconUtils.getServerDetails());
} catch (HeartBeatBeaconConfigurationException | UnknownHostException | SocketException e) {
throw new RuntimeException(e);
}
}
static void setUpNotifiers(ServerContext ctx) throws HeartBeatBeaconConfigurationException {
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();

@ -235,6 +235,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
}
} else {
//first time execution, elect if not present
heartBeatDAO.purgeCandidates();
electCandidate(servers);
}
HeartBeatBeaconDAOFactory.commitTransaction();
@ -268,6 +269,10 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
String serverUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
ServerContext serverContext = servers.get(serverUUID);
if (log.isDebugEnabled()) {
log.debug("HashIndex (previous, current) : " + lastHashIndex + ", " + serverContext.getIndex());
log.debug("ActiveServerCount (previous, current) : " + lastActiveCount + ", " + servers.size());
}
// cluster change can be identified, either by changing hash index or changing active server count
if ((lastHashIndex != serverContext.getIndex()) || (lastActiveCount != servers.size())) {
lastHashIndex = serverContext.getIndex();
@ -280,6 +285,9 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
Runnable r = new Runnable() {
@Override
public void run() {
if (log.isDebugEnabled()) {
log.debug("notify cluster formation changed : " + notifier.getType());
}
notifier.notifyClusterFormationChanged(lastHashIndex, lastActiveCount);
}
};

@ -353,7 +353,7 @@
<!--</DeviceTypes>-->
<!--<Code>DOUBLE_COMMAND</Code>-->
<!--<InitialStatus>REQUIRED_CONFIRMATION</InitialStatus>-->
<!--<Timeout>30</Timeout>-->
<!--<Timeout>30000</Timeout>-->
<!--<NextStatus>ERROR</NextStatus>-->
<!--</OperationTimeout>-->
{% if device_mgt_conf.operation_timeout_conf is defined %}

Loading…
Cancel
Save