diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java index c206d77dc35..fe3a7cb97e6 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java @@ -62,8 +62,7 @@ public class TaskManagementServiceImpl implements TaskManagementService { log.error(msg); throw new TaskManagementException(msg); } - if (!nTaskService.getRegisteredTaskTypes().contains( - TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { + if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { try { nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); this.taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java index 747f7cfd324..5308fc29c9c 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java @@ -72,7 +72,7 @@ public class TaskManagementUtil { + TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx; } - public static String generateTaskPropsMD5(Map taskProperties) throws TaskManagementException { + public static String generateTaskPropsMD5(Map taskProperties) { taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP); taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX); taskProperties.remove(TaskMgtConstants.Task.LOCAL_TASK_NAME); diff --git a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java index 8ae40d6a264..355fba38250 100755 --- a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java +++ b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java @@ -64,7 +64,7 @@ public class IoTSStartupHandler implements ServerStartupObserver { log.error("Error occurred when comparing tasks.", e); } } - }, 200000, 600000); + }, 200000, 300000); } private void compareTasks() { @@ -127,29 +127,18 @@ public class IoTSStartupHandler implements ServerStartupObserver { // add or update task into nTask core for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) { String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()); - Map taskProperties = dt.getProperties(); - try { - int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService() - .getServerCtxInfo().getLocalServerHashIdx(); - taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx)); - taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId); - } catch (HeartBeatManagementException e) { - String msg = "Unexpected exception when getting server hash index."; - log.error(msg, e); - throw new TaskManagementException(msg, e); - } boolean isExist = false; for (TaskInfo taskInfo : tasks) { if (taskInfo.getName().equals(generatedTaskId)) { isExist = true; TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo(); - String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskProperties); + String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(dt.getProperties()); String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties()); if (!triggerInfo.getCronExpression().equals(dt.getCronExpression()) || !dynamicTaskPropMD5.equals(existingTaskPropMD5)) { triggerInfo.setCronExpression(dt.getCronExpression()); taskInfo.setTriggerInfo(triggerInfo); - taskInfo.setProperties(taskProperties); + taskInfo.setProperties(populateTaskProperties(tenantId, generatedTaskId, dt.getProperties())); taskManager.registerTask(taskInfo); taskManager.rescheduleTask(generatedTaskId); if (log.isDebugEnabled()) { @@ -176,7 +165,7 @@ public class IoTSStartupHandler implements ServerStartupObserver { TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); triggerInfo.setCronExpression(dt.getCronExpression()); TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(), - taskProperties, triggerInfo); + populateTaskProperties(tenantId, generatedTaskId, dt.getProperties()), triggerInfo); taskManager.registerTask(taskInfo); taskManager.scheduleTask(generatedTaskId); if (log.isDebugEnabled()) { @@ -188,6 +177,23 @@ public class IoTSStartupHandler implements ServerStartupObserver { } } + private static Map populateTaskProperties(int tenantId, String generatedTaskId, + Map taskProperties) + throws TaskManagementException { + try { + int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService() + .getServerCtxInfo().getLocalServerHashIdx(); + taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx)); + taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId); + taskProperties.put(TaskMgtConstants.Task.TENANT_ID_PROP, String.valueOf(tenantId)); + return taskProperties; + } catch (HeartBeatManagementException e) { + String msg = "Unexpected exception when getting server hash index."; + log.error(msg, e); + throw new TaskManagementException(msg, e); + } + } + private static void deleteObsoleteTasks(TaskService nTaskService, List dynamicTasks) throws TaskManagementException, TaskException {