diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java index 2e859bcead5..c4c9df0e56b 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTask.java @@ -17,6 +17,9 @@ */ package io.entgra.device.mgt.core.device.mgt.core.push.notification.mgt.task; +import io.entgra.device.mgt.core.device.mgt.common.ServerCtxInfo; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.context.PrivilegedCarbonContext; @@ -63,9 +66,27 @@ public class PushNotificationSchedulerTask implements Runnable { try { //Get next available operation list per device batch OperationManagementDAOFactory.openConnection(); - operationMappingsTenantMap = operationDAO.getOperationMappingsByStatus(Operation.Status - .PENDING, Operation.PushNotificationStatus.SCHEDULED, DeviceConfigurationManager.getInstance() - .getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize()); + try { + if (DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled()) { + ServerCtxInfo serverCtxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo(); + if (serverCtxInfo != null) { + operationMappingsTenantMap = operationDAO.getAllocatedOperationMappingsByStatus(Operation.Status + .PENDING, Operation.PushNotificationStatus.SCHEDULED, DeviceConfigurationManager.getInstance() + .getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize(), + serverCtxInfo.getActiveServerCount(), serverCtxInfo.getLocalServerHashIdx()); + } else { + if (log.isDebugEnabled()) { + log.debug("Active server information not recorded yet."); + } + } + } else { + operationMappingsTenantMap = operationDAO.getOperationMappingsByStatus(Operation.Status + .PENDING, Operation.PushNotificationStatus.SCHEDULED, DeviceConfigurationManager.getInstance() + .getDeviceManagementConfig().getPushNotificationConfiguration().getSchedulerBatchSize()); + } + } catch (HeartBeatManagementException e) { + throw new RuntimeException(e); + } } catch (OperationManagementDAOException e) { log.error("Unable to retrieve scheduled pending operations for task.", e); } finally { diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTaskTest.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTaskTest.java index 7d92b9faa1f..df04c2fb881 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTaskTest.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/test/java/io/entgra/device/mgt/core/device/mgt/core/push/notification/mgt/task/PushNotificationSchedulerTaskTest.java @@ -17,12 +17,6 @@ */ package io.entgra.device.mgt.core.device.mgt.core.push.notification.mgt.task; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException; import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.OperationManagementException; import io.entgra.device.mgt.core.device.mgt.core.common.BaseDeviceManagementTest; @@ -37,6 +31,14 @@ import io.entgra.device.mgt.core.device.mgt.core.operation.mgt.dao.OperationMana import io.entgra.device.mgt.core.device.mgt.core.operation.mgt.dao.OperationManagementDAOFactory; import io.entgra.device.mgt.core.device.mgt.core.service.DeviceManagementProviderService; import io.entgra.device.mgt.core.device.mgt.core.service.DeviceManagementProviderServiceImpl; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; +import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.sql.SQLException; @@ -52,13 +54,16 @@ public class PushNotificationSchedulerTaskTest extends BaseDeviceManagementTest private PushNotificationSchedulerTask pushNotificationSchedulerTask; private OperationDAO operationDAO; + private HeartBeatManagementService heartBeatManagementService; @BeforeClass public void init() throws DeviceManagementException, RegistryException { DeviceConfigurationManager.getInstance().initConfig(); log.info("Initializing Push Notification Scheduler Test Class"); DeviceManagementServiceComponent.notifyStartupListeners(); this.deviceMgtProviderService = Mockito.mock(DeviceManagementProviderServiceImpl.class, Mockito.CALLS_REAL_METHODS); + this.heartBeatManagementService = Mockito.mock(HeartBeatManagementService.class, Mockito.CALLS_REAL_METHODS); DeviceManagementDataHolder.getInstance().setDeviceManagementProvider(this.deviceMgtProviderService); + DeviceManagementDataHolder.getInstance().setHeartBeatService(this.heartBeatManagementService); this.operationDAO = OperationManagementDAOFactory.getOperationDAO(); this.pushNotificationSchedulerTask = new PushNotificationSchedulerTask(); } @@ -69,6 +74,7 @@ public class PushNotificationSchedulerTaskTest extends BaseDeviceManagementTest OperationManagementDAOException { try { log.info("Attempting to execute push notification task scheduler"); + Mockito.when(this.heartBeatManagementService.isTaskPartitioningEnabled()).thenReturn(false); Mockito.doReturn(new TestNotificationStrategy()).when(this.deviceMgtProviderService) .getNotificationStrategyByDeviceType(Mockito.anyString()); Mockito.doReturn(new io.entgra.device.mgt.core.device.mgt.common.operation.mgt.Operation()) @@ -81,6 +87,8 @@ public class PushNotificationSchedulerTaskTest extends BaseDeviceManagementTest .getPushNotificationConfiguration().getSchedulerBatchSize()); Assert.assertEquals(operationMappingsTenantMap.size(), 0); log.info("Push notification task execution complete."); + } catch (HeartBeatManagementException e) { + throw new RuntimeException(e); } finally { OperationManagementDAOFactory.closeConnection(); }