Fix issues with unnecessary task deletion

pull/1/head
Charitha Goonetilleke 2 years ago
parent 28488d3e08
commit 0a9b20e0fc

@ -59,8 +59,7 @@ public class TaskManagementUtil {
try { try {
int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService() int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx(); .getServerCtxInfo().getLocalServerHashIdx();
return TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + TaskMgtConstants.Task.NAME_SEPARATOR + dynamicTaskId return generateTaskId(dynamicTaskId, serverHashIdx);
+ TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx;
} catch (HeartBeatManagementException e) { } catch (HeartBeatManagementException e) {
String msg = "Failed to generate task id for a dynamic task " + dynamicTaskId; String msg = "Failed to generate task id for a dynamic task " + dynamicTaskId;
log.error(msg, e); log.error(msg, e);
@ -68,6 +67,11 @@ public class TaskManagementUtil {
} }
} }
public static String generateTaskId(int dynamicTaskId, int serverHashIdx) {
return TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + TaskMgtConstants.Task.NAME_SEPARATOR + dynamicTaskId
+ TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx;
}
public static String generateTaskPropsMD5(Map<String, String> taskProperties) throws TaskManagementException { public static String generateTaskPropsMD5(Map<String, String> taskProperties) throws TaskManagementException {
taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP); taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP);
taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX); taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX);

