Fix issues with task id getting null

remotes/1717824210486943042/master
Charitha Goonetilleke 2 years ago
parent 0a9b20e0fc
commit ce20ead035

@ -62,8 +62,7 @@ public class TaskManagementServiceImpl implements TaskManagementService {
log.error(msg); log.error(msg);
throw new TaskManagementException(msg); throw new TaskManagementException(msg);
} }
if (!nTaskService.getRegisteredTaskTypes().contains( if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
try { try {
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
this.taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); this.taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);

@ -72,7 +72,7 @@ public class TaskManagementUtil {
+ TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx; + TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx;
} }
public static String generateTaskPropsMD5(Map<String, String> taskProperties) throws TaskManagementException { public static String generateTaskPropsMD5(Map<String, String> taskProperties) {
taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP); taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP);
taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX); taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX);
taskProperties.remove(TaskMgtConstants.Task.LOCAL_TASK_NAME); taskProperties.remove(TaskMgtConstants.Task.LOCAL_TASK_NAME);

@ -64,7 +64,7 @@ public class IoTSStartupHandler implements ServerStartupObserver {
log.error("Error occurred when comparing tasks.", e); log.error("Error occurred when comparing tasks.", e);
} }
} }
}, 200000, 600000); }, 200000, 300000);
} }
private void compareTasks() { private void compareTasks() {
@ -127,29 +127,18 @@ public class IoTSStartupHandler implements ServerStartupObserver {
// add or update task into nTask core // add or update task into nTask core
for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) { for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) {
String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()); String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId());
Map<String, String> taskProperties = dt.getProperties();
try {
int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx();
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx));
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId);
} catch (HeartBeatManagementException e) {
String msg = "Unexpected exception when getting server hash index.";
log.error(msg, e);
throw new TaskManagementException(msg, e);
}
boolean isExist = false; boolean isExist = false;
for (TaskInfo taskInfo : tasks) { for (TaskInfo taskInfo : tasks) {
if (taskInfo.getName().equals(generatedTaskId)) { if (taskInfo.getName().equals(generatedTaskId)) {
isExist = true; isExist = true;
TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo(); TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo();
String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskProperties); String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(dt.getProperties());
String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties()); String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties());
if (!triggerInfo.getCronExpression().equals(dt.getCronExpression()) if (!triggerInfo.getCronExpression().equals(dt.getCronExpression())
|| !dynamicTaskPropMD5.equals(existingTaskPropMD5)) { || !dynamicTaskPropMD5.equals(existingTaskPropMD5)) {
triggerInfo.setCronExpression(dt.getCronExpression()); triggerInfo.setCronExpression(dt.getCronExpression());
taskInfo.setTriggerInfo(triggerInfo); taskInfo.setTriggerInfo(triggerInfo);
taskInfo.setProperties(taskProperties); taskInfo.setProperties(populateTaskProperties(tenantId, generatedTaskId, dt.getProperties()));
taskManager.registerTask(taskInfo); taskManager.registerTask(taskInfo);
taskManager.rescheduleTask(generatedTaskId); taskManager.rescheduleTask(generatedTaskId);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
@ -176,7 +165,7 @@ public class IoTSStartupHandler implements ServerStartupObserver {
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
triggerInfo.setCronExpression(dt.getCronExpression()); triggerInfo.setCronExpression(dt.getCronExpression());
TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(), TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(),
taskProperties, triggerInfo); populateTaskProperties(tenantId, generatedTaskId, dt.getProperties()), triggerInfo);
taskManager.registerTask(taskInfo); taskManager.registerTask(taskInfo);
taskManager.scheduleTask(generatedTaskId); taskManager.scheduleTask(generatedTaskId);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
@ -188,6 +177,23 @@ public class IoTSStartupHandler implements ServerStartupObserver {
} }
} }
private static Map<String, String> populateTaskProperties(int tenantId, String generatedTaskId,
Map<String, String> taskProperties)
throws TaskManagementException {
try {
int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx();
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx));
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId);
taskProperties.put(TaskMgtConstants.Task.TENANT_ID_PROP, String.valueOf(tenantId));
return taskProperties;
} catch (HeartBeatManagementException e) {
String msg = "Unexpected exception when getting server hash index.";
log.error(msg, e);
throw new TaskManagementException(msg, e);
}
}
private static void deleteObsoleteTasks(TaskService nTaskService, List<DynamicTask> dynamicTasks) private static void deleteObsoleteTasks(TaskService nTaskService, List<DynamicTask> dynamicTasks)
throws TaskManagementException, TaskException { throws TaskManagementException, TaskException {

Loading…
Cancel
Save