diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java index fdfdaeaf4f2..747f7cfd324 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java @@ -59,8 +59,7 @@ public class TaskManagementUtil { try { int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService() .getServerCtxInfo().getLocalServerHashIdx(); - return TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + TaskMgtConstants.Task.NAME_SEPARATOR + dynamicTaskId - + TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx; + return generateTaskId(dynamicTaskId, serverHashIdx); } catch (HeartBeatManagementException e) { String msg = "Failed to generate task id for a dynamic task " + dynamicTaskId; log.error(msg, e); @@ -68,6 +67,11 @@ public class TaskManagementUtil { } } + public static String generateTaskId(int dynamicTaskId, int serverHashIdx) { + return TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + TaskMgtConstants.Task.NAME_SEPARATOR + dynamicTaskId + + TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx; + } + public static String generateTaskPropsMD5(Map taskProperties) throws TaskManagementException { taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP); taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX); diff --git a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java index 1c48d81f217..8ae40d6a264 100755 --- a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java +++ b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java @@ -41,6 +41,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -70,7 +71,6 @@ public class IoTSStartupHandler implements ServerStartupObserver { if (log.isDebugEnabled()) { log.debug("Comparing Tasks from carbon nTask manager and entgra task manager"); } - TaskManager taskManager; TaskService nTaskService = TaskWatcherDataHolder.getInstance().getnTaskService(); if (nTaskService == null) { String msg = "Unable to load TaskService from the carbon nTask core"; @@ -80,147 +80,175 @@ public class IoTSStartupHandler implements ServerStartupObserver { try { List dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService() .getAllDynamicTasks(); - Map> tenantedDynamicTasks = new HashMap<>(); - List dts; - for (DynamicTask dt : dynamicTasks) { - if (tenantedDynamicTasks.containsKey(dt.getTenantId())) { - dts = tenantedDynamicTasks.get(dt.getTenantId()); - } else { - dts = new ArrayList<>(); - } - dts.add(dt); - tenantedDynamicTasks.put(dt.getTenantId(), dts); + + scheduleMissingTasks(nTaskService, dynamicTasks); + deleteObsoleteTasks(nTaskService, dynamicTasks); + + if (log.isDebugEnabled()) { + log.debug("Task Comparison Completed and all tasks in current node are updated"); } - for (Integer tenantId : tenantedDynamicTasks.keySet()) { - if (tenantId == -1) { - log.warn("Found " + tenantedDynamicTasks.get(tenantId).size() + - " invalid tasks without a valid tenant id."); - continue; - } - PrivilegedCarbonContext.startTenantFlow(); - PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true); - if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { - nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); + } catch (TaskException e) { + String msg = "Error occurred while accessing carbon nTask manager."; + log.error(msg, e); + } catch (TaskManagementException e) { + String msg = "Error occurred while retrieving all active tasks from entgra task manager"; + log.error(msg, e); + } + + } + + private static void scheduleMissingTasks(TaskService nTaskService, List dynamicTasks) + throws TaskException, TaskManagementException { + Map> tenantedDynamicTasks = new HashMap<>(); + List dts; + for (DynamicTask dt : dynamicTasks) { + if (tenantedDynamicTasks.containsKey(dt.getTenantId())) { + dts = tenantedDynamicTasks.get(dt.getTenantId()); + } else { + dts = new ArrayList<>(); + } + dts.add(dt); + tenantedDynamicTasks.put(dt.getTenantId(), dts); + } + TaskManager taskManager; + for (Integer tenantId : tenantedDynamicTasks.keySet()) { + if (tenantId == -1) { + log.warn("Found " + tenantedDynamicTasks.get(tenantId).size() + + " invalid tasks without a valid tenant id."); + continue; + } + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true); + if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { + nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); + } + taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); + List tasks = taskManager.getAllTasks(); + // add or update task into nTask core + for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) { + String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()); + Map taskProperties = dt.getProperties(); + try { + int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService() + .getServerCtxInfo().getLocalServerHashIdx(); + taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx)); + taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId); + } catch (HeartBeatManagementException e) { + String msg = "Unexpected exception when getting server hash index."; + log.error(msg, e); + throw new TaskManagementException(msg, e); } - taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); - List tasks = taskManager.getAllTasks(); - // add or update task into nTask core - for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) { - String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()); - Map taskProperties = dt.getProperties(); - try { - int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService() - .getServerCtxInfo().getLocalServerHashIdx(); - taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx)); - taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId); - } catch (HeartBeatManagementException e) { - String msg = "Unexpected exception when getting server hash index."; - log.error(msg, e); - throw new TaskManagementException(msg, e); - } - boolean isExist = false; - for (TaskInfo taskInfo : tasks) { - if (taskInfo.getName().equals(generatedTaskId)) { - isExist = true; - TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo(); - String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskProperties); - String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties()); - if (!triggerInfo.getCronExpression().equals(dt.getCronExpression()) - || !dynamicTaskPropMD5.equals(existingTaskPropMD5)) { - triggerInfo.setCronExpression(dt.getCronExpression()); - taskInfo.setTriggerInfo(triggerInfo); - taskInfo.setProperties(taskProperties); - taskManager.registerTask(taskInfo); - taskManager.rescheduleTask(generatedTaskId); - if (log.isDebugEnabled()) { - log.debug("Task - '" + generatedTaskId + "' updated according to the dynamic task table"); - } + boolean isExist = false; + for (TaskInfo taskInfo : tasks) { + if (taskInfo.getName().equals(generatedTaskId)) { + isExist = true; + TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo(); + String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskProperties); + String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties()); + if (!triggerInfo.getCronExpression().equals(dt.getCronExpression()) + || !dynamicTaskPropMD5.equals(existingTaskPropMD5)) { + triggerInfo.setCronExpression(dt.getCronExpression()); + taskInfo.setTriggerInfo(triggerInfo); + taskInfo.setProperties(taskProperties); + taskManager.registerTask(taskInfo); + taskManager.rescheduleTask(generatedTaskId); + if (log.isDebugEnabled()) { + log.debug("Task - '" + generatedTaskId + "' updated according to the dynamic task table"); } - if (dt.isEnabled() - && taskManager.getTaskState(generatedTaskId) == TaskManager.TaskState.PAUSED) { - taskManager.resumeTask(generatedTaskId); - if (log.isDebugEnabled()) { - log.debug("Task - '" + generatedTaskId + "' enabled according to the dynamic task table"); - } - } else if (!dt.isEnabled() - && taskManager.getTaskState(generatedTaskId) != TaskManager.TaskState.PAUSED) { - taskManager.pauseTask(generatedTaskId); - if (log.isDebugEnabled()) { - log.debug("Task - '" + generatedTaskId + "' disabled according to the dynamic task table"); - } + } + if (dt.isEnabled() + && taskManager.getTaskState(generatedTaskId) == TaskManager.TaskState.PAUSED) { + taskManager.resumeTask(generatedTaskId); + if (log.isDebugEnabled()) { + log.debug("Task - '" + generatedTaskId + "' enabled according to the dynamic task table"); + } + } else if (!dt.isEnabled() + && taskManager.getTaskState(generatedTaskId) != TaskManager.TaskState.PAUSED) { + taskManager.pauseTask(generatedTaskId); + if (log.isDebugEnabled()) { + log.debug("Task - '" + generatedTaskId + "' disabled according to the dynamic task table"); } - break; } + break; } - if (!isExist) { - TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); - triggerInfo.setCronExpression(dt.getCronExpression()); - TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(), - taskProperties, triggerInfo); - taskManager.registerTask(taskInfo); - taskManager.scheduleTask(generatedTaskId); - if (log.isDebugEnabled()) { - log.debug("New task -'" + generatedTaskId + "' created according to the dynamic task table"); - } + } + if (!isExist) { + TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); + triggerInfo.setCronExpression(dt.getCronExpression()); + TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(), + taskProperties, triggerInfo); + taskManager.registerTask(taskInfo); + taskManager.scheduleTask(generatedTaskId); + if (log.isDebugEnabled()) { + log.debug("New task -'" + generatedTaskId + "' created according to the dynamic task table"); } } - PrivilegedCarbonContext.endTenantFlow(); } + PrivilegedCarbonContext.endTenantFlow(); + } + } - List tenants = new ArrayList<>(); - try { - RealmService realmService = TaskWatcherDataHolder.getInstance().getRealmService(); - Tenant[] tenantArray = realmService.getTenantManager().getAllTenants(); - if (tenantArray != null && tenantArray.length != 0) { - tenants.addAll(Arrays.asList(tenantArray)); - } - Tenant superTenant = new Tenant(); - superTenant.setId(-1234); - tenants.add(superTenant); - } catch (UserStoreException e) { - String msg = "Unable to load tenants"; - log.error(msg, e); - return; + private static void deleteObsoleteTasks(TaskService nTaskService, List dynamicTasks) + throws TaskManagementException, TaskException { + + List tenants = new ArrayList<>(); + try { + RealmService realmService = TaskWatcherDataHolder.getInstance().getRealmService(); + Tenant[] tenantArray = realmService.getTenantManager().getAllTenants(); + if (tenantArray != null && tenantArray.length != 0) { + tenants.addAll(Arrays.asList(tenantArray)); } + Tenant superTenant = new Tenant(); + superTenant.setId(-1234); + tenants.add(superTenant); + } catch (UserStoreException e) { + String msg = "Unable to load tenants"; + log.error(msg, e); + return; + } - for (Tenant tenant : tenants) { - PrivilegedCarbonContext.startTenantFlow(); - PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenant.getId(), true); - if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { - nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); - } - taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); - List tasks = taskManager.getAllTasks(); - // Remove deleted items from the nTask core - for (TaskInfo taskInfo : tasks) { - boolean isExist = false; - for (DynamicTask dt : dynamicTasks) { + TaskManager taskManager; + Set hashIds; + try { + hashIds = TaskWatcherDataHolder.getInstance().getHeartBeatService().getActiveServers().keySet(); + } catch (HeartBeatManagementException e) { + String msg = "Unexpected exception when getting hash indexes of active servers"; + log.error(msg, e); + throw new TaskManagementException(msg, e); + } + + for (Tenant tenant : tenants) { + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenant.getId(), true); + if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { + nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); + } + taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); + List tasks = taskManager.getAllTasks(); + // Remove deleted items from the nTask core + for (TaskInfo taskInfo : tasks) { + boolean isExist = false; + for (DynamicTask dt : dynamicTasks) { + for (int hid : hashIds) { if (tenant.getId() == dt.getTenantId() && - taskInfo.getName().equals(TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()))) { + taskInfo.getName().equals(TaskManagementUtil.generateTaskId(dt.getDynamicTaskId(), hid))) { isExist = true; break; } } - if (!isExist) { - taskManager.deleteTask(taskInfo.getName()); - if (log.isDebugEnabled()) { - log.debug("Task '" + taskInfo.getName() + "' deleted according to the dynamic task table"); - } + if (isExist) { + break; + } + } + if (!isExist) { + taskManager.deleteTask(taskInfo.getName()); + if (log.isDebugEnabled()) { + log.debug("Task '" + taskInfo.getName() + "' deleted according to the dynamic task table"); } } - PrivilegedCarbonContext.endTenantFlow(); - } - - if (log.isDebugEnabled()) { - log.debug("Task Comparison Completed and all tasks in current node are updated"); } - } catch (TaskException e) { - String msg = "Error occurred while accessing carbon nTask manager."; - log.error(msg, e); - } catch (TaskManagementException e) { - String msg = "Error occurred while retrieving all active tasks from entgra task manager"; - log.error(msg, e); + PrivilegedCarbonContext.endTenantFlow(); } - } + }