@ -41,6 +41,7 @@ import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
@ -70,7 +71,6 @@ public class IoTSStartupHandler implements ServerStartupObserver {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Comparing Tasks from carbon nTask manager and entgra task manager"); log.debug("Comparing Tasks from carbon nTask manager and entgra task manager");
} }
TaskManager taskManager;
TaskService nTaskService = TaskWatcherDataHolder.getInstance().getnTaskService(); TaskService nTaskService = TaskWatcherDataHolder.getInstance().getnTaskService();
if (nTaskService == null) { if (nTaskService == null) {
String msg = "Unable to load TaskService from the carbon nTask core"; String msg = "Unable to load TaskService from the carbon nTask core";
@ -80,147 +80,175 @@ public class IoTSStartupHandler implements ServerStartupObserver {
try { try {
List<DynamicTask> dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService() List<DynamicTask> dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService()
.getAllDynamicTasks(); .getAllDynamicTasks();
Map<Integer, List<DynamicTask>> tenantedDynamicTasks = new HashMap<>();
List<DynamicTask> dts; scheduleMissingTasks(nTaskService, dynamicTasks);
for (DynamicTask dt : dynamicTasks) { deleteObsoleteTasks(nTaskService, dynamicTasks);
if (tenantedDynamicTasks.containsKey(dt.getTenantId())) {
dts = tenantedDynamicTasks.get(dt.getTenantId()); if (log.isDebugEnabled()) {
} else { log.debug("Task Comparison Completed and all tasks in current node are updated");
dts = new ArrayList<>();
}
dts.add(dt);
tenantedDynamicTasks.put(dt.getTenantId(), dts);
} }
for (Integer tenantId : tenantedDynamicTasks.keySet()) { } catch (TaskException e) {
if (tenantId == -1) { String msg = "Error occurred while accessing carbon nTask manager.";
log.warn("Found " + tenantedDynamicTasks.get(tenantId).size() + log.error(msg, e);
" invalid tasks without a valid tenant id."); } catch (TaskManagementException e) {
continue; String msg = "Error occurred while retrieving all active tasks from entgra task manager";
} log.error(msg, e);
PrivilegedCarbonContext.startTenantFlow(); }
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true);
if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { }
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
private static void scheduleMissingTasks(TaskService nTaskService, List<DynamicTask> dynamicTasks)
throws TaskException, TaskManagementException {
Map<Integer, List<DynamicTask>> tenantedDynamicTasks = new HashMap<>();
List<DynamicTask> dts;
for (DynamicTask dt : dynamicTasks) {
if (tenantedDynamicTasks.containsKey(dt.getTenantId())) {
dts = tenantedDynamicTasks.get(dt.getTenantId());
} else {
dts = new ArrayList<>();
}
dts.add(dt);
tenantedDynamicTasks.put(dt.getTenantId(), dts);
}
TaskManager taskManager;
for (Integer tenantId : tenantedDynamicTasks.keySet()) {
if (tenantId == -1) {
log.warn("Found " + tenantedDynamicTasks.get(tenantId).size() +
" invalid tasks without a valid tenant id.");
continue;
}
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true);
if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
}
taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
List<TaskInfo> tasks = taskManager.getAllTasks();
// add or update task into nTask core
for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) {
String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId());
Map<String, String> taskProperties = dt.getProperties();
try {
int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx();
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx));
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId);
} catch (HeartBeatManagementException e) {
String msg = "Unexpected exception when getting server hash index.";
log.error(msg, e);
throw new TaskManagementException(msg, e);
} }
taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); boolean isExist = false;
List<TaskInfo> tasks = taskManager.getAllTasks(); for (TaskInfo taskInfo : tasks) {
// add or update task into nTask core if (taskInfo.getName().equals(generatedTaskId)) {
for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) { isExist = true;
String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()); TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo();
Map<String, String> taskProperties = dt.getProperties(); String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskProperties);
try { String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties());
int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService() if (!triggerInfo.getCronExpression().equals(dt.getCronExpression())
.getServerCtxInfo().getLocalServerHashIdx(); || !dynamicTaskPropMD5.equals(existingTaskPropMD5)) {
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx)); triggerInfo.setCronExpression(dt.getCronExpression());
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId); taskInfo.setTriggerInfo(triggerInfo);
} catch (HeartBeatManagementException e) { taskInfo.setProperties(taskProperties);
String msg = "Unexpected exception when getting server hash index."; taskManager.registerTask(taskInfo);
log.error(msg, e); taskManager.rescheduleTask(generatedTaskId);
throw new TaskManagementException(msg, e); if (log.isDebugEnabled()) {
} log.debug("Task - '" + generatedTaskId + "' updated according to the dynamic task table");
boolean isExist = false;
for (TaskInfo taskInfo : tasks) {
if (taskInfo.getName().equals(generatedTaskId)) {
isExist = true;
TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo();
String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskProperties);
String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties());
if (!triggerInfo.getCronExpression().equals(dt.getCronExpression())
|| !dynamicTaskPropMD5.equals(existingTaskPropMD5)) {
triggerInfo.setCronExpression(dt.getCronExpression());
taskInfo.setTriggerInfo(triggerInfo);
taskInfo.setProperties(taskProperties);
taskManager.registerTask(taskInfo);
taskManager.rescheduleTask(generatedTaskId);
if (log.isDebugEnabled()) {
log.debug("Task - '" + generatedTaskId + "' updated according to the dynamic task table");
}
} }
if (dt.isEnabled() }
&& taskManager.getTaskState(generatedTaskId) == TaskManager.TaskState.PAUSED) { if (dt.isEnabled()
taskManager.resumeTask(generatedTaskId); && taskManager.getTaskState(generatedTaskId) == TaskManager.TaskState.PAUSED) {
if (log.isDebugEnabled()) { taskManager.resumeTask(generatedTaskId);
log.debug("Task - '" + generatedTaskId + "' enabled according to the dynamic task table"); if (log.isDebugEnabled()) {
} log.debug("Task - '" + generatedTaskId + "' enabled according to the dynamic task table");
} else if (!dt.isEnabled() }
&& taskManager.getTaskState(generatedTaskId) != TaskManager.TaskState.PAUSED) { } else if (!dt.isEnabled()
taskManager.pauseTask(generatedTaskId); && taskManager.getTaskState(generatedTaskId) != TaskManager.TaskState.PAUSED) {
if (log.isDebugEnabled()) { taskManager.pauseTask(generatedTaskId);
log.debug("Task - '" + generatedTaskId + "' disabled according to the dynamic task table"); if (log.isDebugEnabled()) {
} log.debug("Task - '" + generatedTaskId + "' disabled according to the dynamic task table");
} }
break;
} }
break;
} }
if (!isExist) { }
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); if (!isExist) {
triggerInfo.setCronExpression(dt.getCronExpression()); TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(), triggerInfo.setCronExpression(dt.getCronExpression());
taskProperties, triggerInfo); TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(),
taskManager.registerTask(taskInfo); taskProperties, triggerInfo);
taskManager.scheduleTask(generatedTaskId); taskManager.registerTask(taskInfo);
if (log.isDebugEnabled()) { taskManager.scheduleTask(generatedTaskId);
log.debug("New task -'" + generatedTaskId + "' created according to the dynamic task table"); if (log.isDebugEnabled()) {
} log.debug("New task -'" + generatedTaskId + "' created according to the dynamic task table");
} }
} }
PrivilegedCarbonContext.endTenantFlow();
} }
PrivilegedCarbonContext.endTenantFlow();
}
}
List<Tenant> tenants = new ArrayList<>(); private static void deleteObsoleteTasks(TaskService nTaskService, List<DynamicTask> dynamicTasks)
try { throws TaskManagementException, TaskException {
RealmService realmService = TaskWatcherDataHolder.getInstance().getRealmService();
Tenant[] tenantArray = realmService.getTenantManager().getAllTenants(); List<Tenant> tenants = new ArrayList<>();
if (tenantArray != null && tenantArray.length != 0) { try {
tenants.addAll(Arrays.asList(tenantArray)); RealmService realmService = TaskWatcherDataHolder.getInstance().getRealmService();
} Tenant[] tenantArray = realmService.getTenantManager().getAllTenants();
Tenant superTenant = new Tenant(); if (tenantArray != null && tenantArray.length != 0) {
superTenant.setId(-1234); tenants.addAll(Arrays.asList(tenantArray));
tenants.add(superTenant);
} catch (UserStoreException e) {
String msg = "Unable to load tenants";
log.error(msg, e);
return;
} }
Tenant superTenant = new Tenant();
superTenant.setId(-1234);
tenants.add(superTenant);
} catch (UserStoreException e) {
String msg = "Unable to load tenants";
log.error(msg, e);
return;
}
for (Tenant tenant : tenants) { TaskManager taskManager;
PrivilegedCarbonContext.startTenantFlow(); Set<Integer> hashIds;
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenant.getId(), true); try {
if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { hashIds = TaskWatcherDataHolder.getInstance().getHeartBeatService().getActiveServers().keySet();
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); } catch (HeartBeatManagementException e) {
} String msg = "Unexpected exception when getting hash indexes of active servers";
taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); log.error(msg, e);
List<TaskInfo> tasks = taskManager.getAllTasks(); throw new TaskManagementException(msg, e);
// Remove deleted items from the nTask core }
for (TaskInfo taskInfo : tasks) {
boolean isExist = false; for (Tenant tenant : tenants) {
for (DynamicTask dt : dynamicTasks) { PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenant.getId(), true);
if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
}
taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
List<TaskInfo> tasks = taskManager.getAllTasks();
// Remove deleted items from the nTask core
for (TaskInfo taskInfo : tasks) {
boolean isExist = false;
for (DynamicTask dt : dynamicTasks) {
for (int hid : hashIds) {
if (tenant.getId() == dt.getTenantId() && if (tenant.getId() == dt.getTenantId() &&
taskInfo.getName().equals(TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()))) { taskInfo.getName().equals(TaskManagementUtil.generateTaskId(dt.getDynamicTaskId(), hid))) {
isExist = true; isExist = true;
break; break;
} }
} }
if (!isExist) { if (isExist) {
taskManager.deleteTask(taskInfo.getName()); break;
if (log.isDebugEnabled()) { }
log.debug("Task '" + taskInfo.getName() + "' deleted according to the dynamic task table"); }
} if (!isExist) {
taskManager.deleteTask(taskInfo.getName());
if (log.isDebugEnabled()) {
log.debug("Task '" + taskInfo.getName() + "' deleted according to the dynamic task table");
} }
} }
PrivilegedCarbonContext.endTenantFlow();
}
if (log.isDebugEnabled()) {
log.debug("Task Comparison Completed and all tasks in current node are updated");
} }
} catch (TaskException e) { PrivilegedCarbonContext.endTenantFlow();
String msg = "Error occurred while accessing carbon nTask manager.";
log.error(msg, e);
} catch (TaskManagementException e) {
String msg = "Error occurred while retrieving all active tasks from entgra task manager";
log.error(msg, e);
} }
} }
} }

Loading…
Cancel
Save