diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java index 6ce1ac5b6b1..ad3269be004 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java @@ -77,7 +77,8 @@ public abstract class DynamicPartitionedScheduleTask implements Task { // during the server startup if (localHashIndex == null ) { if (log.isDebugEnabled()) { - log.debug("Executing startup scheduled task (" + getTaskName() + ")"); + log.debug("Executing startup scheduled task (" + getTaskName() + ") with class: " + + this.getClass().getName()); } executeDynamicTask(); return; diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/internal/TaskManagerDataHolder.java b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/internal/TaskManagerDataHolder.java index 602daa68b94..8de0cbb40fe 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/internal/TaskManagerDataHolder.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/internal/TaskManagerDataHolder.java @@ -27,7 +27,7 @@ public class TaskManagerDataHolder { private HeartBeatManagementService heartBeatService; - private static TaskManagerDataHolder thisInstance = new TaskManagerDataHolder(); + private static final TaskManagerDataHolder thisInstance = new TaskManagerDataHolder(); private TaskManagerDataHolder() { } diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java index e2deb157777..c206d77dc35 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java @@ -293,7 +293,7 @@ public class TaskManagementServiceImpl implements TaskManagementService { @Override public List getAllDynamicTasks() throws TaskManagementException { - List dynamicTasks = null; + List dynamicTasks; try { if (log.isDebugEnabled()) { log.debug("Fetching the details of all dynamic tasks"); @@ -324,7 +324,7 @@ public class TaskManagementServiceImpl implements TaskManagementService { @Override public DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementException { - DynamicTask dynamicTask = null; + DynamicTask dynamicTask; try { if (log.isDebugEnabled()) { log.debug("Fetching the details of dynamic task '" + dynamicTaskId + "'"); @@ -352,7 +352,7 @@ public class TaskManagementServiceImpl implements TaskManagementService { @Override public List getActiveDynamicTasks() throws TaskManagementException { - List dynamicTasks = null; + List dynamicTasks; try { if (log.isDebugEnabled()) { log.debug("Fetching the details of all active dynamic tasks"); diff --git a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/pom.xml b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/pom.xml index 6b85fee2a7a..d546c286989 100755 --- a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/pom.xml +++ b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/pom.xml @@ -52,13 +52,18 @@ Task Watcher Bundle io.entgra.task.mgt.watcher.internal - org.osgi.framework.*;version="${imp.package.version.osgi.framework}", - org.osgi.service.*;version="${imp.package.version.osgi.service}", - org.apache.commons.logging, - org.wso2.carbon.ntask.*, + io.entgra.server.bootup.heartbeat.beacon.*, io.entgra.task.mgt.common.*, io.entgra.task.mgt.core.*, + org.apache.commons.logging, + org.osgi.framework.*;version="${imp.package.version.osgi.framework}", + org.osgi.service.*;version="${imp.package.version.osgi.service}", + org.wso2.carbon.context, org.wso2.carbon.core, + org.wso2.carbon.device.mgt.common.*, + org.wso2.carbon.ntask.*, + org.wso2.carbon.user.api, + org.wso2.carbon.user.core.*, io.entgra.task.mgt.watcher.* @@ -95,14 +100,40 @@ io.entgra.task.mgt.common provided + + org.wso2.carbon.devicemgt + org.wso2.carbon.device.mgt.common + provided + org.wso2.carbon.devicemgt org.wso2.carbon.device.mgt.core provided + + org.wso2.carbon.devicemgt + io.entgra.server.bootup.heartbeat.beacon + provided + org.wso2.carbon org.wso2.carbon.core + provided + + + org.wso2.carbon + org.wso2.carbon.user.core + provided + + + org.wso2.carbon + org.wso2.carbon.user.api + provided + + + org.wso2.carbon + org.wso2.carbon.utils + provided diff --git a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java index 824cb57295c..1c48d81f217 100755 --- a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java +++ b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java @@ -18,7 +18,7 @@ package io.entgra.task.mgt.watcher; - +import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import io.entgra.task.mgt.common.bean.DynamicTask; import io.entgra.task.mgt.common.constant.TaskMgtConstants; import io.entgra.task.mgt.common.exception.TaskManagementException; @@ -26,13 +26,21 @@ import io.entgra.task.mgt.core.util.TaskManagementUtil; import io.entgra.task.mgt.watcher.internal.TaskWatcherDataHolder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.core.ServerStartupObserver; import org.wso2.carbon.ntask.common.TaskException; import org.wso2.carbon.ntask.core.TaskInfo; import org.wso2.carbon.ntask.core.TaskManager; import org.wso2.carbon.ntask.core.service.TaskService; +import org.wso2.carbon.user.api.Tenant; +import org.wso2.carbon.user.api.UserStoreException; +import org.wso2.carbon.user.core.service.RealmService; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -49,93 +57,169 @@ public class IoTSStartupHandler implements ServerStartupObserver { timer.schedule(new TimerTask() { @Override public void run() { - compareTasks(); + try { + compareTasks(); + } catch (Exception e) { + log.error("Error occurred when comparing tasks.", e); + } } }, 200000, 600000); } private void compareTasks() { - log.info("Comparing Tasks from carbon n task manager and engtra task manager"); - TaskManager taskManager = null; + if (log.isDebugEnabled()) { + log.debug("Comparing Tasks from carbon nTask manager and entgra task manager"); + } + TaskManager taskManager; TaskService nTaskService = TaskWatcherDataHolder.getInstance().getnTaskService(); if (nTaskService == null) { - String msg = "Unable to load TaskService from the carbon n task core"; + String msg = "Unable to load TaskService from the carbon nTask core"; log.error(msg); + return; } try { - if (!nTaskService.getRegisteredTaskTypes().contains( - TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { - nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); - } - taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); - List dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService() .getAllDynamicTasks(); - List tasks = taskManager.getAllTasks(); - // add or update task into n task core + Map> tenantedDynamicTasks = new HashMap<>(); + List dts; for (DynamicTask dt : dynamicTasks) { - String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()); - boolean isExist = false; - for (TaskInfo taskInfo : tasks) { - if (taskInfo.getName().equals(generatedTaskId)) { - isExist = true; - TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo(); - String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(dt.getProperties()); - String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties()); - if (!triggerInfo.getCronExpression().equals(dt.getCronExpression()) - || !dynamicTaskPropMD5.equals(existingTaskPropMD5)) { - triggerInfo.setCronExpression(dt.getCronExpression()); - taskInfo.setTriggerInfo(triggerInfo); - taskInfo.setProperties(dt.getProperties()); - taskManager.registerTask(taskInfo); - taskManager.rescheduleTask(generatedTaskId); - log.debug("Task - '" + generatedTaskId + "' updated according to the dynamic task table"); + if (tenantedDynamicTasks.containsKey(dt.getTenantId())) { + dts = tenantedDynamicTasks.get(dt.getTenantId()); + } else { + dts = new ArrayList<>(); + } + dts.add(dt); + tenantedDynamicTasks.put(dt.getTenantId(), dts); + } + for (Integer tenantId : tenantedDynamicTasks.keySet()) { + if (tenantId == -1) { + log.warn("Found " + tenantedDynamicTasks.get(tenantId).size() + + " invalid tasks without a valid tenant id."); + continue; + } + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true); + if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { + nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); + } + taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); + List tasks = taskManager.getAllTasks(); + // add or update task into nTask core + for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) { + String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()); + Map taskProperties = dt.getProperties(); + try { + int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService() + .getServerCtxInfo().getLocalServerHashIdx(); + taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx)); + taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId); + } catch (HeartBeatManagementException e) { + String msg = "Unexpected exception when getting server hash index."; + log.error(msg, e); + throw new TaskManagementException(msg, e); + } + boolean isExist = false; + for (TaskInfo taskInfo : tasks) { + if (taskInfo.getName().equals(generatedTaskId)) { + isExist = true; + TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo(); + String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskProperties); + String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties()); + if (!triggerInfo.getCronExpression().equals(dt.getCronExpression()) + || !dynamicTaskPropMD5.equals(existingTaskPropMD5)) { + triggerInfo.setCronExpression(dt.getCronExpression()); + taskInfo.setTriggerInfo(triggerInfo); + taskInfo.setProperties(taskProperties); + taskManager.registerTask(taskInfo); + taskManager.rescheduleTask(generatedTaskId); + if (log.isDebugEnabled()) { + log.debug("Task - '" + generatedTaskId + "' updated according to the dynamic task table"); + } + } + if (dt.isEnabled() + && taskManager.getTaskState(generatedTaskId) == TaskManager.TaskState.PAUSED) { + taskManager.resumeTask(generatedTaskId); + if (log.isDebugEnabled()) { + log.debug("Task - '" + generatedTaskId + "' enabled according to the dynamic task table"); + } + } else if (!dt.isEnabled() + && taskManager.getTaskState(generatedTaskId) != TaskManager.TaskState.PAUSED) { + taskManager.pauseTask(generatedTaskId); + if (log.isDebugEnabled()) { + log.debug("Task - '" + generatedTaskId + "' disabled according to the dynamic task table"); + } + } + break; } - if (dt.isEnabled() - && taskManager.getTaskState(generatedTaskId) == TaskManager.TaskState.PAUSED) { - taskManager.resumeTask(generatedTaskId); - log.debug("Task - '" + generatedTaskId + "' enabled according to the dynamic task table"); - } else if (!dt.isEnabled() - && taskManager.getTaskState(generatedTaskId) != TaskManager.TaskState.PAUSED) { - taskManager.pauseTask(generatedTaskId); - log.debug("Task - '" + generatedTaskId + "' disabled according to the dynamic task table"); + } + if (!isExist) { + TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); + triggerInfo.setCronExpression(dt.getCronExpression()); + TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(), + taskProperties, triggerInfo); + taskManager.registerTask(taskInfo); + taskManager.scheduleTask(generatedTaskId); + if (log.isDebugEnabled()) { + log.debug("New task -'" + generatedTaskId + "' created according to the dynamic task table"); } - break; } } - if (!isExist) { - TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); - triggerInfo.setCronExpression(dt.getCronExpression()); - TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(), - dt.getProperties(), triggerInfo); - taskManager.registerTask(taskInfo); - taskManager.scheduleTask(generatedTaskId); - log.debug("New task -'" + generatedTaskId + "' created according to the dynamic task table"); + PrivilegedCarbonContext.endTenantFlow(); + } + + List tenants = new ArrayList<>(); + try { + RealmService realmService = TaskWatcherDataHolder.getInstance().getRealmService(); + Tenant[] tenantArray = realmService.getTenantManager().getAllTenants(); + if (tenantArray != null && tenantArray.length != 0) { + tenants.addAll(Arrays.asList(tenantArray)); } + Tenant superTenant = new Tenant(); + superTenant.setId(-1234); + tenants.add(superTenant); + } catch (UserStoreException e) { + String msg = "Unable to load tenants"; + log.error(msg, e); + return; } - // Remove deleted items from the n task core - for (TaskInfo taskInfo : tasks) { - boolean isExist = false; - for (DynamicTask dt : dynamicTasks) { - if (taskInfo.getName().equals(TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()))) { - isExist = true; - } + for (Tenant tenant : tenants) { + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenant.getId(), true); + if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { + nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); } - if (!isExist) { - taskManager.deleteTask(taskInfo.getName()); - log.debug("Task '" + taskInfo.getName() + "' deleted according to the dynamic task table"); + taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); + List tasks = taskManager.getAllTasks(); + // Remove deleted items from the nTask core + for (TaskInfo taskInfo : tasks) { + boolean isExist = false; + for (DynamicTask dt : dynamicTasks) { + if (tenant.getId() == dt.getTenantId() && + taskInfo.getName().equals(TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()))) { + isExist = true; + break; + } + } + if (!isExist) { + taskManager.deleteTask(taskInfo.getName()); + if (log.isDebugEnabled()) { + log.debug("Task '" + taskInfo.getName() + "' deleted according to the dynamic task table"); + } + } } + PrivilegedCarbonContext.endTenantFlow(); } - log.info("Task Comparison Completed and all tasks in current node are updated"); - } catch ( - TaskException e) { - String msg = "Error occurred while accessing carbon n task manager."; - log.error(msg); - } catch ( - TaskManagementException e) { + + if (log.isDebugEnabled()) { + log.debug("Task Comparison Completed and all tasks in current node are updated"); + } + } catch (TaskException e) { + String msg = "Error occurred while accessing carbon nTask manager."; + log.error(msg, e); + } catch (TaskManagementException e) { String msg = "Error occurred while retrieving all active tasks from entgra task manager"; - log.error(msg); + log.error(msg, e); } } diff --git a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/internal/TaskWatcherDataHolder.java b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/internal/TaskWatcherDataHolder.java index a464887d0be..9e2953ae25d 100755 --- a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/internal/TaskWatcherDataHolder.java +++ b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/internal/TaskWatcherDataHolder.java @@ -17,16 +17,20 @@ */ package io.entgra.task.mgt.watcher.internal; - +import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; import io.entgra.task.mgt.common.spi.TaskManagementService; import org.wso2.carbon.ntask.core.service.TaskService; +import org.wso2.carbon.user.core.service.RealmService; public class TaskWatcherDataHolder { private TaskManagementService taskManagerService; private TaskService nTaskService; - private static TaskWatcherDataHolder thisInstance = new TaskWatcherDataHolder(); + private HeartBeatManagementService heartBeatService; + private RealmService realmService; + + private static final TaskWatcherDataHolder thisInstance = new TaskWatcherDataHolder(); private TaskWatcherDataHolder() {} @@ -50,4 +54,20 @@ public class TaskWatcherDataHolder { this.nTaskService = nTaskService; } + public HeartBeatManagementService getHeartBeatService() { + return heartBeatService; + } + + public void setHeartBeatService(HeartBeatManagementService heartBeatService) { + this.heartBeatService = heartBeatService; + } + + public RealmService getRealmService() { + return this.realmService; + } + + public void setRealmService(RealmService realmService) { + this.realmService = realmService; + } + } diff --git a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/internal/TaskWatcherServiceComponent.java b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/internal/TaskWatcherServiceComponent.java index 735dc653305..209113df84a 100755 --- a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/internal/TaskWatcherServiceComponent.java +++ b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/internal/TaskWatcherServiceComponent.java @@ -17,7 +17,7 @@ */ package io.entgra.task.mgt.watcher.internal; - +import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService; import io.entgra.task.mgt.common.spi.TaskManagementService; import io.entgra.task.mgt.core.config.TaskConfigurationManager; import io.entgra.task.mgt.core.config.TaskManagementConfig; @@ -28,7 +28,7 @@ import org.osgi.framework.BundleContext; import org.osgi.service.component.ComponentContext; import org.wso2.carbon.core.ServerStartupObserver; import org.wso2.carbon.ntask.core.service.TaskService; - +import org.wso2.carbon.user.core.service.RealmService; /** * @scr.component @@ -45,6 +45,18 @@ import org.wso2.carbon.ntask.core.service.TaskService; * policy="dynamic" * bind="setTaskMgtService" * unbind="unsetTaskMgtService" + * @scr.reference name="entgra.heart.beat.service" + * interface="io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService" + * cardinality="0..1" + * policy="dynamic" + * bind="setHeartBeatService" + * unbind="unsetHeartBeatService" + * @scr.reference name="user.realmservice.default" + * interface="org.wso2.carbon.user.core.service.RealmService" + * cardinality="1..1" + * policy="dynamic" + * bind="setRealmService" + * unbind="unsetRealmService" */ public class TaskWatcherServiceComponent { @@ -110,4 +122,46 @@ public class TaskWatcherServiceComponent { TaskWatcherDataHolder.getInstance().setTaskManagementService(null); } -} \ No newline at end of file + @SuppressWarnings("unused") + protected void setHeartBeatService(HeartBeatManagementService heartBeatService) { + if (log.isDebugEnabled()) { + log.debug("Setting heart beat service to Task Manager Service Component"); + } + TaskWatcherDataHolder.getInstance().setHeartBeatService(heartBeatService); + } + + @SuppressWarnings("unused") + protected void unsetHeartBeatService(HeartBeatManagementService heartBeatManagementService) { + if (log.isDebugEnabled()) { + log.debug("Removing heart beat service from Task Manager Service Component"); + } + TaskWatcherDataHolder.getInstance().setHeartBeatService(null); + } + + /** + * Sets Realm Service. + * + * @param realmService An instance of RealmService + */ + @SuppressWarnings("unused") + protected void setRealmService(RealmService realmService) { + if (log.isDebugEnabled()) { + log.debug("Setting Realm Service"); + } + TaskWatcherDataHolder.getInstance().setRealmService(realmService); + } + + /** + * Unsets Realm Service. + * + * @param realmService An instance of RealmService + */ + @SuppressWarnings("unused") + protected void unsetRealmService(RealmService realmService) { + if (log.isDebugEnabled()) { + log.debug("Unsetting Realm Service"); + } + TaskWatcherDataHolder.getInstance().setRealmService(null); + } + +}